diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java index ef7693b..db6a33e 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java @@ -200,6 +200,108 @@ public class GraphSyncService { } } + // ----------------------------------------------------------------------- + // 增量同步 + // ----------------------------------------------------------------------- + + /** + * 增量同步:仅拉取指定时间窗口内变更的数据并同步到 Neo4j。 + *

+ * 与全量同步的区别:按时间窗口仅拉取变更数据;实体仅 upsert、不执行 purge;关系构建(MERGE 幂等)仅针对变更实体。 + *

+ */ + public SyncMetadata syncIncremental(String graphId, LocalDateTime updatedFrom, LocalDateTime updatedTo) { + validateGraphId(graphId); + if (updatedFrom == null || updatedTo == null) { + throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "增量同步必须指定 updatedFrom 和 updatedTo"); + } + if (updatedFrom.isAfter(updatedTo)) { + throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "updatedFrom 不能晚于 updatedTo"); + } + + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); + + ReentrantLock lock = acquireLock(graphId, syncId); + try { + log.info("[{}] Starting incremental sync for graphId={}, window=[{}, {}]", + syncId, graphId, updatedFrom, updatedTo); + + // 拉取时间窗口内变更的数据 + List datasets = fetchWithRetry(syncId, "datasets", + () -> dataManagementClient.listAllDatasets(updatedFrom, updatedTo)); + List workflows = fetchWithRetry(syncId, "workflows", + () -> dataManagementClient.listAllWorkflows(updatedFrom, updatedTo)); + List jobs = fetchWithRetry(syncId, "jobs", + () -> dataManagementClient.listAllJobs(updatedFrom, updatedTo)); + List labelTasks = fetchWithRetry(syncId, "label-tasks", + () -> dataManagementClient.listAllLabelTasks(updatedFrom, updatedTo)); + List knowledgeSets = fetchWithRetry(syncId, "knowledge-sets", + () -> dataManagementClient.listAllKnowledgeSets(updatedFrom, updatedTo)); + + Map resultMap = new LinkedHashMap<>(); + + // 实体同步(仅 upsert,不 purge) + resultMap.put("Dataset", stepService.upsertDatasetEntities(graphId, datasets, syncId)); + resultMap.put("Field", stepService.upsertFieldEntities(graphId, datasets, syncId)); + + Set usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets); + resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId)); + resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId)); + resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId)); + resultMap.put("Job", 
stepService.upsertJobEntities(graphId, jobs, syncId)); + resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId)); + resultMap.put("KnowledgeSet", stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId)); + + // 收集所有变更(创建或更新)的实体ID,用于增量关系构建 + Set changedEntityIds = collectChangedEntityIds(datasets, workflows, jobs, labelTasks, knowledgeSets, graphId); + + // 关系构建(MERGE 幂等)- 增量同步时只处理变更实体相关的关系 + resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds)); + resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds)); + resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds)); + resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds)); + resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds)); + resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds)); + resultMap.put("TRIGGERS", stepService.mergeTriggersRelations(graphId, syncId, changedEntityIds)); + resultMap.put("DEPENDS_ON", stepService.mergeDependsOnRelations(graphId, syncId, changedEntityIds)); + resultMap.put("IMPACTS", stepService.mergeImpactsRelations(graphId, syncId, changedEntityIds)); + resultMap.put("SOURCED_FROM", stepService.mergeSourcedFromRelations(graphId, syncId, changedEntityIds)); + + List results = new ArrayList<>(resultMap.values()); + log.info("[{}] Incremental sync completed for graphId={}. 
Summary: {}", syncId, graphId, + results.stream() + .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")") + .collect(Collectors.joining(", "))); + + SyncMetadata metadata = SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, results); + metadata.setUpdatedFrom(updatedFrom); + metadata.setUpdatedTo(updatedTo); + saveSyncHistory(metadata); + return metadata; + } catch (BusinessException e) { + SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage()); + failed.setUpdatedFrom(updatedFrom); + failed.setUpdatedTo(updatedTo); + saveSyncHistory(failed); + throw e; + } catch (Exception e) { + SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage()); + failed.setUpdatedFrom(updatedFrom); + failed.setUpdatedTo(updatedTo); + saveSyncHistory(failed); + log.error("[{}] Incremental sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "增量同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + // ----------------------------------------------------------------------- // 单步同步(各自获取锁和数据) // ----------------------------------------------------------------------- @@ -771,6 +873,74 @@ public class GraphSyncService { } } + /** + * 收集增量同步中变更(创建或更新)的实体ID。 + * 通过查询数据库获取这些sourceId对应的entityId。 + */ + private Set collectChangedEntityIds(List datasets, + List workflows, + List jobs, + List labelTasks, + List knowledgeSets, + String graphId) { + Set entityIds = new HashSet<>(); + + // 通过数据管理客户端获取到的sourceId,需要转换为对应的entityId + // 这里使用简化的方法:查询所有相关类型的实体并根据sourceId匹配 + try { + // 收集所有变更的sourceId + Set changedSourceIds = new HashSet<>(); + + datasets.stream().filter(Objects::nonNull).map(DatasetDTO::getId).filter(Objects::nonNull) + .forEach(changedSourceIds::add); + + 
workflows.stream().filter(Objects::nonNull).map(WorkflowDTO::getId).filter(Objects::nonNull) + .forEach(changedSourceIds::add); + + jobs.stream().filter(Objects::nonNull).map(JobDTO::getId).filter(Objects::nonNull) + .forEach(changedSourceIds::add); + + labelTasks.stream().filter(Objects::nonNull).map(LabelTaskDTO::getId).filter(Objects::nonNull) + .forEach(changedSourceIds::add); + + knowledgeSets.stream().filter(Objects::nonNull).map(KnowledgeSetDTO::getId).filter(Objects::nonNull) + .forEach(changedSourceIds::add); + + // 添加字段的sourceId + for (DatasetDTO dataset : datasets) { + if (dataset != null && dataset.getTags() != null) { + for (DataManagementClient.TagDTO tag : dataset.getTags()) { + if (tag != null && tag.getName() != null) { + changedSourceIds.add(dataset.getId() + ":tag:" + tag.getName()); + } + } + } + } + + // 查询这些sourceId对应的entityId + if (!changedSourceIds.isEmpty()) { + for (String sourceId : changedSourceIds) { + // 简化处理:这里可以优化为批量查询 + String cypher = "MATCH (e:Entity {graph_id: $graphId, source_id: $sourceId}) RETURN e.id AS entityId"; + List foundEntityIds = stepService.neo4jClient.query(cypher) + .bindAll(Map.of("graphId", graphId, "sourceId", sourceId)) + .fetchAs(String.class) + .mappedBy((ts, record) -> record.get("entityId").asString()) + .all() + .stream().toList(); + entityIds.addAll(foundEntityIds); + } + } + } catch (Exception e) { + log.warn("Failed to collect changed entity IDs, falling back to full relation rebuild: {}", e.getMessage()); + // 如果收集失败,返回null表示进行全量关系构建 + return null; + } + + log.debug("Collected {} changed entity IDs for incremental relation building", entityIds.size()); + return entityIds; + } + private void validateGraphId(String graphId) { if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效"); diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java 
b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java index 2f62f3b..3b84075 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java @@ -39,7 +39,7 @@ public class GraphSyncStepService { private static final String REL_TYPE = "RELATED_TO"; private final GraphEntityRepository entityRepository; - private final Neo4jClient neo4jClient; + final Neo4jClient neo4jClient; // 改为包级别访问,供GraphSyncService使用 private final KnowledgeGraphProperties properties; // ----------------------------------------------------------------------- @@ -441,11 +441,35 @@ public class GraphSyncStepService { @Transactional public SyncResult mergeHasFieldRelations(String graphId, String syncId) { + return mergeHasFieldRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeHasFieldRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("HAS_FIELD", syncId); Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); List fields = entityRepository.findByGraphIdAndType(graphId, "Field"); + // 增量同步时只处理变更相关的字段 + if (changedEntityIds != null) { + fields = fields.stream() + .filter(field -> { + // 包含自身变更的字段 + if (changedEntityIds.contains(field.getId())) { + return true; + } + // 包含关联数据集发生变更的字段 + Object datasetSourceId = field.getProperties().get("dataset_source_id"); + if (datasetSourceId != null) { + String datasetEntityId = datasetMap.get(datasetSourceId.toString()); + return datasetEntityId != null && changedEntityIds.contains(datasetEntityId); + } + return false; + }) + .toList(); + } + for (GraphEntity field : fields) { try { Object datasetSourceId = field.getProperties().get("dataset_source_id"); @@ -477,11 +501,23 @@ public class GraphSyncStepService { 
@Transactional public SyncResult mergeDerivedFromRelations(String graphId, String syncId) { + return mergeDerivedFromRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeDerivedFromRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("DERIVED_FROM", syncId); Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); List datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); + // 增量同步时只处理变更的数据集 + if (changedEntityIds != null) { + datasets = datasets.stream() + .filter(dataset -> changedEntityIds.contains(dataset.getId())) + .toList(); + } + for (GraphEntity dataset : datasets) { try { Object parentId = dataset.getProperties().get("parent_dataset_id"); @@ -512,6 +548,11 @@ public class GraphSyncStepService { @Transactional public SyncResult mergeBelongsToRelations(String graphId, String syncId) { + return mergeBelongsToRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("BELONGS_TO", syncId); Optional defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType( @@ -524,7 +565,13 @@ public class GraphSyncStepService { String orgId = defaultOrgOpt.get().getId(); // User → Org - for (GraphEntity user : entityRepository.findByGraphIdAndType(graphId, "User")) { + List users = entityRepository.findByGraphIdAndType(graphId, "User"); + if (changedEntityIds != null) { + users = users.stream() + .filter(user -> changedEntityIds.contains(user.getId())) + .toList(); + } + for (GraphEntity user : users) { try { boolean created = mergeRelation(graphId, user.getId(), orgId, "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); @@ -536,7 +583,13 @@ public class GraphSyncStepService { } // Dataset → Org - for (GraphEntity dataset : entityRepository.findByGraphIdAndType(graphId, "Dataset")) { + List datasets = 
entityRepository.findByGraphIdAndType(graphId, "Dataset"); + if (changedEntityIds != null) { + datasets = datasets.stream() + .filter(dataset -> changedEntityIds.contains(dataset.getId())) + .toList(); + } + for (GraphEntity dataset : datasets) { try { boolean created = mergeRelation(graphId, dataset.getId(), orgId, "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); @@ -559,22 +612,45 @@ public class GraphSyncStepService { */ @Transactional public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) { + return mergeUsesDatasetRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeUsesDatasetRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("USES_DATASET", syncId); Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); // Job → Dataset (via input_dataset_id) - for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + List jobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + if (changedEntityIds != null) { + jobs = jobs.stream() + .filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + for (GraphEntity job : jobs) { mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId); } // LabelTask → Dataset (via dataset_id) - for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { + List tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask"); + if (changedEntityIds != null) { + tasks = tasks.stream() + .filter(task -> changedEntityIds.contains(task.getId())) + .toList(); + } + for (GraphEntity task : tasks) { mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId); } // Workflow → Dataset (via input_dataset_ids, multi-value) - for (GraphEntity workflow : entityRepository.findByGraphIdAndType(graphId, "Workflow")) { + List workflows = entityRepository.findByGraphIdAndType(graphId, "Workflow"); + if (changedEntityIds != null) { 
+ workflows = workflows.stream() + .filter(workflow -> changedEntityIds.contains(workflow.getId())) + .toList(); + } + for (GraphEntity workflow : workflows) { mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId); } @@ -616,11 +692,23 @@ public class GraphSyncStepService { */ @Transactional public SyncResult mergeProducesRelations(String graphId, String syncId) { + return mergeProducesRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeProducesRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("PRODUCES", syncId); Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); - for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + List jobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + if (changedEntityIds != null) { + jobs = jobs.stream() + .filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + + for (GraphEntity job : jobs) { try { Object outputDatasetId = job.getProperties().get("output_dataset_id"); if (outputDatasetId == null || outputDatasetId.toString().isBlank()) { @@ -647,17 +735,34 @@ public class GraphSyncStepService { */ @Transactional public SyncResult mergeAssignedToRelations(String graphId, String syncId) { + return mergeAssignedToRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeAssignedToRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("ASSIGNED_TO", syncId); Map userMap = buildSourceIdToEntityIdMap(graphId, "User"); // LabelTask → User - for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { + List tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask"); + if (changedEntityIds != null) { + tasks = tasks.stream() + .filter(task -> changedEntityIds.contains(task.getId())) + .toList(); + } + for (GraphEntity task : tasks) { mergeCreatorAssignment(graphId, 
task, "label_task", userMap, result, syncId); } // Job → User - for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + List jobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + if (changedEntityIds != null) { + jobs = jobs.stream() + .filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + for (GraphEntity job : jobs) { mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId); } @@ -692,11 +797,23 @@ public class GraphSyncStepService { */ @Transactional public SyncResult mergeTriggersRelations(String graphId, String syncId) { + return mergeTriggersRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeTriggersRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("TRIGGERS", syncId); Map workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow"); - for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + List jobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + if (changedEntityIds != null) { + jobs = jobs.stream() + .filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + + for (GraphEntity job : jobs) { try { Object workflowId = job.getProperties().get("workflow_id"); if (workflowId == null || workflowId.toString().isBlank()) { @@ -724,11 +841,23 @@ public class GraphSyncStepService { */ @Transactional public SyncResult mergeDependsOnRelations(String graphId, String syncId) { + return mergeDependsOnRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeDependsOnRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("DEPENDS_ON", syncId); Map jobMap = buildSourceIdToEntityIdMap(graphId, "Job"); - for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + List jobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + if (changedEntityIds != null) { + jobs = jobs.stream() + 
.filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + + for (GraphEntity job : jobs) { try { Object depJobId = job.getProperties().get("depends_on_job_id"); if (depJobId == null || depJobId.toString().isBlank()) { @@ -751,29 +880,159 @@ public class GraphSyncStepService { } /** - * 构建 IMPACTS 关系:Field → Field。 + * 构建 IMPACTS 关系:Field → Field(字段级血缘)。 *

- * TODO: 字段影响关系来源于 LLM 抽取或规则引擎,而非简单外键关联。 - * 当前 MVP 阶段为占位实现,后续由抽取模块填充。 + * 通过两种途径推导字段间的影响关系: + *

    + *
  1. DERIVED_FROM:若 Dataset B 派生自 Dataset A(parent_dataset_id), + * 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT)。 + *
  2. Job 输入/输出:若 Job 使用 Dataset A 并产出 Dataset B, + * 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT, job_id=源 ID)。 + *
*/ @Transactional public SyncResult mergeImpactsRelations(String graphId, String syncId) { + return mergeImpactsRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeImpactsRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("IMPACTS", syncId); - result.setPlaceholder(true); - log.debug("[{}] IMPACTS relations require extraction data, skipping in sync phase", syncId); + + // 1. 加载所有 Field,按 dataset_source_id 分组 + List allFields = entityRepository.findByGraphIdAndType(graphId, "Field"); + Map> fieldsByDataset = allFields.stream() + .filter(f -> f.getProperties().get("dataset_source_id") != null) + .collect(Collectors.groupingBy( + f -> f.getProperties().get("dataset_source_id").toString())); + + if (fieldsByDataset.isEmpty()) { + log.debug("[{}] No fields with dataset_source_id found, skipping IMPACTS", syncId); + return endResult(result); + } + + // 记录已处理的 (sourceDatasetId, targetDatasetId) 对,避免重复 + Set processedPairs = new HashSet<>(); + + // 2. DERIVED_FROM 推导:parent dataset fields → child dataset fields + List allDatasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); + + // 增量同步时只处理变更的数据集 + if (changedEntityIds != null) { + allDatasets = allDatasets.stream() + .filter(dataset -> changedEntityIds.contains(dataset.getId())) + .toList(); + } + + for (GraphEntity dataset : allDatasets) { + Object parentId = dataset.getProperties().get("parent_dataset_id"); + if (parentId == null || parentId.toString().isBlank()) { + continue; + } + String pairKey = parentId + "→" + dataset.getSourceId(); + processedPairs.add(pairKey); + mergeFieldImpacts(graphId, parentId.toString(), dataset.getSourceId(), + fieldsByDataset, null, result, syncId); + } + + // 3. 
Job 输入/输出推导:input dataset fields → output dataset fields + List allJobs = entityRepository.findByGraphIdAndType(graphId, "Job"); + + // 增量同步时只处理变更的作业 + if (changedEntityIds != null) { + allJobs = allJobs.stream() + .filter(job -> changedEntityIds.contains(job.getId())) + .toList(); + } + + for (GraphEntity job : allJobs) { + Object inputDsId = job.getProperties().get("input_dataset_id"); + Object outputDsId = job.getProperties().get("output_dataset_id"); + if (inputDsId == null || outputDsId == null + || inputDsId.toString().isBlank() || outputDsId.toString().isBlank()) { + continue; + } + String pairKey = inputDsId + "→" + outputDsId; + if (processedPairs.contains(pairKey)) { + continue; + } + processedPairs.add(pairKey); + mergeFieldImpacts(graphId, inputDsId.toString(), outputDsId.toString(), + fieldsByDataset, job.getSourceId(), result, syncId); + } + return endResult(result); } + /** + * 对两个关联 Dataset 的字段按名称匹配,创建 IMPACTS 关系。 + */ + private void mergeFieldImpacts(String graphId, + String sourceDatasetSourceId, String targetDatasetSourceId, + Map> fieldsByDataset, + String jobSourceId, + SyncResult result, String syncId) { + List sourceFields = fieldsByDataset.getOrDefault(sourceDatasetSourceId, List.of()); + List targetFields = fieldsByDataset.getOrDefault(targetDatasetSourceId, List.of()); + + if (sourceFields.isEmpty() || targetFields.isEmpty()) { + return; + } + + // 目标字段按名称索引 + Map targetByName = targetFields.stream() + .filter(f -> f.getName() != null && !f.getName().isBlank()) + .collect(Collectors.toMap(GraphEntity::getName, f -> f, (a, b) -> a)); + + for (GraphEntity srcField : sourceFields) { + if (srcField.getName() == null || srcField.getName().isBlank()) { + continue; + } + GraphEntity tgtField = targetByName.get(srcField.getName()); + if (tgtField == null) { + continue; + } + try { + String propsJson = jobSourceId != null + ? 
"{\"impact_type\":\"DIRECT\",\"job_id\":\"" + sanitizePropertyValue(jobSourceId) + "\"}" + : "{\"impact_type\":\"DIRECT\"}"; + boolean created = mergeRelation(graphId, srcField.getId(), tgtField.getId(), + "IMPACTS", propsJson, syncId); + if (created) { + result.incrementCreated(); + } else { + result.incrementSkipped(); + } + } catch (Exception e) { + log.warn("[{}] Failed to merge IMPACTS: {} → {}", syncId, + srcField.getId(), tgtField.getId(), e); + result.addError("impacts:" + srcField.getId()); + } + } + } + /** * 构建 SOURCED_FROM 关系:KnowledgeSet → Dataset(通过 source_dataset_ids)。 */ @Transactional public SyncResult mergeSourcedFromRelations(String graphId, String syncId) { + return mergeSourcedFromRelations(graphId, syncId, null); + } + + @Transactional + public SyncResult mergeSourcedFromRelations(String graphId, String syncId, Set changedEntityIds) { SyncResult result = beginResult("SOURCED_FROM", syncId); Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); - for (GraphEntity ks : entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet")) { + List knowledgeSets = entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet"); + if (changedEntityIds != null) { + knowledgeSets = knowledgeSets.stream() + .filter(ks -> changedEntityIds.contains(ks.getId())) + .toList(); + } + + for (GraphEntity ks : knowledgeSets) { try { Object sourceIds = ks.getProperties().get("source_dataset_ids"); if (sourceIds == null) { @@ -847,7 +1106,8 @@ public class GraphSyncStepService { "ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " + " e.name = $name, e.description = $description, " + " e.created_at = datetime(), e.updated_at = datetime()" + extraSet + " " + - "ON MATCH SET e.name = $name, e.description = $description, " + + "ON MATCH SET e.name = CASE WHEN $name <> '' THEN $name ELSE e.name END, " + + " e.description = CASE WHEN $description <> '' THEN $description ELSE e.description END, " + " e.updated_at = datetime()" + extraSet + " 
" + "RETURN e.id = $newId AS isNew" ) @@ -871,6 +1131,16 @@ public class GraphSyncStepService { return key.replaceAll("[^a-zA-Z0-9_]", ""); } + /** + * 清理属性值用于 JSON 字符串拼接,转义双引号和反斜杠,防止 JSON 注入。 + */ + private static String sanitizePropertyValue(String value) { + if (value == null) { + return ""; + } + return value.replace("\\", "\\\\").replace("\"", "\\\""); + } + /** * 将 Java 值转换为 Neo4j 兼容的属性值。 *

@@ -925,6 +1195,7 @@ public class GraphSyncStepService { "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " + "ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " + " r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " + + "ON MATCH SET r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " + "RETURN r.id AS relId" ) .bindAll(Map.of( diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java index 69ce1c9..3ab7e50 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java @@ -33,6 +33,7 @@ public class SyncMetadata { public static final String STATUS_PARTIAL = "PARTIAL"; public static final String TYPE_FULL = "FULL"; + public static final String TYPE_INCREMENTAL = "INCREMENTAL"; public static final String TYPE_DATASETS = "DATASETS"; public static final String TYPE_FIELDS = "FIELDS"; public static final String TYPE_USERS = "USERS"; diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphRelationRepository.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphRelationRepository.java index 8b9578b..84606cc 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphRelationRepository.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphRelationRepository.java @@ -345,16 +345,13 @@ public class GraphRelationRepository { .query( 
"MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " + "MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " + - "CREATE (s)-[r:" + REL_TYPE + " {" + - " id: $id," + - " relation_type: $relationType," + - " weight: $weight," + - " confidence: $confidence," + - " source_id: $sourceId," + - " graph_id: $graphId," + - " properties_json: $propertiesJson," + - " created_at: $createdAt" + - "}]->(t) " + + "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " + + "ON CREATE SET r.id = $id, r.weight = $weight, r.confidence = $confidence, " + + " r.source_id = $sourceId, r.properties_json = $propertiesJson, r.created_at = $createdAt " + + "ON MATCH SET r.weight = CASE WHEN $weight IS NOT NULL THEN $weight ELSE r.weight END, " + + " r.confidence = CASE WHEN $confidence IS NOT NULL THEN $confidence ELSE r.confidence END, " + + " r.source_id = CASE WHEN $sourceId <> '' THEN $sourceId ELSE r.source_id END, " + + " r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " + RETURN_COLUMNS ) .bindAll(params) diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java index 62516ab..0d19fd0 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java @@ -49,6 +49,18 @@ public class GraphSyncController { return SyncMetadataVO.from(metadata); } + /** + * 增量同步:仅拉取指定时间窗口内变更的数据并同步。 + */ + @PostMapping("/incremental") + public SyncMetadataVO syncIncremental( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId, + @RequestParam @DateTimeFormat(iso = 
DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedFrom, + @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedTo) { + SyncMetadata metadata = syncService.syncIncremental(graphId, updatedFrom, updatedTo); + return SyncMetadataVO.from(metadata); + } + /** * 同步数据集实体。 */ diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java index 7b6c490..2ae4941 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java @@ -566,6 +566,163 @@ class GraphSyncServiceTest { } } + // ----------------------------------------------------------------------- + // 增量同步 + // ----------------------------------------------------------------------- + + @Nested + class IncrementalSyncTest { + + private final LocalDateTime UPDATED_FROM = LocalDateTime.of(2025, 6, 1, 0, 0); + private final LocalDateTime UPDATED_TO = LocalDateTime.of(2025, 6, 30, 23, 59); + + @Test + void syncIncremental_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncIncremental(INVALID_GRAPH_ID, UPDATED_FROM, UPDATED_TO)) + .isInstanceOf(BusinessException.class); + } + + @Test + void syncIncremental_nullUpdatedFrom_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, null, UPDATED_TO)) + .isInstanceOf(BusinessException.class) + .hasMessageContaining("updatedFrom"); + } + + @Test + void syncIncremental_nullUpdatedTo_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, null)) + .isInstanceOf(BusinessException.class) + .hasMessageContaining("updatedTo"); + } + + @Test + void 
syncIncremental_fromAfterTo_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_TO, UPDATED_FROM)) + .isInstanceOf(BusinessException.class) + .hasMessageContaining("updatedFrom"); + } + + @Test + void syncIncremental_success_passesTimeWindowToClient() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + dto.setCreatedBy("admin"); + when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of(dto)); + when(dataManagementClient.listAllWorkflows(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of()); + when(dataManagementClient.listAllJobs(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of()); + when(dataManagementClient.listAllLabelTasks(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of()); + when(dataManagementClient.listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of()); + + stubAllEntityUpserts(); + stubAllRelationMerges(); + + SyncMetadata metadata = syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO); + + assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL); + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS); + assertThat(metadata.getUpdatedFrom()).isEqualTo(UPDATED_FROM); + assertThat(metadata.getUpdatedTo()).isEqualTo(UPDATED_TO); + assertThat(metadata.getResults()).hasSize(18); + + // 验证使用了带时间窗口的 client 方法 + verify(dataManagementClient).listAllDatasets(UPDATED_FROM, UPDATED_TO); + verify(dataManagementClient).listAllWorkflows(UPDATED_FROM, UPDATED_TO); + verify(dataManagementClient).listAllJobs(UPDATED_FROM, UPDATED_TO); + verify(dataManagementClient).listAllLabelTasks(UPDATED_FROM, UPDATED_TO); + verify(dataManagementClient).listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO); + + // 验证不执行 purge + verify(stepService, never()).purgeStaleEntities(anyString(), anyString(), anySet(), anyString()); + } + + @Test + void 
syncIncremental_failure_recordsMetadataWithTimeWindow() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO)) + .thenThrow(new RuntimeException("connection refused")); + + assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO)) + .isInstanceOf(BusinessException.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(SyncMetadata.class); + verify(syncHistoryRepository).save(captor.capture()); + SyncMetadata saved = captor.getValue(); + assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED); + assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL); + assertThat(saved.getUpdatedFrom()).isEqualTo(UPDATED_FROM); + assertThat(saved.getUpdatedTo()).isEqualTo(UPDATED_TO); + } + + private void stubAllEntityUpserts() { + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Dataset").build()); + when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Field").build()); + when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) + .thenReturn(SyncResult.builder().syncType("User").build()); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").build()); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + 
} + + private void stubAllRelationMerges() { + // 2-参数版本(全量同步)- 使用 lenient 模式避免 unnecessary stubbing 错误 + lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); + lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); + lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); + lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); + + // 3-参数版本(增量同步)- 使用 lenient 模式避免 unnecessary stubbing 错误 + lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); + lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); + 
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); + lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString(), any())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); + } + } + // ----------------------------------------------------------------------- // 同步历史查询 // ----------------------------------------------------------------------- diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java index efb843a..b540b0f 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java @@ -749,14 +749,129 @@ 
class GraphSyncStepServiceTest { } @Test - void mergeImpacts_returnsPlaceholderResult() { + void mergeImpacts_noFields_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")).thenReturn(List.of()); + SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); assertThat(result.getSyncType()).isEqualTo("IMPACTS"); assertThat(result.getCreated()).isEqualTo(0); - assertThat(result.isPlaceholder()).isTrue(); + assertThat(result.isPlaceholder()).isFalse(); verifyNoInteractions(neo4jClient); - verifyNoInteractions(entityRepository); + } + + @Test + void mergeImpacts_derivedFrom_matchingFieldNames_createsRelation() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + // Parent dataset (source_id = "ds-parent") + GraphEntity parentDs = GraphEntity.builder() + .id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>()) + .build(); + // Child dataset (source_id = "ds-child", parent_dataset_id = "ds-parent") + GraphEntity childDs = GraphEntity.builder() + .id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent"))) + .build(); + + // Fields with matching name "user_id" + GraphEntity parentField = GraphEntity.builder() + .id("field-parent-uid").name("user_id").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent"))) + .build(); + GraphEntity childField = GraphEntity.builder() + .id("field-child-uid").name("user_id").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-child"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")) + .thenReturn(List.of(parentField, childField)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of(parentDs, childDs)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")) + .thenReturn(List.of()); + + SyncResult result 
= stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("IMPACTS"); + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("RELATED_TO"); + } + + @Test + void mergeImpacts_noMatchingFieldNames_createsNoRelation() { + GraphEntity parentDs = GraphEntity.builder() + .id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>()) + .build(); + GraphEntity childDs = GraphEntity.builder() + .id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent"))) + .build(); + + GraphEntity parentField = GraphEntity.builder() + .id("field-parent").name("col_a").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent"))) + .build(); + GraphEntity childField = GraphEntity.builder() + .id("field-child").name("col_b").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-child"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")) + .thenReturn(List.of(parentField, childField)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of(parentDs, childDs)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")) + .thenReturn(List.of()); + + SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + verifyNoInteractions(neo4jClient); + } + + @Test + void mergeImpacts_jobInputOutput_createsRelationWithJobId() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity inputDs = GraphEntity.builder() + .id("input-entity").sourceId("ds-in").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>()) + .build(); + GraphEntity outputDs = GraphEntity.builder() + .id("output-entity").sourceId("ds-out").type("Dataset").graphId(GRAPH_ID) + .properties(new 
HashMap<>()) + .build(); + GraphEntity job = GraphEntity.builder() + .id("job-entity").sourceId("job-001").type("Job").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of( + "input_dataset_id", "ds-in", + "output_dataset_id", "ds-out"))) + .build(); + + GraphEntity inField = GraphEntity.builder() + .id("field-in").name("tag_x").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-in"))) + .build(); + GraphEntity outField = GraphEntity.builder() + .id("field-out").name("tag_x").type("Field").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("dataset_source_id", "ds-out"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")) + .thenReturn(List.of(inField, outField)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of(inputDs, outputDs)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")) + .thenReturn(List.of(job)); + + SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("IMPACTS"); + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("RELATED_TO"); } @Test