feat(kg): 补全知识图谱实体同步和关系构建

新增功能:
- 补全 4 类实体同步:Workflow、Job、LabelTask、KnowledgeSet
- 补全 7 类关系构建:USES_DATASET、PRODUCES、ASSIGNED_TO、TRIGGERS、DEPENDS_ON、IMPACTS、SOURCED_FROM
- 新增 39 个测试用例,总计 111 个测试

问题修复(三轮迭代):
第一轮(6 个问题):
- toStringList null/blank 过滤
- mergeUsesDatasetRelations 统一逻辑
- fetchAllPaged 去重抽取
- IMPACTS 占位标记
- 测试断言增强
- syncAll 固定索引改 Map

第二轮(2 个问题):
- 活跃 ID 空值/空白归一化(两层防御)
- 关系构建 N+1 查询消除(预加载 Map)

第三轮(1 个问题):
- 空元素 NPE 防御(GraphSyncService 12 处 + GraphSyncStepService 6 处)

代码变更:+1936 行,-101 行
测试结果:111 tests, 0 failures

已知 P3 问题(非阻塞):
- 安全注释与实现不一致(待权限过滤任务一起处理)
- 测试覆盖缺口(可后续补充)
This commit is contained in:
2026-02-18 11:30:38 +08:00
parent 37b478a052
commit ebb4548ca5
10 changed files with 1933 additions and 98 deletions

View File

@@ -4,8 +4,13 @@ import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@@ -13,6 +18,9 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -68,12 +76,36 @@ class GraphSyncServiceTest {
.isInstanceOf(BusinessException.class);
}
/** Calling {@code syncWorkflows} with {@code INVALID_GRAPH_ID} is expected to raise a BusinessException. */
@Test
void syncWorkflows_invalidGraphId_throwsBusinessException() {
    assertThatThrownBy(() -> {
        syncService.syncWorkflows(INVALID_GRAPH_ID);
    }).isInstanceOf(BusinessException.class);
}
/** Calling {@code syncJobs} with {@code INVALID_GRAPH_ID} is expected to raise a BusinessException. */
@Test
void syncJobs_invalidGraphId_throwsBusinessException() {
    assertThatThrownBy(() -> {
        syncService.syncJobs(INVALID_GRAPH_ID);
    }).isInstanceOf(BusinessException.class);
}
/** Calling {@code syncLabelTasks} with {@code INVALID_GRAPH_ID} is expected to raise a BusinessException. */
@Test
void syncLabelTasks_invalidGraphId_throwsBusinessException() {
    assertThatThrownBy(() -> {
        syncService.syncLabelTasks(INVALID_GRAPH_ID);
    }).isInstanceOf(BusinessException.class);
}
/** Calling {@code syncKnowledgeSets} with {@code INVALID_GRAPH_ID} is expected to raise a BusinessException. */
@Test
void syncKnowledgeSets_invalidGraphId_throwsBusinessException() {
    assertThatThrownBy(() -> {
        syncService.syncKnowledgeSets(INVALID_GRAPH_ID);
    }).isInstanceOf(BusinessException.class);
}
// -----------------------------------------------------------------------
// syncAll — 正常流程
// syncAll — 正常流程(8 实体 + 10 关系 = 18 结果)
// -----------------------------------------------------------------------
@Test
void syncAll_success_returnsResultList() {
void syncAll_success_returnsAllResults() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
@@ -81,40 +113,75 @@ class GraphSyncServiceTest {
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
SyncResult entityResult = SyncResult.builder().syncType("Dataset").build();
SyncResult fieldResult = SyncResult.builder().syncType("Field").build();
SyncResult userResult = SyncResult.builder().syncType("User").build();
SyncResult orgResult = SyncResult.builder().syncType("Org").build();
SyncResult hasFieldResult = SyncResult.builder().syncType("HAS_FIELD").build();
SyncResult derivedFromResult = SyncResult.builder().syncType("DERIVED_FROM").build();
SyncResult belongsToResult = SyncResult.builder().syncType("BELONGS_TO").build();
// 8 entity upsert results
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(entityResult);
.thenReturn(SyncResult.builder().syncType("Dataset").build());
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(fieldResult);
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(userResult);
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(orgResult);
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
.thenReturn(0);
// 10 relation merge results
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(hasFieldResult);
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(derivedFromResult);
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(belongsToResult);
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
List<SyncResult> results = syncService.syncAll(GRAPH_ID);
assertThat(results).hasSize(7);
assertThat(results.get(0).getSyncType()).isEqualTo("Dataset");
// 8 entities + 10 relations = 18
assertThat(results).hasSize(18);
// 按 syncType 建立索引,避免依赖固定下标
Map<String, SyncResult> byType = results.stream()
.collect(Collectors.toMap(SyncResult::getSyncType, Function.identity()));
// 验证所有 8 个实体类型都存在
assertThat(byType).containsKeys("Dataset", "Field", "User", "Org",
"Workflow", "Job", "LabelTask", "KnowledgeSet");
// 验证所有 10 个关系类型都存在
assertThat(byType).containsKeys("HAS_FIELD", "DERIVED_FROM", "BELONGS_TO",
"USES_DATASET", "PRODUCES", "ASSIGNED_TO", "TRIGGERS",
"DEPENDS_ON", "IMPACTS", "SOURCED_FROM");
}
// -----------------------------------------------------------------------
// syncAll — 正常流程
// 重试耗尽
// -----------------------------------------------------------------------
@Test
@@ -124,6 +191,148 @@ class GraphSyncServiceTest {
assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("拉取数据集失败");
.hasMessageContaining("datasets");
}
// -----------------------------------------------------------------------
// 新增实体单步同步
// -----------------------------------------------------------------------
/**
 * Single-step sync tests for the Workflow, Job, LabelTask and KnowledgeSet entity types.
 *
 * <p>Each success test stubs the data-management client and the step service, then checks
 * both the sync type of the returned result and that the matching upsert step was invoked.
 */
@Nested
class NewEntitySyncTest {

    /** One fetched workflow: the result carries sync type "Workflow" and the upsert step runs. */
    @Test
    void syncWorkflows_success() {
        when(properties.getSync()).thenReturn(syncConfig);

        WorkflowDTO wf = new WorkflowDTO();
        wf.setId("wf-001");
        wf.setName("Clean Pipeline");
        when(dataManagementClient.listAllWorkflows()).thenReturn(List.of(wf));
        when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
                .thenReturn(SyncResult.builder().syncType("Workflow").build());
        when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Workflow"), anySet(), anyString()))
                .thenReturn(0);

        SyncResult result = syncService.syncWorkflows(GRAPH_ID);

        assertThat(result.getSyncType()).isEqualTo("Workflow");
        verify(stepService).upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString());
    }

    /** One fetched job: the result carries sync type "Job" and the upsert step runs. */
    @Test
    void syncJobs_success() {
        when(properties.getSync()).thenReturn(syncConfig);

        JobDTO job = new JobDTO();
        job.setId("job-001");
        job.setName("Clean Job");
        when(dataManagementClient.listAllJobs()).thenReturn(List.of(job));
        when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
                .thenReturn(SyncResult.builder().syncType("Job").build());
        when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Job"), anySet(), anyString()))
                .thenReturn(0);

        SyncResult result = syncService.syncJobs(GRAPH_ID);

        assertThat(result.getSyncType()).isEqualTo("Job");
        verify(stepService).upsertJobEntities(eq(GRAPH_ID), anyList(), anyString());
    }

    /** One fetched label task: the result carries sync type "LabelTask" and the upsert step runs. */
    @Test
    void syncLabelTasks_success() {
        when(properties.getSync()).thenReturn(syncConfig);

        LabelTaskDTO task = new LabelTaskDTO();
        task.setId("lt-001");
        task.setName("Label Task");
        when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of(task));
        when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
                .thenReturn(SyncResult.builder().syncType("LabelTask").build());
        when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("LabelTask"), anySet(), anyString()))
                .thenReturn(0);

        SyncResult result = syncService.syncLabelTasks(GRAPH_ID);

        assertThat(result.getSyncType()).isEqualTo("LabelTask");
        // Same delegation check as the Workflow and Job tests.
        verify(stepService).upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString());
    }

    /** One fetched knowledge set: the result carries sync type "KnowledgeSet" and the upsert step runs. */
    @Test
    void syncKnowledgeSets_success() {
        when(properties.getSync()).thenReturn(syncConfig);

        KnowledgeSetDTO ks = new KnowledgeSetDTO();
        ks.setId("ks-001");
        ks.setName("Knowledge Set");
        when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of(ks));
        when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
                .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
        when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("KnowledgeSet"), anySet(), anyString()))
                .thenReturn(0);

        SyncResult result = syncService.syncKnowledgeSets(GRAPH_ID);

        assertThat(result.getSyncType()).isEqualTo("KnowledgeSet");
        // Same delegation check as the Workflow and Job tests.
        verify(stepService).upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString());
    }

    /** A client failure while listing workflows must surface as a BusinessException mentioning "workflows". */
    @Test
    void syncWorkflows_fetchFailed_throwsBusinessException() {
        when(properties.getSync()).thenReturn(syncConfig);
        when(dataManagementClient.listAllWorkflows()).thenThrow(new RuntimeException("timeout"));

        assertThatThrownBy(() -> syncService.syncWorkflows(GRAPH_ID))
                .isInstanceOf(BusinessException.class)
                .hasMessageContaining("workflows");
    }
}
// -----------------------------------------------------------------------
// 新增关系构建
// -----------------------------------------------------------------------
@Nested
class NewRelationBuildTest {
@Test
void buildUsesDatasetRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildUsesDatasetRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildProducesRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildProducesRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildAssignedToRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildAssignedToRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildTriggersRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildTriggersRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildDependsOnRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildDependsOnRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildImpactsRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildImpactsRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
@Test
void buildSourcedFromRelations_invalidGraphId_throws() {
assertThatThrownBy(() -> syncService.buildSourcedFromRelations(INVALID_GRAPH_ID))
.isInstanceOf(BusinessException.class);
}
}
}

View File

@@ -5,6 +5,10 @@ import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.domain.repository.GraphEntityRepository;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.TagDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
@@ -22,6 +26,8 @@ import org.springframework.data.neo4j.core.Neo4jClient.RecordFetchSpec;
import org.springframework.data.neo4j.core.Neo4jClient.MappingSpec;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
@@ -56,9 +62,6 @@ class GraphSyncStepServiceTest {
// Neo4jClient mock chain helper
// -----------------------------------------------------------------------
/**
* Build a full mock chain for neo4jClient.query(...).bindAll(...).fetchAs(Long).mappedBy(...).one()
*/
@SuppressWarnings("unchecked")
private void setupNeo4jQueryChain(Class<?> fetchType, Object returnValue) {
UnboundRunnableSpec unboundSpec = mock(UnboundRunnableSpec.class);
@@ -90,7 +93,6 @@ class GraphSyncStepServiceTest {
GRAPH_ID, "Dataset", Collections.emptySet(), SYNC_ID);
assertThat(deleted).isEqualTo(0);
// Should NOT execute any Cypher query
verifyNoInteractions(neo4jClient);
}
@@ -133,7 +135,7 @@ class GraphSyncStepServiceTest {
}
// -----------------------------------------------------------------------
// upsertDatasetEntities — P2-5 单条 Cypher 优化
// upsertDatasetEntities
// -----------------------------------------------------------------------
@Nested
@@ -200,12 +202,10 @@ class GraphSyncStepServiceTest {
stepService.upsertDatasetEntities(GRAPH_ID, List.of(dto), SYNC_ID);
// Verify the MERGE Cypher includes property SET clauses (N+1 fix)
verify(neo4jClient).query(cypherCaptor.capture());
String cypher = cypherCaptor.getValue();
assertThat(cypher).contains("MERGE");
assertThat(cypher).contains("properties.");
// Verify NO separate find+save (N+1 eliminated)
verifyNoInteractions(entityRepository);
}
@@ -270,7 +270,205 @@ class GraphSyncStepServiceTest {
}
// -----------------------------------------------------------------------
// 关系构建
// upsertWorkflowEntities
// -----------------------------------------------------------------------
/** Tests for {@code upsertWorkflowEntities}. */
@Nested
class UpsertWorkflowEntitiesTest {

    /** A single workflow, with the query chain answering {@code true}, is expected to count as one creation. */
    @Test
    void upsert_newWorkflow_incrementsCreated() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        WorkflowDTO workflow = new WorkflowDTO();
        workflow.setId("wf-001");
        workflow.setName("Clean Pipeline");
        workflow.setWorkflowType("CLEANING");
        workflow.setStatus("ACTIVE");
        workflow.setVersion("2.1");
        workflow.setOperatorCount(3);
        List<WorkflowDTO> batch = List.of(workflow);

        SyncResult outcome = stepService.upsertWorkflowEntities(GRAPH_ID, batch, SYNC_ID);

        assertThat(outcome.getSyncType()).isEqualTo("Workflow");
        assertThat(outcome.getCreated()).isEqualTo(1);
    }

    /** An empty input list is expected to create nothing and leave the Neo4j client untouched. */
    @Test
    void upsert_emptyList_returnsZero() {
        when(properties.getImportBatchSize()).thenReturn(100);
        List<WorkflowDTO> nothing = List.of();

        SyncResult outcome = stepService.upsertWorkflowEntities(GRAPH_ID, nothing, SYNC_ID);

        assertThat(outcome.getCreated()).isEqualTo(0);
        verifyNoInteractions(neo4jClient);
    }

    /** The generated Cypher is expected to reference the {@code input_dataset_ids} property. */
    @Test
    void upsert_withInputDatasetIds_storesProperty() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        WorkflowDTO workflow = new WorkflowDTO();
        workflow.setId("wf-001");
        workflow.setName("Pipeline");
        workflow.setWorkflowType("CLEANING");
        workflow.setInputDatasetIds(List.of("ds-001", "ds-002"));
        List<WorkflowDTO> batch = List.of(workflow);

        stepService.upsertWorkflowEntities(GRAPH_ID, batch, SYNC_ID);

        verify(neo4jClient).query(cypherCaptor.capture());
        String capturedCypher = cypherCaptor.getValue();
        assertThat(capturedCypher).contains("properties.input_dataset_ids");
    }
}
// -----------------------------------------------------------------------
// upsertJobEntities
// -----------------------------------------------------------------------
/** Tests for {@code upsertJobEntities}. */
@Nested
class UpsertJobEntitiesTest {

    /** A single job, with the query chain answering {@code true}, is expected to count as one creation. */
    @Test
    void upsert_newJob_incrementsCreated() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        JobDTO job = new JobDTO();
        job.setId("job-001");
        job.setName("Clean Job");
        job.setJobType("CLEANING");
        job.setStatus("COMPLETED");
        job.setDurationSeconds(2100L);
        job.setInputDatasetId("ds-001");
        job.setOutputDatasetId("ds-002");
        job.setWorkflowId("wf-001");
        job.setCreatedBy("admin");
        List<JobDTO> batch = List.of(job);

        SyncResult outcome = stepService.upsertJobEntities(GRAPH_ID, batch, SYNC_ID);

        assertThat(outcome.getSyncType()).isEqualTo("Job");
        assertThat(outcome.getCreated()).isEqualTo(1);
    }

    /** The generated Cypher is expected to reference the {@code depends_on_job_id} property. */
    @Test
    void upsert_jobWithDependency_storesProperty() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        JobDTO downstream = new JobDTO();
        downstream.setId("job-002");
        downstream.setName("Downstream Job");
        downstream.setJobType("SYNTHESIS");
        downstream.setStatus("PENDING");
        downstream.setDependsOnJobId("job-001");
        List<JobDTO> batch = List.of(downstream);

        stepService.upsertJobEntities(GRAPH_ID, batch, SYNC_ID);

        verify(neo4jClient).query(cypherCaptor.capture());
        String capturedCypher = cypherCaptor.getValue();
        assertThat(capturedCypher).contains("properties.depends_on_job_id");
    }

    /** An empty input list is expected to create nothing and leave the Neo4j client untouched. */
    @Test
    void upsert_emptyList_returnsZero() {
        when(properties.getImportBatchSize()).thenReturn(100);
        List<JobDTO> nothing = List.of();

        SyncResult outcome = stepService.upsertJobEntities(GRAPH_ID, nothing, SYNC_ID);

        assertThat(outcome.getCreated()).isEqualTo(0);
        verifyNoInteractions(neo4jClient);
    }
}
// -----------------------------------------------------------------------
// upsertLabelTaskEntities
// -----------------------------------------------------------------------
/** Tests for {@code upsertLabelTaskEntities}. */
@Nested
class UpsertLabelTaskEntitiesTest {

    /** A single label task, with the query chain answering {@code true}, is expected to count as one creation. */
    @Test
    void upsert_newLabelTask_incrementsCreated() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        LabelTaskDTO labelTask = new LabelTaskDTO();
        labelTask.setId("lt-001");
        labelTask.setName("Label Task");
        labelTask.setTaskMode("MANUAL");
        labelTask.setStatus("IN_PROGRESS");
        labelTask.setDataType("image");
        labelTask.setLabelingType("object_detection");
        labelTask.setProgress(45.5);
        labelTask.setDatasetId("ds-001");
        labelTask.setCreatedBy("admin");
        List<LabelTaskDTO> batch = List.of(labelTask);

        SyncResult outcome = stepService.upsertLabelTaskEntities(GRAPH_ID, batch, SYNC_ID);

        assertThat(outcome.getSyncType()).isEqualTo("LabelTask");
        assertThat(outcome.getCreated()).isEqualTo(1);
    }

    /** An empty input list is expected to create nothing and leave the Neo4j client untouched. */
    @Test
    void upsert_emptyList_returnsZero() {
        when(properties.getImportBatchSize()).thenReturn(100);
        List<LabelTaskDTO> nothing = List.of();

        SyncResult outcome = stepService.upsertLabelTaskEntities(GRAPH_ID, nothing, SYNC_ID);

        assertThat(outcome.getCreated()).isEqualTo(0);
        verifyNoInteractions(neo4jClient);
    }
}
// -----------------------------------------------------------------------
// upsertKnowledgeSetEntities
// -----------------------------------------------------------------------
/** Tests for {@code upsertKnowledgeSetEntities}. */
@Nested
class UpsertKnowledgeSetEntitiesTest {

    /** A single knowledge set, with the query chain answering {@code true}, is expected to count as one creation. */
    @Test
    void upsert_newKnowledgeSet_incrementsCreated() {
        when(properties.getImportBatchSize()).thenReturn(100);
        setupNeo4jQueryChain(Boolean.class, true);

        KnowledgeSetDTO knowledgeSet = new KnowledgeSetDTO();
        knowledgeSet.setId("ks-001");
        knowledgeSet.setName("Medical Knowledge");
        knowledgeSet.setStatus("PUBLISHED");
        knowledgeSet.setDomain("medical");
        knowledgeSet.setSensitivity("INTERNAL");
        knowledgeSet.setItemCount(320);
        knowledgeSet.setSourceDatasetIds(List.of("ds-001", "ds-002"));
        List<KnowledgeSetDTO> batch = List.of(knowledgeSet);

        SyncResult outcome = stepService.upsertKnowledgeSetEntities(GRAPH_ID, batch, SYNC_ID);

        assertThat(outcome.getSyncType()).isEqualTo("KnowledgeSet");
        assertThat(outcome.getCreated()).isEqualTo(1);
    }

    /** An empty input list is expected to create nothing and leave the Neo4j client untouched. */
    @Test
    void upsert_emptyList_returnsZero() {
        when(properties.getImportBatchSize()).thenReturn(100);
        List<KnowledgeSetDTO> nothing = List.of();

        SyncResult outcome = stepService.upsertKnowledgeSetEntities(GRAPH_ID, nothing, SYNC_ID);

        assertThat(outcome.getCreated()).isEqualTo(0);
        verifyNoInteractions(neo4jClient);
    }
}
// -----------------------------------------------------------------------
// 关系构建 - 已有
// -----------------------------------------------------------------------
@Nested
@@ -280,6 +478,8 @@ class GraphSyncStepServiceTest {
void mergeHasField_noFields_returnsEmptyResult() {
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of());
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of());
SyncResult result = stepService.mergeHasFieldRelations(GRAPH_ID, SYNC_ID);
@@ -293,7 +493,7 @@ class GraphSyncStepServiceTest {
.id("entity-1")
.type("Dataset")
.graphId(GRAPH_ID)
.properties(new HashMap<>()) // no parent_dataset_id
.properties(new HashMap<>())
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
@@ -315,4 +515,307 @@ class GraphSyncStepServiceTest {
assertThat(result.getErrors()).contains("belongs_to:org_missing");
}
}
// -----------------------------------------------------------------------
// 新增关系构建
// -----------------------------------------------------------------------
/**
 * Tests for the relation-merge steps USES_DATASET, PRODUCES, ASSIGNED_TO, TRIGGERS,
 * DEPENDS_ON, IMPACTS and SOURCED_FROM.
 *
 * <p>Entities are supplied through stubbed {@code entityRepository.findByGraphIdAndType}
 * calls. Where a relation is expected, the test inspects the Cypher text passed to
 * {@code neo4jClient.query}. The bound parameter values are not inspected, so these tests
 * do not prove which entity ends up as source or target.
 */
@Nested
class NewMergeRelationsTest {

    /** No Job, LabelTask, Workflow or Dataset entities: nothing is created. */
    @Test
    void mergeUsesDataset_noJobs_noTasks_noWorkflows_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of());

        SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("USES_DATASET");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /** A Job whose {@code input_dataset_id} matches a Dataset triggers one relation query. */
    @Test
    void mergeUsesDataset_jobWithInputDataset_createsRelation() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        GraphEntity job = GraphEntity.builder()
                .id("job-entity-1").type("Job").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("input_dataset_id", "ds-001")))
                .build();
        GraphEntity dataset = GraphEntity.builder()
                .id("ds-entity-1").sourceId("ds-001").type("Dataset").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset));

        SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("USES_DATASET");
        verify(neo4jClient).query(cypherCaptor.capture());
        String cypher = cypherCaptor.getValue();
        // Checks the Cypher text only: the RELATED_TO edge and a parameterised relation type.
        // TODO: direction is not verified here; that needs the bound source/target parameters.
        assertThat(cypher).contains("RELATED_TO");
        assertThat(cypher).contains("relation_type: $relationType");
    }

    /** A Workflow whose {@code input_dataset_ids} is a single String (not a List) is still processed. */
    @Test
    void mergeUsesDataset_workflowWithSingleStringInput_handledCorrectly() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        // A single String value instead of a List, to exercise the single-value input path.
        GraphEntity workflow = GraphEntity.builder()
                .id("wf-entity-1").type("Workflow").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("input_dataset_ids", "ds-single")))
                .build();
        GraphEntity dataset = GraphEntity.builder()
                .id("ds-entity-s").sourceId("ds-single").type("Dataset").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset));

        SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("USES_DATASET");
        verify(neo4jClient, atLeastOnce()).query(anyString());
    }

    /** Null, empty and blank entries in {@code input_dataset_ids} are ignored. */
    @Test
    void mergeUsesDataset_listWithNullElements_filtersNulls() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        List<Object> listWithNulls = new ArrayList<>();
        listWithNulls.add("ds-good");
        listWithNulls.add(null);
        listWithNulls.add("");
        listWithNulls.add("  ");

        GraphEntity workflow = GraphEntity.builder()
                .id("wf-entity-2").type("Workflow").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("input_dataset_ids", listWithNulls)))
                .build();
        GraphEntity dataset = GraphEntity.builder()
                .id("ds-entity-g").sourceId("ds-good").type("Dataset").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset));

        SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("USES_DATASET");
        // Only "ds-good" should survive filtering, so exactly one query is expected.
        verify(neo4jClient, times(1)).query(anyString());
    }

    /** No Job or Dataset entities: nothing is created. */
    @Test
    void mergeProduces_noJobs_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of());

        SyncResult result = stepService.mergeProducesRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("PRODUCES");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /** A Job with no output dataset property is skipped without touching Neo4j. */
    @Test
    void mergeProduces_jobWithoutOutput_skips() {
        GraphEntity job = GraphEntity.builder()
                .id("job-entity-1").type("Job").graphId(GRAPH_ID)
                .properties(new HashMap<>())
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of());

        SyncResult result = stepService.mergeProducesRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getCreated()).isEqualTo(0);
        // created == 0 alone cannot tell a skip from a failed attempt; require that no query was issued.
        verifyNoInteractions(neo4jClient);
    }

    /** No LabelTask, Job or User entities: nothing is created. */
    @Test
    void mergeAssignedTo_noTasksOrJobs_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")).thenReturn(List.of());

        SyncResult result = stepService.mergeAssignedToRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("ASSIGNED_TO");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /** A LabelTask with {@code created_by = admin} and a User with source id {@code user:admin} yield one relation query. */
    @Test
    void mergeAssignedTo_taskWithCreatedBy_verifiesUserLookup() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        GraphEntity task = GraphEntity.builder()
                .id("lt-entity-1").type("LabelTask").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("created_by", "admin")))
                .build();
        GraphEntity user = GraphEntity.builder()
                .id("user-entity-1").sourceId("user:admin").type("User").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of(task));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")).thenReturn(List.of(user));

        SyncResult result = stepService.mergeAssignedToRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("ASSIGNED_TO");
        // The User comes from the single stubbed findByGraphIdAndType("User") call;
        // exactly one relation query is expected.
        verify(neo4jClient).query(cypherCaptor.capture());
        assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
    }

    /** No Job or Workflow entities: nothing is created. */
    @Test
    void mergeTriggers_noJobs_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of());

        SyncResult result = stepService.mergeTriggersRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("TRIGGERS");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /**
     * A Job whose {@code workflow_id} matches a Workflow yields one relation query.
     *
     * <p>NOTE(review): despite the method name, this only checks that the Cypher uses the
     * source/target placeholders; the bound values are not inspected.
     */
    @Test
    void mergeTriggers_jobWithWorkflowId_createsRelationWithCorrectDirection() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        GraphEntity job = GraphEntity.builder()
                .id("job-entity-1").type("Job").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("workflow_id", "wf-001")))
                .build();
        GraphEntity workflow = GraphEntity.builder()
                .id("wf-entity-1").sourceId("wf-001").type("Workflow").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow));

        SyncResult result = stepService.mergeTriggersRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("TRIGGERS");
        verify(neo4jClient).query(cypherCaptor.capture());
        String cypher = cypherCaptor.getValue();
        // The MERGE is expected to use the generic RELATED_TO edge.
        assertThat(cypher).contains("RELATED_TO");
        // Source and target are passed as parameters. Intended direction is Workflow -> Job.
        // TODO: assert the bound parameter values to actually verify it.
        assertThat(cypher).contains("$sourceEntityId");
        assertThat(cypher).contains("$targetEntityId");
    }

    /** No Job entities: nothing is created. */
    @Test
    void mergeDependsOn_noJobs_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of());

        SyncResult result = stepService.mergeDependsOnRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("DEPENDS_ON");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /** A Job whose {@code depends_on_job_id} matches another Job yields one relation query. */
    @Test
    void mergeDependsOn_jobWithDependency_verifiesCypherParams() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        GraphEntity job = GraphEntity.builder()
                .id("job-entity-2").sourceId("job-002").type("Job").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("depends_on_job_id", "job-001")))
                .build();
        GraphEntity depJob = GraphEntity.builder()
                .id("job-entity-1").sourceId("job-001").type("Job").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job, depJob));

        SyncResult result = stepService.mergeDependsOnRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("DEPENDS_ON");
        verify(neo4jClient).query(cypherCaptor.capture());
        String cypher = cypherCaptor.getValue();
        assertThat(cypher).contains("RELATED_TO");
        assertThat(cypher).contains("$relationType");
    }

    /** IMPACTS returns a placeholder result and touches neither Neo4j nor the repository. */
    @Test
    void mergeImpacts_returnsPlaceholderResult() {
        SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("IMPACTS");
        assertThat(result.getCreated()).isEqualTo(0);
        assertThat(result.isPlaceholder()).isTrue();
        verifyNoInteractions(neo4jClient);
        verifyNoInteractions(entityRepository);
    }

    /** No KnowledgeSet or Dataset entities: nothing is created. */
    @Test
    void mergeSourcedFrom_noKnowledgeSets_returnsZero() {
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of());
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of());

        SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("SOURCED_FROM");
        assertThat(result.getCreated()).isEqualTo(0);
    }

    /** A KnowledgeSet whose {@code source_dataset_ids} matches a Dataset yields one relation query. */
    @Test
    void mergeSourcedFrom_ksWithSources_verifiesCypherAndLookup() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        GraphEntity ks = GraphEntity.builder()
                .id("ks-entity-1").type("KnowledgeSet").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("source_dataset_ids", List.of("ds-001"))))
                .build();
        GraphEntity dataset = GraphEntity.builder()
                .id("ds-entity-1").sourceId("ds-001").type("Dataset").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of(ks));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset));

        SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("SOURCED_FROM");
        verify(neo4jClient).query(cypherCaptor.capture());
        assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
    }

    /** Null and empty entries in {@code source_dataset_ids} are ignored. */
    @Test
    void mergeSourcedFrom_listWithNullElements_filtersNulls() {
        setupNeo4jQueryChain(String.class, "new-rel-id");

        List<Object> listWithNulls = new ArrayList<>();
        listWithNulls.add("ds-valid");
        listWithNulls.add(null);
        listWithNulls.add("");

        GraphEntity ks = GraphEntity.builder()
                .id("ks-entity-2").type("KnowledgeSet").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("source_dataset_ids", listWithNulls)))
                .build();
        GraphEntity dataset = GraphEntity.builder()
                .id("ds-entity-v").sourceId("ds-valid").type("Dataset").graphId(GRAPH_ID)
                .build();

        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of(ks));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset));

        SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID);

        assertThat(result.getSyncType()).isEqualTo("SOURCED_FROM");
        // Only "ds-valid" should survive filtering, so exactly one query is expected.
        verify(neo4jClient, times(1)).query(anyString());
    }
}
}