From ebb4548ca5049790d58acaa4e1cba71d2e5b9f8e Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Wed, 18 Feb 2026 11:30:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(kg):=20=E8=A1=A5=E5=85=A8=E7=9F=A5?= =?UTF-8?q?=E8=AF=86=E5=9B=BE=E8=B0=B1=E5=AE=9E=E4=BD=93=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=92=8C=E5=85=B3=E7=B3=BB=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增功能: - 补全 4 类实体同步:Workflow、Job、LabelTask、KnowledgeSet - 补全 7 类关系构建:USES_DATASET、PRODUCES、ASSIGNED_TO、TRIGGERS、DEPENDS_ON、IMPACTS、SOURCED_FROM - 新增 39 个测试用例,总计 111 个测试 问题修复(三轮迭代): 第一轮(6 个问题): - toStringList null/blank 过滤 - mergeUsesDatasetRelations 统一逻辑 - fetchAllPaged 去重抽取 - IMPACTS 占位标记 - 测试断言增强 - syncAll 固定索引改 Map 第二轮(2 个问题): - 活跃 ID 空值/空白归一化(两层防御) - 关系构建 N+1 查询消除(预加载 Map) 第三轮(1 个问题): - 空元素 NPE 防御(GraphSyncService 12 处 + GraphSyncStepService 6 处) 代码变更:+1936 行,-101 行 测试结果:111 tests, 0 failures 已知 P3 问题(非阻塞): - 安全注释与实现不一致(待权限过滤任务一起处理) - 测试覆盖缺口(可后续补充) --- .../application/GraphSyncService.java | 410 +++++++++++-- .../application/GraphSyncStepService.java | 550 +++++++++++++++++- .../domain/model/SyncResult.java | 4 + .../client/DataManagementClient.java | 180 +++++- .../neo4j/KnowledgeGraphProperties.java | 3 + .../interfaces/dto/SyncResultVO.java | 3 + .../interfaces/rest/GraphSyncController.java | 107 ++++ .../resources/application-knowledgegraph.yml | 2 + .../application/GraphSyncServiceTest.java | 251 +++++++- .../application/GraphSyncStepServiceTest.java | 521 ++++++++++++++++- 10 files changed, 1933 insertions(+), 98 deletions(-) diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java index c7c0ee7..0323fc8 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java @@ -5,6 +5,10 @@ import com.datamate.common.infrastructure.exception.SystemErrorCode; import com.datamate.knowledgegraph.domain.model.SyncResult; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO; import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode; import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; import lombok.RequiredArgsConstructor; @@ -66,46 +70,110 @@ public class GraphSyncService { // 一次拉取,全程共享 List datasets = fetchDatasetsWithRetry(syncId); + List workflows = fetchWithRetry(syncId, "workflows", + () -> dataManagementClient.listAllWorkflows()); + List jobs = fetchWithRetry(syncId, "jobs", + () -> dataManagementClient.listAllJobs()); + List labelTasks = fetchWithRetry(syncId, "label-tasks", + () -> dataManagementClient.listAllLabelTasks()); + List knowledgeSets = fetchWithRetry(syncId, "knowledge-sets", + () -> dataManagementClient.listAllKnowledgeSets()); - List results = new ArrayList<>(); + // 使用 LinkedHashMap 按 syncType 存取,保持插入顺序且避免固定下标 + Map resultMap = new LinkedHashMap<>(); // 实体同步 - results.add(stepService.upsertDatasetEntities(graphId, datasets, syncId)); - results.add(stepService.upsertFieldEntities(graphId, datasets, syncId)); + resultMap.put("Dataset", stepService.upsertDatasetEntities(graphId, datasets, syncId)); + resultMap.put("Field", stepService.upsertFieldEntities(graphId, datasets, syncId)); - Set usernames = extractUsernames(datasets); - results.add(stepService.upsertUserEntities(graphId, usernames, syncId)); - results.add(stepService.upsertOrgEntities(graphId, syncId)); + Set usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets); + resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId)); + resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId)); + resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId)); + resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId)); + resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId)); + resultMap.put("KnowledgeSet", stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId)); - // 全量对账:删除 MySQL 已移除的记录,并回填 purged 到对应 SyncResult + // 全量对账:删除 MySQL 已移除的记录(按 syncType 查找,无需固定下标) Set activeDatasetIds = datasets.stream() + .filter(Objects::nonNull) .map(DatasetDTO::getId) + .filter(Objects::nonNull) + .filter(id -> !id.isBlank()) .collect(Collectors.toSet()); - results.get(0).setPurged( + resultMap.get("Dataset").setPurged( stepService.purgeStaleEntities(graphId, "Dataset", activeDatasetIds, syncId)); Set activeFieldIds = new HashSet<>(); for (DatasetDTO dto : datasets) { - if (dto.getTags() != null) { - for (DataManagementClient.TagDTO tag : dto.getTags()) { - activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); + if (dto == null || dto.getTags() == null) { + continue; + } + for (DataManagementClient.TagDTO tag : dto.getTags()) { + if (tag == null || tag.getName() == null) { + continue; } + activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); } } - results.get(1).setPurged( + resultMap.get("Field").setPurged( stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId)); Set activeUserIds = usernames.stream() .map(u -> "user:" + u) .collect(Collectors.toSet()); - results.get(2).setPurged( + resultMap.get("User").setPurged( stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); - // 关系构建(MERGE 幂等) - results.add(stepService.mergeHasFieldRelations(graphId, syncId)); - results.add(stepService.mergeDerivedFromRelations(graphId, syncId)); - results.add(stepService.mergeBelongsToRelations(graphId, syncId)); + Set activeWorkflowIds = workflows.stream() + .filter(Objects::nonNull) + .map(WorkflowDTO::getId) + .filter(Objects::nonNull) + .filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + resultMap.get("Workflow").setPurged( + stepService.purgeStaleEntities(graphId, "Workflow", activeWorkflowIds, syncId)); + Set activeJobIds = jobs.stream() + .filter(Objects::nonNull) + .map(JobDTO::getId) + .filter(Objects::nonNull) + .filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + resultMap.get("Job").setPurged( + stepService.purgeStaleEntities(graphId, "Job", activeJobIds, syncId)); + + Set activeLabelTaskIds = labelTasks.stream() + .filter(Objects::nonNull) + .map(LabelTaskDTO::getId) + .filter(Objects::nonNull) + .filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + resultMap.get("LabelTask").setPurged( + stepService.purgeStaleEntities(graphId, "LabelTask", activeLabelTaskIds, syncId)); + + Set activeKnowledgeSetIds = knowledgeSets.stream() + .filter(Objects::nonNull) + .map(KnowledgeSetDTO::getId) + .filter(Objects::nonNull) + .filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + resultMap.get("KnowledgeSet").setPurged( + stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeKnowledgeSetIds, syncId)); + + // 关系构建(MERGE 幂等) + resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId)); + resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId)); + resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId)); + resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId)); + resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId)); + resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId)); + resultMap.put("TRIGGERS", stepService.mergeTriggersRelations(graphId, syncId)); + resultMap.put("DEPENDS_ON", stepService.mergeDependsOnRelations(graphId, syncId)); + resultMap.put("IMPACTS", stepService.mergeImpactsRelations(graphId, syncId)); + resultMap.put("SOURCED_FROM", stepService.mergeSourcedFromRelations(graphId, syncId)); + + List results = new ArrayList<>(resultMap.values()); log.info("[{}] Full sync completed for graphId={}. Summary: {}", syncId, graphId, results.stream() .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")") @@ -132,7 +200,10 @@ public class GraphSyncService { try { List datasets = fetchDatasetsWithRetry(syncId); SyncResult result = stepService.upsertDatasetEntities(graphId, datasets, syncId); - Set activeIds = datasets.stream().map(DatasetDTO::getId).collect(Collectors.toSet()); + Set activeIds = datasets.stream() + .filter(Objects::nonNull).map(DatasetDTO::getId) + .filter(Objects::nonNull).filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId); result.setPurged(purged); return result; @@ -155,10 +226,14 @@ public class GraphSyncService { SyncResult result = stepService.upsertFieldEntities(graphId, datasets, syncId); Set activeFieldIds = new HashSet<>(); for (DatasetDTO dto : datasets) { - if (dto.getTags() != null) { - for (DataManagementClient.TagDTO tag : dto.getTags()) { - activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); + if (dto == null || dto.getTags() == null) { + continue; + } + for (DataManagementClient.TagDTO tag : dto.getTags()) { + if (tag == null || tag.getName() == null) { + continue; } + activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); } } result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId)); @@ -179,7 +254,15 @@ public class GraphSyncService { ReentrantLock lock = acquireLock(graphId, syncId); try { List datasets = fetchDatasetsWithRetry(syncId); - Set usernames = extractUsernames(datasets); + List workflows = fetchWithRetry(syncId, "workflows", + () -> dataManagementClient.listAllWorkflows()); + List jobs = fetchWithRetry(syncId, "jobs", + () -> dataManagementClient.listAllJobs()); + List labelTasks = fetchWithRetry(syncId, "label-tasks", + () -> dataManagementClient.listAllLabelTasks()); + List knowledgeSets = fetchWithRetry(syncId, "knowledge-sets", + () -> dataManagementClient.listAllKnowledgeSets()); + Set usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets); SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId); Set activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); @@ -261,6 +344,229 @@ public class GraphSyncService { } } + // ----------------------------------------------------------------------- + // 新增实体同步 + // ----------------------------------------------------------------------- + + public SyncResult syncWorkflows(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List workflows = fetchWithRetry(syncId, "workflows", + () -> dataManagementClient.listAllWorkflows()); + SyncResult result = stepService.upsertWorkflowEntities(graphId, workflows, syncId); + Set activeIds = workflows.stream() + .filter(Objects::nonNull).map(WorkflowDTO::getId) + .filter(Objects::nonNull).filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + result.setPurged(stepService.purgeStaleEntities(graphId, "Workflow", activeIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Workflow sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "工作流同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult syncJobs(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List jobs = fetchWithRetry(syncId, "jobs", + () -> dataManagementClient.listAllJobs()); + SyncResult result = stepService.upsertJobEntities(graphId, jobs, syncId); + Set activeIds = jobs.stream() + .filter(Objects::nonNull).map(JobDTO::getId) + .filter(Objects::nonNull).filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + result.setPurged(stepService.purgeStaleEntities(graphId, "Job", activeIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Job sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "作业同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult syncLabelTasks(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List tasks = fetchWithRetry(syncId, "label-tasks", + () -> dataManagementClient.listAllLabelTasks()); + SyncResult result = stepService.upsertLabelTaskEntities(graphId, tasks, syncId); + Set activeIds = tasks.stream() + .filter(Objects::nonNull).map(LabelTaskDTO::getId) + .filter(Objects::nonNull).filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + result.setPurged(stepService.purgeStaleEntities(graphId, "LabelTask", activeIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] LabelTask sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "标注任务同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult syncKnowledgeSets(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List knowledgeSets = fetchWithRetry(syncId, "knowledge-sets", + () -> dataManagementClient.listAllKnowledgeSets()); + SyncResult result = stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId); + Set activeIds = knowledgeSets.stream() + .filter(Objects::nonNull).map(KnowledgeSetDTO::getId) + .filter(Objects::nonNull).filter(id -> !id.isBlank()) + .collect(Collectors.toSet()); + result.setPurged(stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] KnowledgeSet sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "知识集同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + // ----------------------------------------------------------------------- + // 新增关系构建 + // ----------------------------------------------------------------------- + + public SyncResult buildUsesDatasetRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeUsesDatasetRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] USES_DATASET relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "USES_DATASET 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildProducesRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeProducesRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] PRODUCES relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "PRODUCES 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildAssignedToRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeAssignedToRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] ASSIGNED_TO relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "ASSIGNED_TO 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildTriggersRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeTriggersRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] TRIGGERS relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "TRIGGERS 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildDependsOnRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeDependsOnRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] DEPENDS_ON relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "DEPENDS_ON 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildImpactsRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeImpactsRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] IMPACTS relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "IMPACTS 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildSourcedFromRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeSourcedFromRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] SOURCED_FROM relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "SOURCED_FROM 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + // ----------------------------------------------------------------------- // 内部方法 // ----------------------------------------------------------------------- @@ -289,16 +595,24 @@ public class GraphSyncService { } private List fetchDatasetsWithRetry(String syncId) { + return fetchWithRetry(syncId, "datasets", () -> dataManagementClient.listAllDatasets()); + } + + /** + * 通用带重试的数据拉取方法。 + */ + private List fetchWithRetry(String syncId, String resourceName, + java.util.function.Supplier> fetcher) { int maxRetries = properties.getSync().getMaxRetries(); long retryInterval = properties.getSync().getRetryInterval(); Exception lastException = null; for (int attempt = 1; attempt <= maxRetries; attempt++) { try { - return dataManagementClient.listAllDatasets(); + return fetcher.get(); } catch (Exception e) { lastException = e; - log.warn("[{}] Dataset fetch attempt {}/{} failed: {}", syncId, attempt, maxRetries, e.getMessage()); + log.warn("[{}] {} fetch attempt {}/{} failed: {}", syncId, resourceName, attempt, maxRetries, e.getMessage()); if (attempt < maxRetries) { try { Thread.sleep(retryInterval * attempt); @@ -309,24 +623,54 @@ public class GraphSyncService { } } } - log.error("[{}] All {} fetch attempts failed", syncId, maxRetries, lastException); + log.error("[{}] All {} fetch attempts for {} failed", syncId, maxRetries, resourceName, lastException); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, - "拉取数据集失败(已重试 " + maxRetries + " 次),syncId=" + syncId); + "拉取" + resourceName + "失败(已重试 " + maxRetries + " 次),syncId=" + syncId); } - private static Set extractUsernames(List datasets) { + /** + * 从所有实体类型中提取用户名。 + */ + private static Set extractUsernames(List datasets, + List workflows, + List jobs, + List labelTasks, + List knowledgeSets) { Set usernames = new LinkedHashSet<>(); for (DatasetDTO dto : datasets) { - if (dto.getCreatedBy() != null && !dto.getCreatedBy().isBlank()) { - usernames.add(dto.getCreatedBy()); - } - if (dto.getUpdatedBy() != null && !dto.getUpdatedBy().isBlank()) { - usernames.add(dto.getUpdatedBy()); - } + if (dto == null) { continue; } + addIfPresent(usernames, dto.getCreatedBy()); + addIfPresent(usernames, dto.getUpdatedBy()); + } + for (WorkflowDTO dto : workflows) { + if (dto == null) { continue; } + addIfPresent(usernames, dto.getCreatedBy()); + addIfPresent(usernames, dto.getUpdatedBy()); + } + for (JobDTO dto : jobs) { + if (dto == null) { continue; } + addIfPresent(usernames, dto.getCreatedBy()); + addIfPresent(usernames, dto.getUpdatedBy()); + } + for (LabelTaskDTO dto : labelTasks) { + if (dto == null) { continue; } + addIfPresent(usernames, dto.getCreatedBy()); + addIfPresent(usernames, dto.getUpdatedBy()); + } + for (KnowledgeSetDTO dto : knowledgeSets) { + if (dto == null) { continue; } + addIfPresent(usernames, dto.getCreatedBy()); + addIfPresent(usernames, dto.getUpdatedBy()); } return usernames; } + private static void addIfPresent(Set set, String value) { + if (value != null && !value.isBlank()) { + set.add(value); + } + } + private void validateGraphId(String graphId) { if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效"); diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java index 178a10e..f99c322 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java @@ -5,6 +5,10 @@ import com.datamate.knowledgegraph.domain.model.SyncResult; import com.datamate.knowledgegraph.domain.repository.GraphEntityRepository; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.TagDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO; import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -14,6 +18,7 @@ import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.*; +import java.util.stream.Collectors; /** * 同步步骤执行器(事务边界)。 @@ -48,6 +53,11 @@ public class GraphSyncStepService { for (int i = 0; i < datasets.size(); i++) { DatasetDTO dto = datasets.get(i); + if (dto == null) { + result.incrementSkipped(); + continue; + } + String sourceId = dto.getId(); try { Map props = new HashMap<>(); props.put("dataset_type", dto.getDatasetType()); @@ -70,8 +80,8 @@ public class GraphSyncStepService { log.debug("[{}] Processed {}/{} datasets", syncId, i + 1, datasets.size()); } } catch (Exception e) { - log.warn("[{}] Failed to upsert dataset: sourceId={}", syncId, dto.getId(), e); - result.addError("dataset:" + dto.getId()); + log.warn("[{}] Failed to upsert dataset: sourceId={}", syncId, sourceId, e); + result.addError("dataset:" + sourceId); } } return endResult(result); @@ -82,25 +92,30 @@ public class GraphSyncStepService { SyncResult result = beginResult("Field", syncId); for (DatasetDTO dto : datasets) { - if (dto.getTags() == null || dto.getTags().isEmpty()) { + if (dto == null || dto.getTags() == null || dto.getTags().isEmpty()) { continue; } + String dtoId = dto.getId(); for (TagDTO tag : dto.getTags()) { + if (tag == null) { + continue; + } + String tagName = tag.getName(); try { - String fieldSourceId = dto.getId() + ":tag:" + tag.getName(); + String fieldSourceId = dtoId + ":tag:" + tagName; Map props = new HashMap<>(); props.put("data_type", "TAG"); - props.put("dataset_source_id", dto.getId()); + props.put("dataset_source_id", dtoId); if (tag.getColor() != null) { props.put("color", tag.getColor()); } - upsertEntity(graphId, fieldSourceId, "Field", tag.getName(), + upsertEntity(graphId, fieldSourceId, "Field", tagName, "数据集[" + dto.getName() + "]的标签字段", props, result); } catch (Exception e) { log.warn("[{}] Failed to upsert field: dataset={}, tag={}", - syncId, dto.getId(), tag.getName(), e); - result.addError("field:" + dto.getId() + ":" + tag.getName()); + syncId, dtoId, tagName, e); + result.addError("field:" + dtoId + ":" + tagName); } } } @@ -141,6 +156,213 @@ public class GraphSyncStepService { return endResult(result); } + @Transactional + public SyncResult upsertWorkflowEntities(String graphId, List workflows, String syncId) { + SyncResult result = beginResult("Workflow", syncId); + int batchSize = properties.getImportBatchSize(); + + for (int i = 0; i < workflows.size(); i++) { + WorkflowDTO dto = workflows.get(i); + if (dto == null) { + result.incrementSkipped(); + continue; + } + String sourceId = dto.getId(); + try { + Map props = new HashMap<>(); + props.put("workflow_type", dto.getWorkflowType()); + props.put("status", dto.getStatus()); + if (dto.getVersion() != null) { + props.put("version", dto.getVersion()); + } + if (dto.getOperatorCount() != null) { + props.put("operator_count", dto.getOperatorCount()); + } + if (dto.getSchedule() != null) { + props.put("schedule", dto.getSchedule()); + } + if (dto.getInputDatasetIds() != null) { + props.put("input_dataset_ids", dto.getInputDatasetIds()); + } + + upsertEntity(graphId, dto.getId(), "Workflow", + dto.getName(), dto.getDescription(), props, result); + + if ((i + 1) % batchSize == 0) { + log.debug("[{}] Processed {}/{} workflows", syncId, i + 1, workflows.size()); + } + } catch (Exception e) { + log.warn("[{}] Failed to upsert workflow: sourceId={}", syncId, sourceId, e); + result.addError("workflow:" + sourceId); + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertJobEntities(String graphId, List jobs, String syncId) { + SyncResult result = beginResult("Job", syncId); + int batchSize = properties.getImportBatchSize(); + + for (int i = 0; i < jobs.size(); i++) { + JobDTO dto = jobs.get(i); + if (dto == null) { + result.incrementSkipped(); + continue; + } + String sourceId = dto.getId(); + try { + Map props = new HashMap<>(); + props.put("job_type", dto.getJobType()); + props.put("status", dto.getStatus()); + if (dto.getStartedAt() != null) { + props.put("started_at", dto.getStartedAt()); + } + if (dto.getCompletedAt() != null) { + props.put("completed_at", dto.getCompletedAt()); + } + if (dto.getDurationSeconds() != null) { + props.put("duration_seconds", dto.getDurationSeconds()); + } + if (dto.getInputCount() != null) { + props.put("input_count", dto.getInputCount()); + } + if (dto.getOutputCount() != null) { + props.put("output_count", dto.getOutputCount()); + } + if (dto.getErrorMessage() != null) { + props.put("error_message", dto.getErrorMessage()); + } + if (dto.getInputDatasetId() != null) { + props.put("input_dataset_id", dto.getInputDatasetId()); + } + if (dto.getOutputDatasetId() != null) { + props.put("output_dataset_id", dto.getOutputDatasetId()); + } + if (dto.getWorkflowId() != null) { + props.put("workflow_id", dto.getWorkflowId()); + } + if (dto.getDependsOnJobId() != null) { + props.put("depends_on_job_id", dto.getDependsOnJobId()); + } + if (dto.getCreatedBy() != null) { + props.put("created_by", dto.getCreatedBy()); + } + + upsertEntity(graphId, dto.getId(), "Job", + dto.getName(), dto.getDescription(), props, result); + + if ((i + 1) % batchSize == 0) { + log.debug("[{}] Processed {}/{} jobs", syncId, i + 1, jobs.size()); + } + } catch (Exception e) { + log.warn("[{}] Failed to upsert job: sourceId={}", syncId, sourceId, e); + result.addError("job:" + sourceId); + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertLabelTaskEntities(String graphId, List tasks, String syncId) { + SyncResult result = beginResult("LabelTask", syncId); + int batchSize = properties.getImportBatchSize(); + + for (int i = 0; i < tasks.size(); i++) { + LabelTaskDTO dto = tasks.get(i); + if (dto == null) { + result.incrementSkipped(); + continue; + } + String sourceId = dto.getId(); + try { + Map props = new HashMap<>(); + props.put("task_mode", dto.getTaskMode()); + props.put("status", dto.getStatus()); + if (dto.getDataType() != null) { + props.put("data_type", dto.getDataType()); + } + if (dto.getLabelingType() != null) { + props.put("labeling_type", dto.getLabelingType()); + } + if (dto.getProgress() != null) { + props.put("progress", dto.getProgress()); + } + if (dto.getTemplateName() != null) { + props.put("template_name", dto.getTemplateName()); + } + if (dto.getDatasetId() != null) { + props.put("dataset_id", dto.getDatasetId()); + } + if (dto.getCreatedBy() != null) { + props.put("created_by", dto.getCreatedBy()); + } + + upsertEntity(graphId, dto.getId(), "LabelTask", + dto.getName(), dto.getDescription(), props, result); + + if ((i + 1) % batchSize == 0) { + log.debug("[{}] Processed {}/{} label tasks", syncId, i + 1, tasks.size()); + } + } catch (Exception e) { + log.warn("[{}] Failed to upsert label task: sourceId={}", syncId, sourceId, e); + result.addError("label_task:" + sourceId); + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertKnowledgeSetEntities(String graphId, List knowledgeSets, String syncId) { + SyncResult result = beginResult("KnowledgeSet", syncId); + int batchSize = properties.getImportBatchSize(); + + for (int i = 0; i < knowledgeSets.size(); i++) { + KnowledgeSetDTO dto = knowledgeSets.get(i); + if (dto == null) { + result.incrementSkipped(); + continue; + } + String sourceId = dto.getId(); + try { + Map props = new HashMap<>(); + props.put("status", dto.getStatus()); + if (dto.getDomain() != null) { + props.put("domain", dto.getDomain()); + } + if (dto.getBusinessLine() != null) { + props.put("business_line", dto.getBusinessLine()); + } + if (dto.getSensitivity() != null) { + props.put("sensitivity", dto.getSensitivity()); + } + if (dto.getItemCount() != null) { + props.put("item_count", dto.getItemCount()); + } + if (dto.getValidFrom() != null) { + props.put("valid_from", dto.getValidFrom()); + } + if (dto.getValidTo() != null) { + props.put("valid_to", dto.getValidTo()); + } + if (dto.getSourceDatasetIds() != null) { + props.put("source_dataset_ids", dto.getSourceDatasetIds()); + } + + upsertEntity(graphId, dto.getId(), "KnowledgeSet", + dto.getName(), dto.getDescription(), props, result); + + if ((i + 1) % batchSize == 0) { + log.debug("[{}] Processed {}/{} knowledge sets", syncId, i + 1, knowledgeSets.size()); + } + } catch (Exception e) { + log.warn("[{}] Failed to upsert knowledge set: sourceId={}", syncId, sourceId, e); + result.addError("knowledge_set:" + sourceId); + } + } + return endResult(result); + } + // ----------------------------------------------------------------------- // 全量对账删除 // ----------------------------------------------------------------------- @@ -154,7 +376,14 @@ public class GraphSyncStepService { */ @Transactional public int purgeStaleEntities(String graphId, String type, Set activeSourceIds, String syncId) { - if (activeSourceIds.isEmpty()) { + // 防御式过滤:移除 null / 空白 ID,防止 Cypher 三值逻辑导致 IN 判断失效 + Set sanitized = activeSourceIds.stream() + .filter(Objects::nonNull) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toSet()); + + if (sanitized.isEmpty()) { if (!properties.getSync().isAllowPurgeOnEmptySnapshot()) { log.warn("[{}] Empty snapshot protection: active source IDs empty for type={}, " + "purge BLOCKED (set allowPurgeOnEmptySnapshot=true to override)", syncId, type); @@ -167,7 +396,7 @@ public class GraphSyncStepService { String cypher; Map params; - if (activeSourceIds.isEmpty()) { + if (sanitized.isEmpty()) { cypher = "MATCH (e:Entity {graph_id: $graphId, type: $type, source_type: 'SYNC'}) " + "DETACH DELETE e " + "RETURN count(*) AS deleted"; @@ -180,7 +409,7 @@ public class GraphSyncStepService { params = Map.of( "graphId", graphId, "type", type, - "activeSourceIds", new ArrayList<>(activeSourceIds) + "activeSourceIds", new ArrayList<>(sanitized) ); } @@ -205,6 +434,7 @@ public class GraphSyncStepService { public SyncResult mergeHasFieldRelations(String graphId, String syncId) { SyncResult result = beginResult("HAS_FIELD", syncId); + Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); List fields = entityRepository.findByGraphIdAndType(graphId, "Field"); for (GraphEntity field : fields) { @@ -215,14 +445,13 @@ public class GraphSyncStepService { continue; } - Optional datasetOpt = entityRepository.findByGraphIdAndSourceIdAndType( - graphId, datasetSourceId.toString(), "Dataset"); - if (datasetOpt.isEmpty()) { + String datasetEntityId = datasetMap.get(datasetSourceId.toString()); + if (datasetEntityId == null) { result.incrementSkipped(); continue; } - boolean created = mergeRelation(graphId, datasetOpt.get().getId(), field.getId(), + boolean created = mergeRelation(graphId, datasetEntityId, field.getId(), "HAS_FIELD", "{}", syncId); if (created) { result.incrementCreated(); @@ -241,6 +470,7 @@ public class GraphSyncStepService { public SyncResult mergeDerivedFromRelations(String graphId, String syncId) { SyncResult result = beginResult("DERIVED_FROM", syncId); + Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); List datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); for (GraphEntity dataset : datasets) { @@ -250,14 +480,13 @@ public class GraphSyncStepService { continue; } - Optional parentOpt = entityRepository.findByGraphIdAndSourceIdAndType( - graphId, parentId.toString(), "Dataset"); - if (parentOpt.isEmpty()) { + String parentEntityId = datasetMap.get(parentId.toString()); + if (parentEntityId == null) { result.incrementSkipped(); continue; } - boolean created = mergeRelation(graphId, dataset.getId(), parentOpt.get().getId(), + boolean created = mergeRelation(graphId, dataset.getId(), parentEntityId, "DERIVED_FROM", "{\"derivation_type\":\"VERSION\"}", syncId); if (created) { result.incrementCreated(); @@ -311,6 +540,255 @@ public class GraphSyncStepService { return endResult(result); } + /** + * 构建 USES_DATASET 关系:Job/LabelTask/Workflow → Dataset。 + *

+ * 通过实体扩展属性中的外键字段查找关联 Dataset: + * - Job.input_dataset_id → Dataset + * - LabelTask.dataset_id → Dataset + * - Workflow.input_dataset_ids → Dataset(多值) + */ + @Transactional + public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) { + SyncResult result = beginResult("USES_DATASET", syncId); + + Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); + + // Job → Dataset (via input_dataset_id) + for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId); + } + + // LabelTask → Dataset (via dataset_id) + for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { + mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId); + } + + // Workflow → Dataset (via input_dataset_ids, multi-value) + for (GraphEntity workflow : entityRepository.findByGraphIdAndType(graphId, "Workflow")) { + mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId); + } + + return endResult(result); + } + + /** + * 统一处理实体到 Dataset 的 USES_DATASET 关系构建。 + * 通过 {@link #toStringList} 兼容单值(String)和多值(List)属性。 + * 使用预加载的 datasetMap 避免 N+1 查询。 + */ + private void mergeEntityToDatasets(String graphId, GraphEntity entity, String propertyKey, + Map datasetMap, + SyncResult result, String syncId) { + try { + Object value = entity.getProperties().get(propertyKey); + if (value == null) { + return; + } + List datasetIds = toStringList(value); + for (String dsId : datasetIds) { + String datasetEntityId = datasetMap.get(dsId); + if (datasetEntityId == null) { + result.incrementSkipped(); + continue; + } + boolean created = mergeRelation(graphId, entity.getId(), datasetEntityId, + "USES_DATASET", "{\"usage_role\":\"INPUT\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } + } catch (Exception e) { + log.warn("[{}] Failed to merge USES_DATASET for entity: id={}", syncId, entity.getId(), e); + result.addError("uses_dataset:" + entity.getId()); + } + } + + /** + * 构建 PRODUCES 关系:Job → Dataset(通过 output_dataset_id)。 + */ + @Transactional + public SyncResult mergeProducesRelations(String graphId, String syncId) { + SyncResult result = beginResult("PRODUCES", syncId); + + Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); + + for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + try { + Object outputDatasetId = job.getProperties().get("output_dataset_id"); + if (outputDatasetId == null || outputDatasetId.toString().isBlank()) { + continue; + } + String datasetEntityId = datasetMap.get(outputDatasetId.toString()); + if (datasetEntityId == null) { + result.incrementSkipped(); + continue; + } + boolean created = mergeRelation(graphId, job.getId(), datasetEntityId, + "PRODUCES", "{\"output_type\":\"PRIMARY\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge PRODUCES for job: id={}", syncId, job.getId(), e); + result.addError("produces:" + job.getId()); + } + } + return endResult(result); + } + + /** + * 构建 ASSIGNED_TO 关系:LabelTask/Job → User(通过 createdBy 字段)。 + */ + @Transactional + public SyncResult mergeAssignedToRelations(String graphId, String syncId) { + SyncResult result = beginResult("ASSIGNED_TO", syncId); + + Map userMap = buildSourceIdToEntityIdMap(graphId, "User"); + + // LabelTask → User + for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { + mergeCreatorAssignment(graphId, task, "label_task", userMap, result, syncId); + } + + // Job → User + for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId); + } + + return endResult(result); + } + + private void mergeCreatorAssignment(String graphId, GraphEntity entity, String entityLabel, + Map userMap, + SyncResult result, String syncId) { + try { + Object createdBy = entity.getProperties().get("created_by"); + if (createdBy == null || createdBy.toString().isBlank()) { + return; + } + String userSourceId = "user:" + createdBy; + String userEntityId = userMap.get(userSourceId); + if (userEntityId == null) { + result.incrementSkipped(); + return; + } + boolean created = mergeRelation(graphId, entity.getId(), userEntityId, + "ASSIGNED_TO", "{\"role\":\"OWNER\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge ASSIGNED_TO for {}: id={}", syncId, entityLabel, entity.getId(), e); + result.addError("assigned_to:" + entityLabel + ":" + entity.getId()); + } + } + + /** + * 构建 TRIGGERS 关系:Workflow → Job(通过 Job.workflow_id)。 + */ + @Transactional + public SyncResult mergeTriggersRelations(String graphId, String syncId) { + SyncResult result = beginResult("TRIGGERS", syncId); + + Map workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow"); + + for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + try { + Object workflowId = job.getProperties().get("workflow_id"); + if (workflowId == null || workflowId.toString().isBlank()) { + continue; + } + String workflowEntityId = workflowMap.get(workflowId.toString()); + if (workflowEntityId == null) { + result.incrementSkipped(); + continue; + } + // 方向:Workflow → Job + boolean created = mergeRelation(graphId, workflowEntityId, job.getId(), + "TRIGGERS", "{\"trigger_type\":\"MANUAL\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge TRIGGERS for job: id={}", syncId, job.getId(), e); + result.addError("triggers:" + job.getId()); + } + } + return endResult(result); + } + + /** + * 构建 DEPENDS_ON 关系:Job → Job(通过 Job.depends_on_job_id)。 + */ + @Transactional + public SyncResult mergeDependsOnRelations(String graphId, String syncId) { + SyncResult result = beginResult("DEPENDS_ON", syncId); + + Map jobMap = buildSourceIdToEntityIdMap(graphId, "Job"); + + for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { + try { + Object depJobId = job.getProperties().get("depends_on_job_id"); + if (depJobId == null || depJobId.toString().isBlank()) { + continue; + } + String depJobEntityId = jobMap.get(depJobId.toString()); + if (depJobEntityId == null) { + result.incrementSkipped(); + continue; + } + boolean created = mergeRelation(graphId, job.getId(), depJobEntityId, + "DEPENDS_ON", "{\"dependency_type\":\"STRICT\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge DEPENDS_ON for job: id={}", syncId, job.getId(), e); + result.addError("depends_on:" + job.getId()); + } + } + return endResult(result); + } + + /** + * 构建 IMPACTS 关系:Field → Field。 + *

+ * TODO: 字段影响关系来源于 LLM 抽取或规则引擎,而非简单外键关联。 + * 当前 MVP 阶段为占位实现,后续由抽取模块填充。 + */ + @Transactional + public SyncResult mergeImpactsRelations(String graphId, String syncId) { + SyncResult result = beginResult("IMPACTS", syncId); + result.setPlaceholder(true); + log.debug("[{}] IMPACTS relations require extraction data, skipping in sync phase", syncId); + return endResult(result); + } + + /** + * 构建 SOURCED_FROM 关系:KnowledgeSet → Dataset(通过 source_dataset_ids)。 + */ + @Transactional + public SyncResult mergeSourcedFromRelations(String graphId, String syncId) { + SyncResult result = beginResult("SOURCED_FROM", syncId); + + Map datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); + + for (GraphEntity ks : entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet")) { + try { + Object sourceIds = ks.getProperties().get("source_dataset_ids"); + if (sourceIds == null) { + continue; + } + List datasetIds = toStringList(sourceIds); + for (String dsId : datasetIds) { + String datasetEntityId = datasetMap.get(dsId); + if (datasetEntityId == null) { + result.incrementSkipped(); + continue; + } + boolean created = mergeRelation(graphId, ks.getId(), datasetEntityId, + "SOURCED_FROM", "{}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } + } catch (Exception e) { + log.warn("[{}] Failed to merge SOURCED_FROM for knowledge set: id={}", syncId, ks.getId(), e); + result.addError("sourced_from:" + ks.getId()); + } + } + return endResult(result); + } + // ----------------------------------------------------------------------- // 内部方法 // ----------------------------------------------------------------------- @@ -395,8 +873,11 @@ public class GraphSyncStepService { if (list.isEmpty()) { return List.of(); } - // Neo4j 要求列表元素类型一致,统一转为 String 列表 - return list.stream().map(Object::toString).toList(); + // Neo4j 要求列表元素类型一致,统一转为 String 列表;过滤 null 防止脏数据 + return list.stream() + .filter(Objects::nonNull) + .map(Object::toString) + .toList(); } return value; } @@ -406,6 +887,24 @@ public class GraphSyncStepService { * * @return true 如果是新创建的关系,false 如果已存在 */ + /** + * 将属性值(可能是 List 或单个 String)安全转换为 String 列表。 + */ + @SuppressWarnings("unchecked") + private static List toStringList(Object value) { + if (value instanceof List list) { + return list.stream() + .filter(Objects::nonNull) + .map(Object::toString) + .filter(s -> !s.isBlank()) + .toList(); + } + if (value instanceof String str && !str.isBlank()) { + return List.of(str); + } + return List.of(); + } + private boolean mergeRelation(String graphId, String sourceEntityId, String targetEntityId, String relationType, String propertiesJson, String syncId) { String newId = UUID.randomUUID().toString(); @@ -448,4 +947,13 @@ public class GraphSyncStepService { result.setCompletedAt(LocalDateTime.now()); return result; } + + /** + * 预加载指定类型的 sourceId → entityId 映射,消除关系构建中的 N+1 查询。 + */ + private Map buildSourceIdToEntityIdMap(String graphId, String type) { + return entityRepository.findByGraphIdAndType(graphId, type).stream() + .filter(e -> e.getSourceId() != null) + .collect(Collectors.toMap(GraphEntity::getSourceId, GraphEntity::getId, (a, b) -> a)); + } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java index 9d8dcd2..8ac5795 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java @@ -40,6 +40,10 @@ public class SyncResult { @Builder.Default private int purged = 0; + /** 标记为占位符的步骤(功能尚未实现,结果无实际数据) */ + @Builder.Default + private boolean placeholder = false; + @Builder.Default private List errors = new ArrayList<>(); diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java index c80a122..9c5008d 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java @@ -28,6 +28,7 @@ public class DataManagementClient { private final RestTemplate restTemplate; private final String baseUrl; + private final String annotationBaseUrl; private final int pageSize; public DataManagementClient( @@ -35,6 +36,7 @@ public class DataManagementClient { KnowledgeGraphProperties properties) { this.restTemplate = restTemplate; this.baseUrl = properties.getSync().getDataManagementUrl(); + this.annotationBaseUrl = properties.getSync().getAnnotationServiceUrl(); this.pageSize = properties.getSync().getPageSize(); } @@ -42,40 +44,90 @@ public class DataManagementClient { * 拉取所有数据集(自动分页)。 */ public List listAllDatasets() { - List allDatasets = new ArrayList<>(); + return fetchAllPaged( + baseUrl + "/data-management/datasets", + new ParameterizedTypeReference>() {}, + "datasets"); + } + + /** + * 拉取所有工作流(自动分页)。 + */ + public List listAllWorkflows() { + return fetchAllPaged( + baseUrl + "/data-management/workflows", + new ParameterizedTypeReference>() {}, + "workflows"); + } + + /** + * 拉取所有作业(自动分页)。 + */ + public List listAllJobs() { + return fetchAllPaged( + baseUrl + "/data-management/jobs", + new ParameterizedTypeReference>() {}, + "jobs"); + } + + /** + * 拉取所有标注任务(自动分页,从标注服务)。 + */ + public List listAllLabelTasks() { + return fetchAllPaged( + annotationBaseUrl + "/annotation/label-tasks", + new ParameterizedTypeReference>() {}, + "label-tasks"); + } + + /** + * 拉取所有知识集(自动分页)。 + */ + public List listAllKnowledgeSets() { + return fetchAllPaged( + baseUrl + "/data-management/knowledge-sets", + new ParameterizedTypeReference>() {}, + "knowledge-sets"); + } + + /** + * 通用自动分页拉取方法。 + */ + private List fetchAllPaged(String baseEndpoint, + ParameterizedTypeReference> typeRef, + String resourceName) { + List allItems = new ArrayList<>(); int page = 0; while (true) { - String url = baseUrl + "/data-management/datasets?page=" + page + "&size=" + pageSize; - log.debug("Fetching datasets: page={}, size={}", page, pageSize); + String url = baseEndpoint + "?page=" + page + "&size=" + pageSize; + log.debug("Fetching {}: page={}, size={}", resourceName, page, pageSize); try { - ResponseEntity> response = restTemplate.exchange( - url, HttpMethod.GET, null, - new ParameterizedTypeReference<>() {} - ); + ResponseEntity> response = restTemplate.exchange( + url, HttpMethod.GET, null, typeRef); - PagedResult body = response.getBody(); + PagedResult body = response.getBody(); if (body == null || body.getContent() == null || body.getContent().isEmpty()) { break; } - allDatasets.addAll(body.getContent()); - log.debug("Fetched {} datasets (page {}), total so far: {}", - body.getContent().size(), page, allDatasets.size()); + allItems.addAll(body.getContent()); + log.debug("Fetched {} {} (page {}), total so far: {}", + body.getContent().size(), resourceName, page, allItems.size()); if (page >= body.getTotalPages() - 1) { break; } page++; } catch (RestClientException e) { - log.error("Failed to fetch datasets from data-management-service: page={}, url={}", page, url, e); + log.error("Failed to fetch {} : page={}, url={}", resourceName, page, url, e); throw e; } } - log.info("Fetched {} datasets in total from data-management-service", allDatasets.size()); - return allDatasets; + log.info("Fetched {} {} in total", allItems.size(), resourceName); + return allItems; } // ----------------------------------------------------------------------- @@ -123,4 +175,104 @@ public class DataManagementClient { private String color; private String description; } + + /** + * 与 data-management-service / data-cleaning-service 的 Workflow 对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class WorkflowDTO { + private String id; + private String name; + private String description; + private String workflowType; + private String status; + private String version; + private Integer operatorCount; + private String schedule; + private String createdBy; + private String updatedBy; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + /** 工作流使用的输入数据集 ID 列表 */ + private List inputDatasetIds; + } + + /** + * 与 data-management-service 的 Job / CleaningTask / DataSynthInstance 等对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class JobDTO { + private String id; + private String name; + private String description; + private String jobType; + private String status; + private String startedAt; + private String completedAt; + private Long durationSeconds; + private Long inputCount; + private Long outputCount; + private String errorMessage; + private String createdBy; + private String updatedBy; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + /** 输入数据集 ID */ + private String inputDatasetId; + /** 输出数据集 ID */ + private String outputDatasetId; + /** 所属工作流 ID(TRIGGERS 关系) */ + private String workflowId; + /** 依赖的作业 ID(DEPENDS_ON 关系) */ + private String dependsOnJobId; + } + + /** + * 与 data-annotation-service 的 LabelingProject / AutoAnnotationTask 对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class LabelTaskDTO { + private String id; + private String name; + private String description; + private String taskMode; + private String dataType; + private String labelingType; + private String status; + private Double progress; + private String templateName; + private String createdBy; + private String updatedBy; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + /** 标注使用的数据集 ID(USES_DATASET 关系) */ + private String datasetId; + } + + /** + * 与 data-management-service 的 KnowledgeSet 对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class KnowledgeSetDTO { + private String id; + private String name; + private String description; + private String status; + private String domain; + private String businessLine; + private String sensitivity; + private Integer itemCount; + private String validFrom; + private String validTo; + private String createdBy; + private String updatedBy; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + /** 来源数据集 ID 列表(SOURCED_FROM 关系) */ + private List sourceDatasetIds; + } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java index 82d7cdc..650286b 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java @@ -31,6 +31,9 @@ public class KnowledgeGraphProperties { /** 数据管理服务基础 URL */ private String dataManagementUrl = "http://localhost:8080"; + /** 标注服务基础 URL */ + private String annotationServiceUrl = "http://localhost:8081"; + /** 同步每页拉取数量 */ private int pageSize = 200; diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncResultVO.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncResultVO.java index b5c951b..cfaa8f3 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncResultVO.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncResultVO.java @@ -29,6 +29,8 @@ public class SyncResultVO { private int purged; private int total; private long durationMillis; + /** 标记为占位符的步骤(功能尚未实现) */ + private boolean placeholder; /** 错误数量(不暴露具体错误信息) */ private int errorCount; private LocalDateTime startedAt; @@ -45,6 +47,7 @@ public class SyncResultVO { .purged(result.getPurged()) .total(result.total()) .durationMillis(result.durationMillis()) + .placeholder(result.isPlaceholder()) .errorCount(result.getErrors() != null ? result.getErrors().size() : 0) .startedAt(result.getStartedAt()) .completedAt(result.getCompletedAt()) diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java index bd9bb07..ff4dd31 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java @@ -104,4 +104,111 @@ public class GraphSyncController { @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { return SyncResultVO.from(syncService.buildBelongsToRelations(graphId)); } + + // ----------------------------------------------------------------------- + // 新增实体同步端点 + // ----------------------------------------------------------------------- + + /** + * 同步工作流实体。 + */ + @PostMapping("/workflows") + public SyncResultVO syncWorkflows( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.syncWorkflows(graphId)); + } + + /** + * 同步作业实体。 + */ + @PostMapping("/jobs") + public SyncResultVO syncJobs( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.syncJobs(graphId)); + } + + /** + * 同步标注任务实体。 + */ + @PostMapping("/label-tasks") + public SyncResultVO syncLabelTasks( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.syncLabelTasks(graphId)); + } + + /** + * 同步知识集实体。 + */ + @PostMapping("/knowledge-sets") + public SyncResultVO syncKnowledgeSets( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.syncKnowledgeSets(graphId)); + } + + // ----------------------------------------------------------------------- + // 新增关系构建端点 + // ----------------------------------------------------------------------- + + /** + * 构建 USES_DATASET 关系。 + */ + @PostMapping("/relations/uses-dataset") + public SyncResultVO buildUsesDatasetRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildUsesDatasetRelations(graphId)); + } + + /** + * 构建 PRODUCES 关系。 + */ + @PostMapping("/relations/produces") + public SyncResultVO buildProducesRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildProducesRelations(graphId)); + } + + /** + * 构建 ASSIGNED_TO 关系。 + */ + @PostMapping("/relations/assigned-to") + public SyncResultVO buildAssignedToRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildAssignedToRelations(graphId)); + } + + /** + * 构建 TRIGGERS 关系。 + */ + @PostMapping("/relations/triggers") + public SyncResultVO buildTriggersRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildTriggersRelations(graphId)); + } + + /** + * 构建 DEPENDS_ON 关系。 + */ + @PostMapping("/relations/depends-on") + public SyncResultVO buildDependsOnRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildDependsOnRelations(graphId)); + } + + /** + * 构建 IMPACTS 关系。 + */ + @PostMapping("/relations/impacts") + public SyncResultVO buildImpactsRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildImpactsRelations(graphId)); + } + + /** + * 构建 SOURCED_FROM 关系。 + */ + @PostMapping("/relations/sourced-from") + public SyncResultVO buildSourcedFromRelations( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { + return SyncResultVO.from(syncService.buildSourcedFromRelations(graphId)); + } } diff --git a/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml b/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml index 3dd3b01..4a02bc6 100644 --- a/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml +++ b/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml @@ -27,6 +27,8 @@ datamate: sync: # 数据管理服务地址 data-management-url: ${DATA_MANAGEMENT_URL:http://localhost:8080} + # 标注服务地址 + annotation-service-url: ${ANNOTATION_SERVICE_URL:http://localhost:8081} # 每页拉取数量 page-size: ${KG_SYNC_PAGE_SIZE:200} # HTTP 连接超时(毫秒) diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java index 2ff794c..e3891e9 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java @@ -4,8 +4,13 @@ import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.knowledgegraph.domain.model.SyncResult; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO; import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -13,6 +18,9 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -68,12 +76,36 @@ class GraphSyncServiceTest { .isInstanceOf(BusinessException.class); } + @Test + void syncWorkflows_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncWorkflows(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void syncJobs_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncJobs(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void syncLabelTasks_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncLabelTasks(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void syncKnowledgeSets_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.syncKnowledgeSets(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + // ----------------------------------------------------------------------- - // syncAll — 正常流程 + // syncAll — 正常流程(8 实体 + 10 关系 = 18 结果) // ----------------------------------------------------------------------- @Test - void syncAll_success_returnsResultList() { + void syncAll_success_returnsAllResults() { when(properties.getSync()).thenReturn(syncConfig); DatasetDTO dto = new DatasetDTO(); @@ -81,40 +113,75 @@ class GraphSyncServiceTest { dto.setName("Test"); dto.setCreatedBy("admin"); when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(dataManagementClient.listAllWorkflows()).thenReturn(List.of()); + when(dataManagementClient.listAllJobs()).thenReturn(List.of()); + when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of()); + when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of()); - SyncResult entityResult = SyncResult.builder().syncType("Dataset").build(); - SyncResult fieldResult = SyncResult.builder().syncType("Field").build(); - SyncResult userResult = SyncResult.builder().syncType("User").build(); - SyncResult orgResult = SyncResult.builder().syncType("Org").build(); - SyncResult hasFieldResult = SyncResult.builder().syncType("HAS_FIELD").build(); - SyncResult derivedFromResult = SyncResult.builder().syncType("DERIVED_FROM").build(); - SyncResult belongsToResult = SyncResult.builder().syncType("BELONGS_TO").build(); - + // 8 entity upsert results when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) - .thenReturn(entityResult); + .thenReturn(SyncResult.builder().syncType("Dataset").build()); when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString())) - .thenReturn(fieldResult); + .thenReturn(SyncResult.builder().syncType("Field").build()); when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) - .thenReturn(userResult); + .thenReturn(SyncResult.builder().syncType("User").build()); when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) - .thenReturn(orgResult); + .thenReturn(SyncResult.builder().syncType("Org").build()); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString())) .thenReturn(0); + + // 10 relation merge results when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString())) - .thenReturn(hasFieldResult); + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) - .thenReturn(derivedFromResult); + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) - .thenReturn(belongsToResult); + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); + when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); List results = syncService.syncAll(GRAPH_ID); - assertThat(results).hasSize(7); - assertThat(results.get(0).getSyncType()).isEqualTo("Dataset"); + // 8 entities + 10 relations = 18 + assertThat(results).hasSize(18); + + // 按 syncType 建立索引,避免依赖固定下标 + Map byType = results.stream() + .collect(Collectors.toMap(SyncResult::getSyncType, Function.identity())); + + // 验证所有 8 个实体类型都存在 + assertThat(byType).containsKeys("Dataset", "Field", "User", "Org", + "Workflow", "Job", "LabelTask", "KnowledgeSet"); + + // 验证所有 10 个关系类型都存在 + assertThat(byType).containsKeys("HAS_FIELD", "DERIVED_FROM", "BELONGS_TO", + "USES_DATASET", "PRODUCES", "ASSIGNED_TO", "TRIGGERS", + "DEPENDS_ON", "IMPACTS", "SOURCED_FROM"); } // ----------------------------------------------------------------------- - // syncAll — 正常流程 + // 重试耗尽 // ----------------------------------------------------------------------- @Test @@ -124,6 +191,148 @@ class GraphSyncServiceTest { assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID)) .isInstanceOf(BusinessException.class) - .hasMessageContaining("拉取数据集失败"); + .hasMessageContaining("datasets"); + } + + // ----------------------------------------------------------------------- + // 新增实体单步同步 + // ----------------------------------------------------------------------- + + @Nested + class NewEntitySyncTest { + + @Test + void syncWorkflows_success() { + when(properties.getSync()).thenReturn(syncConfig); + + WorkflowDTO wf = new WorkflowDTO(); + wf.setId("wf-001"); + wf.setName("Clean Pipeline"); + when(dataManagementClient.listAllWorkflows()).thenReturn(List.of(wf)); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Workflow"), anySet(), anyString())) + .thenReturn(0); + + SyncResult result = syncService.syncWorkflows(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("Workflow"); + verify(stepService).upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()); + } + + @Test + void syncJobs_success() { + when(properties.getSync()).thenReturn(syncConfig); + + JobDTO job = new JobDTO(); + job.setId("job-001"); + job.setName("Clean Job"); + when(dataManagementClient.listAllJobs()).thenReturn(List.of(job)); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Job"), anySet(), anyString())) + .thenReturn(0); + + SyncResult result = syncService.syncJobs(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("Job"); + verify(stepService).upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()); + } + + @Test + void syncLabelTasks_success() { + when(properties.getSync()).thenReturn(syncConfig); + + LabelTaskDTO task = new LabelTaskDTO(); + task.setId("lt-001"); + task.setName("Label Task"); + when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of(task)); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("LabelTask"), anySet(), anyString())) + .thenReturn(0); + + SyncResult result = syncService.syncLabelTasks(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("LabelTask"); + } + + @Test + void syncKnowledgeSets_success() { + when(properties.getSync()).thenReturn(syncConfig); + + KnowledgeSetDTO ks = new KnowledgeSetDTO(); + ks.setId("ks-001"); + ks.setName("Knowledge Set"); + when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of(ks)); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("KnowledgeSet"), anySet(), anyString())) + .thenReturn(0); + + SyncResult result = syncService.syncKnowledgeSets(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("KnowledgeSet"); + } + + @Test + void syncWorkflows_fetchFailed_throwsBusinessException() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.listAllWorkflows()).thenThrow(new RuntimeException("timeout")); + + assertThatThrownBy(() -> syncService.syncWorkflows(GRAPH_ID)) + .isInstanceOf(BusinessException.class) + .hasMessageContaining("workflows"); + } + } + + // ----------------------------------------------------------------------- + // 新增关系构建 + // ----------------------------------------------------------------------- + + @Nested + class NewRelationBuildTest { + + @Test + void buildUsesDatasetRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildUsesDatasetRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildProducesRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildProducesRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildAssignedToRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildAssignedToRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildTriggersRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildTriggersRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildDependsOnRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildDependsOnRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildImpactsRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildImpactsRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } + + @Test + void buildSourcedFromRelations_invalidGraphId_throws() { + assertThatThrownBy(() -> syncService.buildSourcedFromRelations(INVALID_GRAPH_ID)) + .isInstanceOf(BusinessException.class); + } } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java index 7ea4bc1..efb843a 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java @@ -5,6 +5,10 @@ import com.datamate.knowledgegraph.domain.model.SyncResult; import com.datamate.knowledgegraph.domain.repository.GraphEntityRepository; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.TagDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.JobDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.LabelTaskDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.KnowledgeSetDTO; import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -22,6 +26,8 @@ import org.springframework.data.neo4j.core.Neo4jClient.RecordFetchSpec; import org.springframework.data.neo4j.core.Neo4jClient.MappingSpec; import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.*; @@ -56,9 +62,6 @@ class GraphSyncStepServiceTest { // Neo4jClient mock chain helper // ----------------------------------------------------------------------- - /** - * Build a full mock chain for neo4jClient.query(...).bindAll(...).fetchAs(Long).mappedBy(...).one() - */ @SuppressWarnings("unchecked") private void setupNeo4jQueryChain(Class fetchType, Object returnValue) { UnboundRunnableSpec unboundSpec = mock(UnboundRunnableSpec.class); @@ -90,7 +93,6 @@ class GraphSyncStepServiceTest { GRAPH_ID, "Dataset", Collections.emptySet(), SYNC_ID); assertThat(deleted).isEqualTo(0); - // Should NOT execute any Cypher query verifyNoInteractions(neo4jClient); } @@ -133,7 +135,7 @@ class GraphSyncStepServiceTest { } // ----------------------------------------------------------------------- - // upsertDatasetEntities — P2-5 单条 Cypher 优化 + // upsertDatasetEntities // ----------------------------------------------------------------------- @Nested @@ -200,12 +202,10 @@ class GraphSyncStepServiceTest { stepService.upsertDatasetEntities(GRAPH_ID, List.of(dto), SYNC_ID); - // Verify the MERGE Cypher includes property SET clauses (N+1 fix) verify(neo4jClient).query(cypherCaptor.capture()); String cypher = cypherCaptor.getValue(); assertThat(cypher).contains("MERGE"); assertThat(cypher).contains("properties."); - // Verify NO separate find+save (N+1 eliminated) verifyNoInteractions(entityRepository); } @@ -270,7 +270,205 @@ class GraphSyncStepServiceTest { } // ----------------------------------------------------------------------- - // 关系构建 + // upsertWorkflowEntities + // ----------------------------------------------------------------------- + + @Nested + class UpsertWorkflowEntitiesTest { + + @Test + void upsert_newWorkflow_incrementsCreated() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + WorkflowDTO dto = new WorkflowDTO(); + dto.setId("wf-001"); + dto.setName("Clean Pipeline"); + dto.setWorkflowType("CLEANING"); + dto.setStatus("ACTIVE"); + dto.setVersion("2.1"); + dto.setOperatorCount(3); + + SyncResult result = stepService.upsertWorkflowEntities( + GRAPH_ID, List.of(dto), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); + assertThat(result.getSyncType()).isEqualTo("Workflow"); + } + + @Test + void upsert_emptyList_returnsZero() { + when(properties.getImportBatchSize()).thenReturn(100); + + SyncResult result = stepService.upsertWorkflowEntities( + GRAPH_ID, List.of(), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + verifyNoInteractions(neo4jClient); + } + + @Test + void upsert_withInputDatasetIds_storesProperty() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + WorkflowDTO dto = new WorkflowDTO(); + dto.setId("wf-001"); + dto.setName("Pipeline"); + dto.setWorkflowType("CLEANING"); + dto.setInputDatasetIds(List.of("ds-001", "ds-002")); + + stepService.upsertWorkflowEntities(GRAPH_ID, List.of(dto), SYNC_ID); + + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("properties.input_dataset_ids"); + } + } + + // ----------------------------------------------------------------------- + // upsertJobEntities + // ----------------------------------------------------------------------- + + @Nested + class UpsertJobEntitiesTest { + + @Test + void upsert_newJob_incrementsCreated() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + JobDTO dto = new JobDTO(); + dto.setId("job-001"); + dto.setName("Clean Job"); + dto.setJobType("CLEANING"); + dto.setStatus("COMPLETED"); + dto.setDurationSeconds(2100L); + dto.setInputDatasetId("ds-001"); + dto.setOutputDatasetId("ds-002"); + dto.setWorkflowId("wf-001"); + dto.setCreatedBy("admin"); + + SyncResult result = stepService.upsertJobEntities( + GRAPH_ID, List.of(dto), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); + assertThat(result.getSyncType()).isEqualTo("Job"); + } + + @Test + void upsert_jobWithDependency_storesProperty() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + JobDTO dto = new JobDTO(); + dto.setId("job-002"); + dto.setName("Downstream Job"); + dto.setJobType("SYNTHESIS"); + dto.setStatus("PENDING"); + dto.setDependsOnJobId("job-001"); + + stepService.upsertJobEntities(GRAPH_ID, List.of(dto), SYNC_ID); + + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("properties.depends_on_job_id"); + } + + @Test + void upsert_emptyList_returnsZero() { + when(properties.getImportBatchSize()).thenReturn(100); + + SyncResult result = stepService.upsertJobEntities( + GRAPH_ID, List.of(), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + verifyNoInteractions(neo4jClient); + } + } + + // ----------------------------------------------------------------------- + // upsertLabelTaskEntities + // ----------------------------------------------------------------------- + + @Nested + class UpsertLabelTaskEntitiesTest { + + @Test + void upsert_newLabelTask_incrementsCreated() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + LabelTaskDTO dto = new LabelTaskDTO(); + dto.setId("lt-001"); + dto.setName("Label Task"); + dto.setTaskMode("MANUAL"); + dto.setStatus("IN_PROGRESS"); + dto.setDataType("image"); + dto.setLabelingType("object_detection"); + dto.setProgress(45.5); + dto.setDatasetId("ds-001"); + dto.setCreatedBy("admin"); + + SyncResult result = stepService.upsertLabelTaskEntities( + GRAPH_ID, List.of(dto), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); + assertThat(result.getSyncType()).isEqualTo("LabelTask"); + } + + @Test + void upsert_emptyList_returnsZero() { + when(properties.getImportBatchSize()).thenReturn(100); + + SyncResult result = stepService.upsertLabelTaskEntities( + GRAPH_ID, List.of(), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + verifyNoInteractions(neo4jClient); + } + } + + // ----------------------------------------------------------------------- + // upsertKnowledgeSetEntities + // ----------------------------------------------------------------------- + + @Nested + class UpsertKnowledgeSetEntitiesTest { + + @Test + void upsert_newKnowledgeSet_incrementsCreated() { + when(properties.getImportBatchSize()).thenReturn(100); + setupNeo4jQueryChain(Boolean.class, true); + + KnowledgeSetDTO dto = new KnowledgeSetDTO(); + dto.setId("ks-001"); + dto.setName("Medical Knowledge"); + dto.setStatus("PUBLISHED"); + dto.setDomain("medical"); + dto.setSensitivity("INTERNAL"); + dto.setItemCount(320); + dto.setSourceDatasetIds(List.of("ds-001", "ds-002")); + + SyncResult result = stepService.upsertKnowledgeSetEntities( + GRAPH_ID, List.of(dto), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); + assertThat(result.getSyncType()).isEqualTo("KnowledgeSet"); + } + + @Test + void upsert_emptyList_returnsZero() { + when(properties.getImportBatchSize()).thenReturn(100); + + SyncResult result = stepService.upsertKnowledgeSetEntities( + GRAPH_ID, List.of(), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + verifyNoInteractions(neo4jClient); + } + } + + // ----------------------------------------------------------------------- + // 关系构建 - 已有 // ----------------------------------------------------------------------- @Nested @@ -280,6 +478,8 @@ class GraphSyncStepServiceTest { void mergeHasField_noFields_returnsEmptyResult() { when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")) .thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of()); SyncResult result = stepService.mergeHasFieldRelations(GRAPH_ID, SYNC_ID); @@ -293,7 +493,7 @@ class GraphSyncStepServiceTest { .id("entity-1") .type("Dataset") .graphId(GRAPH_ID) - .properties(new HashMap<>()) // no parent_dataset_id + .properties(new HashMap<>()) .build(); when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) @@ -315,4 +515,307 @@ class GraphSyncStepServiceTest { assertThat(result.getErrors()).contains("belongs_to:org_missing"); } } + + // ----------------------------------------------------------------------- + // 新增关系构建 + // ----------------------------------------------------------------------- + + @Nested + class NewMergeRelationsTest { + + @Test + void mergeUsesDataset_noJobs_noTasks_noWorkflows_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of()); + + SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("USES_DATASET"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeUsesDataset_jobWithInputDataset_createsRelation() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity job = GraphEntity.builder() + .id("job-entity-1").type("Job").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("input_dataset_id", "ds-001"))) + .build(); + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-1").sourceId("ds-001").type("Dataset").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("USES_DATASET"); + verify(neo4jClient).query(cypherCaptor.capture()); + String cypher = cypherCaptor.getValue(); + // 验证关系类型和方向:source → target + assertThat(cypher).contains("RELATED_TO"); + assertThat(cypher).contains("relation_type: $relationType"); + } + + @Test + void mergeUsesDataset_workflowWithSingleStringInput_handledCorrectly() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + // 单值 String 而非 List,验证 toStringList 统一处理 + GraphEntity workflow = GraphEntity.builder() + .id("wf-entity-1").type("Workflow").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("input_dataset_ids", "ds-single"))) + .build(); + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-s").sourceId("ds-single").type("Dataset").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID); + + verify(neo4jClient, atLeastOnce()).query(anyString()); + } + + @Test + void mergeUsesDataset_listWithNullElements_filtersNulls() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + List listWithNulls = new ArrayList<>(); + listWithNulls.add("ds-good"); + listWithNulls.add(null); + listWithNulls.add(""); + listWithNulls.add(" "); + + GraphEntity workflow = GraphEntity.builder() + .id("wf-entity-2").type("Workflow").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("input_dataset_ids", listWithNulls))) + .build(); + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-g").sourceId("ds-good").type("Dataset").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeUsesDatasetRelations(GRAPH_ID, SYNC_ID); + + // 只有 "ds-good" 应被处理(null、空、空白已过滤),验证只发起一次 mergeRelation + verify(neo4jClient, times(1)).query(anyString()); + } + + @Test + void mergeProduces_noJobs_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of()); + + SyncResult result = stepService.mergeProducesRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("PRODUCES"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeProduces_jobWithoutOutput_skips() { + GraphEntity job = GraphEntity.builder() + .id("job-entity-1").type("Job").graphId(GRAPH_ID) + .properties(new HashMap<>()) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of()); + + SyncResult result = stepService.mergeProducesRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeAssignedTo_noTasksOrJobs_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")).thenReturn(List.of()); + + SyncResult result = stepService.mergeAssignedToRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("ASSIGNED_TO"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeAssignedTo_taskWithCreatedBy_verifiesUserLookup() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity task = GraphEntity.builder() + .id("lt-entity-1").type("LabelTask").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("created_by", "admin"))) + .build(); + GraphEntity user = GraphEntity.builder() + .id("user-entity-1").sourceId("user:admin").type("User").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "LabelTask")).thenReturn(List.of(task)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")).thenReturn(List.of(user)); + + SyncResult result = stepService.mergeAssignedToRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("ASSIGNED_TO"); + // 验证通过预加载的 userMap 查找 User 实体(不再有 N+1 查询) + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("RELATED_TO"); + } + + @Test + void mergeTriggers_noJobs_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of()); + + SyncResult result = stepService.mergeTriggersRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("TRIGGERS"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeTriggers_jobWithWorkflowId_createsRelationWithCorrectDirection() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity job = GraphEntity.builder() + .id("job-entity-1").type("Job").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("workflow_id", "wf-001"))) + .build(); + GraphEntity workflow = GraphEntity.builder() + .id("wf-entity-1").sourceId("wf-001").type("Workflow").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Workflow")).thenReturn(List.of(workflow)); + + SyncResult result = stepService.mergeTriggersRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("TRIGGERS"); + verify(neo4jClient).query(cypherCaptor.capture()); + String cypher = cypherCaptor.getValue(); + // 验证 MERGE 使用 RELATED_TO 关系类型 + assertThat(cypher).contains("RELATED_TO"); + // 验证 Cypher 参数绑定:source 应为 Workflow,target 应为 Job + assertThat(cypher).contains("$sourceEntityId"); + assertThat(cypher).contains("$targetEntityId"); + } + + @Test + void mergeDependsOn_noJobs_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of()); + + SyncResult result = stepService.mergeDependsOnRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("DEPENDS_ON"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeDependsOn_jobWithDependency_verifiesCypherParams() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity job = GraphEntity.builder() + .id("job-entity-2").sourceId("job-002").type("Job").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("depends_on_job_id", "job-001"))) + .build(); + GraphEntity depJob = GraphEntity.builder() + .id("job-entity-1").sourceId("job-001").type("Job").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job")).thenReturn(List.of(job, depJob)); + + SyncResult result = stepService.mergeDependsOnRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("DEPENDS_ON"); + verify(neo4jClient).query(cypherCaptor.capture()); + String cypher = cypherCaptor.getValue(); + assertThat(cypher).contains("RELATED_TO"); + assertThat(cypher).contains("$relationType"); + } + + @Test + void mergeImpacts_returnsPlaceholderResult() { + SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("IMPACTS"); + assertThat(result.getCreated()).isEqualTo(0); + assertThat(result.isPlaceholder()).isTrue(); + verifyNoInteractions(neo4jClient); + verifyNoInteractions(entityRepository); + } + + @Test + void mergeSourcedFrom_noKnowledgeSets_returnsZero() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of()); + + SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("SOURCED_FROM"); + assertThat(result.getCreated()).isEqualTo(0); + } + + @Test + void mergeSourcedFrom_ksWithSources_verifiesCypherAndLookup() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity ks = GraphEntity.builder() + .id("ks-entity-1").type("KnowledgeSet").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("source_dataset_ids", List.of("ds-001")))) + .build(); + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-1").sourceId("ds-001").type("Dataset").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of(ks)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("SOURCED_FROM"); + verify(neo4jClient).query(cypherCaptor.capture()); + assertThat(cypherCaptor.getValue()).contains("RELATED_TO"); + } + + @Test + void mergeSourcedFrom_listWithNullElements_filtersNulls() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + List listWithNulls = new ArrayList<>(); + listWithNulls.add("ds-valid"); + listWithNulls.add(null); + listWithNulls.add(""); + + GraphEntity ks = GraphEntity.builder() + .id("ks-entity-2").type("KnowledgeSet").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("source_dataset_ids", listWithNulls))) + .build(); + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-v").sourceId("ds-valid").type("Dataset").graphId(GRAPH_ID) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "KnowledgeSet")).thenReturn(List.of(ks)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")).thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeSourcedFromRelations(GRAPH_ID, SYNC_ID); + + // 只有 "ds-valid" 应被处理,null 和空字符串已过滤;验证只发起一次 mergeRelation + verify(neo4jClient, times(1)).query(anyString()); + } + } }