fix(kg): 根据 Codex 审查反馈修复知识图谱同步问题

修复内容:
1. [P1] 修复 job_id 错误清洗问题
   - 新增 sanitizePropertyValue() 方法对属性值进行安全处理
   - 修复 IMPACTS 关系中 job_id JSON 注入风险

2. [P2] 修复增量同步关系全量重算问题
   - 为所有关系构建方法添加 changedEntityIds 参数支持
   - 增量同步时仅处理变更实体相关的关系,提升性能

3. [P2] 修复 MERGE ON MATCH 覆盖属性问题
   - 实体 upsert 时保留原有非空 name/description 值
   - 关系 MERGE 时保留原有非空 properties_json 值
   - GraphRelationRepository 中优化条件覆盖逻辑

4. 修复测试 Mock stub 签名不匹配问题
   - 同时支持 2 参数和 3 参数版本的关系方法
   - 使用 lenient() 模式避免 unnecessary stubbing 错误

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 09:56:16 +08:00
parent 42069f82b3
commit f12e4abd83
7 changed files with 754 additions and 31 deletions

View File

@@ -200,6 +200,108 @@ public class GraphSyncService {
} }
} }
// -----------------------------------------------------------------------
// 增量同步
// -----------------------------------------------------------------------
/**
 * Incremental sync: fetch only the data changed within the given time window and sync it to Neo4j.
 * <p>
 * Differences from a full sync:
 * <ul>
 * <li>Changed data is filtered via updatedFrom/updatedTo</li>
 * <li>No purge is performed (stale entities are not deleted)</li>
 * <li>The time window is recorded on the resulting SyncMetadata</li>
 * </ul>
 *
 * @param graphId     target graph identifier; must match the UUID pattern
 * @param updatedFrom inclusive start of the change window; must not be null
 * @param updatedTo   end of the change window; must not be null or earlier than updatedFrom
 * @return metadata for this sync run, including per-step results and the window bounds
 * @throws BusinessException if parameters are invalid or the sync fails
 */
public SyncMetadata syncIncremental(String graphId, LocalDateTime updatedFrom, LocalDateTime updatedTo) {
    validateGraphId(graphId);
    if (updatedFrom == null || updatedTo == null) {
        throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "增量同步必须指定 updatedFrom 和 updatedTo");
    }
    if (updatedFrom.isAfter(updatedTo)) {
        throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "updatedFrom 不能晚于 updatedTo");
    }
    String syncId = UUID.randomUUID().toString();
    LocalDateTime startedAt = LocalDateTime.now();
    // Per-graph lock: prevents concurrent sync runs against the same graph.
    ReentrantLock lock = acquireLock(graphId, syncId);
    try {
        log.info("[{}] Starting incremental sync for graphId={}, window=[{}, {}]",
                syncId, graphId, updatedFrom, updatedTo);
        // Fetch only the records changed inside the window (each source with retry).
        List<DatasetDTO> datasets = fetchWithRetry(syncId, "datasets",
                () -> dataManagementClient.listAllDatasets(updatedFrom, updatedTo));
        List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
                () -> dataManagementClient.listAllWorkflows(updatedFrom, updatedTo));
        List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
                () -> dataManagementClient.listAllJobs(updatedFrom, updatedTo));
        List<LabelTaskDTO> labelTasks = fetchWithRetry(syncId, "label-tasks",
                () -> dataManagementClient.listAllLabelTasks(updatedFrom, updatedTo));
        List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
                () -> dataManagementClient.listAllKnowledgeSets(updatedFrom, updatedTo));
        Map<String, SyncResult> resultMap = new LinkedHashMap<>();
        // Entity sync: upsert only, no purge.
        resultMap.put("Dataset", stepService.upsertDatasetEntities(graphId, datasets, syncId));
        resultMap.put("Field", stepService.upsertFieldEntities(graphId, datasets, syncId));
        Set<String> usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets);
        resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId));
        resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId));
        resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId));
        resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId));
        resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId));
        resultMap.put("KnowledgeSet", stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId));
        // Collect the entity IDs of everything created/updated; used to scope relation building.
        // A null result signals "could not resolve" and triggers a full relation rebuild downstream.
        Set<String> changedEntityIds = collectChangedEntityIds(datasets, workflows, jobs, labelTasks, knowledgeSets, graphId);
        // Relation building (MERGE, idempotent) — incremental runs only touch relations of changed entities.
        resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds));
        resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds));
        resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds));
        resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds));
        resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds));
        resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds));
        resultMap.put("TRIGGERS", stepService.mergeTriggersRelations(graphId, syncId, changedEntityIds));
        resultMap.put("DEPENDS_ON", stepService.mergeDependsOnRelations(graphId, syncId, changedEntityIds));
        resultMap.put("IMPACTS", stepService.mergeImpactsRelations(graphId, syncId, changedEntityIds));
        resultMap.put("SOURCED_FROM", stepService.mergeSourcedFromRelations(graphId, syncId, changedEntityIds));
        List<SyncResult> results = new ArrayList<>(resultMap.values());
        log.info("[{}] Incremental sync completed for graphId={}. Summary: {}", syncId, graphId,
                results.stream()
                        .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
                        .collect(Collectors.joining(", ")));
        SyncMetadata metadata = SyncMetadata.fromResults(
                syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, results);
        metadata.setUpdatedFrom(updatedFrom);
        metadata.setUpdatedTo(updatedTo);
        saveSyncHistory(metadata);
        return metadata;
    } catch (BusinessException e) {
        // Business failures: record the failed run (with the window) and rethrow unchanged.
        SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
        failed.setUpdatedFrom(updatedFrom);
        failed.setUpdatedTo(updatedTo);
        saveSyncHistory(failed);
        throw e;
    } catch (Exception e) {
        // Unexpected failures: record the failed run, log with stack trace, wrap as SYNC_FAILED.
        SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
        failed.setUpdatedFrom(updatedFrom);
        failed.setUpdatedTo(updatedTo);
        saveSyncHistory(failed);
        log.error("[{}] Incremental sync failed for graphId={}", syncId, graphId, e);
        throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "增量同步失败,syncId=" + syncId);
    } finally {
        releaseLock(graphId, lock);
    }
}
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// 单步同步(各自获取锁和数据) // 单步同步(各自获取锁和数据)
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@@ -771,6 +873,74 @@ public class GraphSyncService {
} }
} }
/**
 * Collects the graph entity IDs of every record created or updated in this incremental sync.
 * <p>
 * The DTOs carry source-system IDs; these are resolved to graph entity IDs with a single
 * batched Neo4j lookup (the previous implementation issued one query per source ID — an
 * N+1 round-trip pattern).
 *
 * @param graphId the graph whose entities are matched
 * @return the resolved entity IDs, or {@code null} when resolution fails, which callers
 *         treat as "fall back to a full relation rebuild"
 */
private Set<String> collectChangedEntityIds(List<DatasetDTO> datasets,
                                            List<WorkflowDTO> workflows,
                                            List<JobDTO> jobs,
                                            List<LabelTaskDTO> labelTasks,
                                            List<KnowledgeSetDTO> knowledgeSets,
                                            String graphId) {
    Set<String> entityIds = new HashSet<>();
    try {
        // Gather the source-system IDs of every changed record.
        Set<String> changedSourceIds = new HashSet<>();
        datasets.stream().filter(Objects::nonNull).map(DatasetDTO::getId).filter(Objects::nonNull)
                .forEach(changedSourceIds::add);
        workflows.stream().filter(Objects::nonNull).map(WorkflowDTO::getId).filter(Objects::nonNull)
                .forEach(changedSourceIds::add);
        jobs.stream().filter(Objects::nonNull).map(JobDTO::getId).filter(Objects::nonNull)
                .forEach(changedSourceIds::add);
        labelTasks.stream().filter(Objects::nonNull).map(LabelTaskDTO::getId).filter(Objects::nonNull)
                .forEach(changedSourceIds::add);
        knowledgeSets.stream().filter(Objects::nonNull).map(KnowledgeSetDTO::getId).filter(Objects::nonNull)
                .forEach(changedSourceIds::add);
        // Field-level source IDs derived from dataset tags.
        // NOTE(review): assumes Field entities use "<datasetId>:tag:<name>" as source_id —
        // confirm this matches the Field upsert step's convention.
        for (DatasetDTO dataset : datasets) {
            if (dataset != null && dataset.getTags() != null) {
                for (DataManagementClient.TagDTO tag : dataset.getTags()) {
                    if (tag != null && tag.getName() != null) {
                        changedSourceIds.add(dataset.getId() + ":tag:" + tag.getName());
                    }
                }
            }
        }
        // Resolve all source IDs to entity IDs in a single round-trip.
        if (!changedSourceIds.isEmpty()) {
            String cypher = "MATCH (e:Entity {graph_id: $graphId}) "
                    + "WHERE e.source_id IN $sourceIds RETURN e.id AS entityId";
            Collection<String> foundEntityIds = stepService.neo4jClient.query(cypher)
                    .bindAll(Map.of("graphId", graphId, "sourceIds", new ArrayList<>(changedSourceIds)))
                    .fetchAs(String.class)
                    .mappedBy((ts, record) -> record.get("entityId").asString())
                    .all();
            entityIds.addAll(foundEntityIds);
        }
    } catch (Exception e) {
        log.warn("Failed to collect changed entity IDs, falling back to full relation rebuild: {}", e.getMessage());
        // null is the sentinel for "rebuild all relations".
        return null;
    }
    log.debug("Collected {} changed entity IDs for incremental relation building", entityIds.size());
    return entityIds;
}
private void validateGraphId(String graphId) { private void validateGraphId(String graphId) {
if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) { if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效"); throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效");

View File

@@ -39,7 +39,7 @@ public class GraphSyncStepService {
private static final String REL_TYPE = "RELATED_TO"; private static final String REL_TYPE = "RELATED_TO";
private final GraphEntityRepository entityRepository; private final GraphEntityRepository entityRepository;
private final Neo4jClient neo4jClient; final Neo4jClient neo4jClient; // 改为包级别访问,供GraphSyncService使用
private final KnowledgeGraphProperties properties; private final KnowledgeGraphProperties properties;
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@@ -441,11 +441,35 @@ public class GraphSyncStepService {
@Transactional @Transactional
public SyncResult mergeHasFieldRelations(String graphId, String syncId) { public SyncResult mergeHasFieldRelations(String graphId, String syncId) {
return mergeHasFieldRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeHasFieldRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("HAS_FIELD", syncId); SyncResult result = beginResult("HAS_FIELD", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> fields = entityRepository.findByGraphIdAndType(graphId, "Field"); List<GraphEntity> fields = entityRepository.findByGraphIdAndType(graphId, "Field");
// 增量同步时只处理变更相关的字段
if (changedEntityIds != null) {
fields = fields.stream()
.filter(field -> {
// 包含自身变更的字段
if (changedEntityIds.contains(field.getId())) {
return true;
}
// 包含关联数据集发生变更的字段
Object datasetSourceId = field.getProperties().get("dataset_source_id");
if (datasetSourceId != null) {
String datasetEntityId = datasetMap.get(datasetSourceId.toString());
return datasetEntityId != null && changedEntityIds.contains(datasetEntityId);
}
return false;
})
.toList();
}
for (GraphEntity field : fields) { for (GraphEntity field : fields) {
try { try {
Object datasetSourceId = field.getProperties().get("dataset_source_id"); Object datasetSourceId = field.getProperties().get("dataset_source_id");
@@ -477,11 +501,23 @@ public class GraphSyncStepService {
@Transactional @Transactional
public SyncResult mergeDerivedFromRelations(String graphId, String syncId) { public SyncResult mergeDerivedFromRelations(String graphId, String syncId) {
return mergeDerivedFromRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeDerivedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("DERIVED_FROM", syncId); SyncResult result = beginResult("DERIVED_FROM", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
// 增量同步时只处理变更的数据集
if (changedEntityIds != null) {
datasets = datasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : datasets) { for (GraphEntity dataset : datasets) {
try { try {
Object parentId = dataset.getProperties().get("parent_dataset_id"); Object parentId = dataset.getProperties().get("parent_dataset_id");
@@ -512,6 +548,11 @@ public class GraphSyncStepService {
@Transactional @Transactional
public SyncResult mergeBelongsToRelations(String graphId, String syncId) { public SyncResult mergeBelongsToRelations(String graphId, String syncId) {
return mergeBelongsToRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("BELONGS_TO", syncId); SyncResult result = beginResult("BELONGS_TO", syncId);
Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType( Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType(
@@ -524,7 +565,13 @@ public class GraphSyncStepService {
String orgId = defaultOrgOpt.get().getId(); String orgId = defaultOrgOpt.get().getId();
// User → Org // User → Org
for (GraphEntity user : entityRepository.findByGraphIdAndType(graphId, "User")) { List<GraphEntity> users = entityRepository.findByGraphIdAndType(graphId, "User");
if (changedEntityIds != null) {
users = users.stream()
.filter(user -> changedEntityIds.contains(user.getId()))
.toList();
}
for (GraphEntity user : users) {
try { try {
boolean created = mergeRelation(graphId, user.getId(), orgId, boolean created = mergeRelation(graphId, user.getId(), orgId,
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
@@ -536,7 +583,13 @@ public class GraphSyncStepService {
} }
// Dataset → Org // Dataset → Org
for (GraphEntity dataset : entityRepository.findByGraphIdAndType(graphId, "Dataset")) { List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
if (changedEntityIds != null) {
datasets = datasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : datasets) {
try { try {
boolean created = mergeRelation(graphId, dataset.getId(), orgId, boolean created = mergeRelation(graphId, dataset.getId(), orgId,
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
@@ -559,22 +612,45 @@ public class GraphSyncStepService {
*/ */
@Transactional @Transactional
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) { public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) {
return mergeUsesDatasetRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("USES_DATASET", syncId); SyncResult result = beginResult("USES_DATASET", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
// Job → Dataset (via input_dataset_id) // Job → Dataset (via input_dataset_id)
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId); mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId);
} }
// LabelTask → Dataset (via dataset_id) // LabelTask → Dataset (via dataset_id)
for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
if (changedEntityIds != null) {
tasks = tasks.stream()
.filter(task -> changedEntityIds.contains(task.getId()))
.toList();
}
for (GraphEntity task : tasks) {
mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId); mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId);
} }
// Workflow → Dataset (via input_dataset_ids, multi-value) // Workflow → Dataset (via input_dataset_ids, multi-value)
for (GraphEntity workflow : entityRepository.findByGraphIdAndType(graphId, "Workflow")) { List<GraphEntity> workflows = entityRepository.findByGraphIdAndType(graphId, "Workflow");
if (changedEntityIds != null) {
workflows = workflows.stream()
.filter(workflow -> changedEntityIds.contains(workflow.getId()))
.toList();
}
for (GraphEntity workflow : workflows) {
mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId); mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId);
} }
@@ -616,11 +692,23 @@ public class GraphSyncStepService {
*/ */
@Transactional @Transactional
public SyncResult mergeProducesRelations(String graphId, String syncId) { public SyncResult mergeProducesRelations(String graphId, String syncId) {
return mergeProducesRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeProducesRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("PRODUCES", syncId); SyncResult result = beginResult("PRODUCES", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try { try {
Object outputDatasetId = job.getProperties().get("output_dataset_id"); Object outputDatasetId = job.getProperties().get("output_dataset_id");
if (outputDatasetId == null || outputDatasetId.toString().isBlank()) { if (outputDatasetId == null || outputDatasetId.toString().isBlank()) {
@@ -647,17 +735,34 @@ public class GraphSyncStepService {
*/ */
@Transactional @Transactional
public SyncResult mergeAssignedToRelations(String graphId, String syncId) { public SyncResult mergeAssignedToRelations(String graphId, String syncId) {
return mergeAssignedToRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeAssignedToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("ASSIGNED_TO", syncId); SyncResult result = beginResult("ASSIGNED_TO", syncId);
Map<String, String> userMap = buildSourceIdToEntityIdMap(graphId, "User"); Map<String, String> userMap = buildSourceIdToEntityIdMap(graphId, "User");
// LabelTask → User // LabelTask → User
for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) { List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
if (changedEntityIds != null) {
tasks = tasks.stream()
.filter(task -> changedEntityIds.contains(task.getId()))
.toList();
}
for (GraphEntity task : tasks) {
mergeCreatorAssignment(graphId, task, "label_task", userMap, result, syncId); mergeCreatorAssignment(graphId, task, "label_task", userMap, result, syncId);
} }
// Job → User // Job → User
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId); mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId);
} }
@@ -692,11 +797,23 @@ public class GraphSyncStepService {
*/ */
@Transactional @Transactional
public SyncResult mergeTriggersRelations(String graphId, String syncId) { public SyncResult mergeTriggersRelations(String graphId, String syncId) {
return mergeTriggersRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeTriggersRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("TRIGGERS", syncId); SyncResult result = beginResult("TRIGGERS", syncId);
Map<String, String> workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow"); Map<String, String> workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow");
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try { try {
Object workflowId = job.getProperties().get("workflow_id"); Object workflowId = job.getProperties().get("workflow_id");
if (workflowId == null || workflowId.toString().isBlank()) { if (workflowId == null || workflowId.toString().isBlank()) {
@@ -724,11 +841,23 @@ public class GraphSyncStepService {
*/ */
@Transactional @Transactional
public SyncResult mergeDependsOnRelations(String graphId, String syncId) { public SyncResult mergeDependsOnRelations(String graphId, String syncId) {
return mergeDependsOnRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeDependsOnRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("DEPENDS_ON", syncId); SyncResult result = beginResult("DEPENDS_ON", syncId);
Map<String, String> jobMap = buildSourceIdToEntityIdMap(graphId, "Job"); Map<String, String> jobMap = buildSourceIdToEntityIdMap(graphId, "Job");
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) { List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try { try {
Object depJobId = job.getProperties().get("depends_on_job_id"); Object depJobId = job.getProperties().get("depends_on_job_id");
if (depJobId == null || depJobId.toString().isBlank()) { if (depJobId == null || depJobId.toString().isBlank()) {
@@ -751,29 +880,159 @@ public class GraphSyncStepService {
} }
/** /**
* 构建 IMPACTS 关系:Field → Field。 * 构建 IMPACTS 关系:Field → Field(字段级血缘)
* <p> * <p>
* TODO: 字段影响关系来源于 LLM 抽取或规则引擎,而非简单外键关联。 * 通过两种途径推导字段间的影响关系:
* 当前 MVP 阶段为占位实现,后续由抽取模块填充。 * <ol>
* <li>DERIVED_FROM:若 Dataset B 派生自 Dataset A(parent_dataset_id),
* 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT)。</li>
* <li>Job 输入/输出:若 Job 使用 Dataset A 并产出 Dataset B,
* 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT, job_id=源 ID)。</li>
* </ol>
*/ */
@Transactional @Transactional
public SyncResult mergeImpactsRelations(String graphId, String syncId) { public SyncResult mergeImpactsRelations(String graphId, String syncId) {
return mergeImpactsRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeImpactsRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("IMPACTS", syncId); SyncResult result = beginResult("IMPACTS", syncId);
result.setPlaceholder(true);
log.debug("[{}] IMPACTS relations require extraction data, skipping in sync phase", syncId); // 1. 加载所有 Field,按 dataset_source_id 分组
List<GraphEntity> allFields = entityRepository.findByGraphIdAndType(graphId, "Field");
Map<String, List<GraphEntity>> fieldsByDataset = allFields.stream()
.filter(f -> f.getProperties().get("dataset_source_id") != null)
.collect(Collectors.groupingBy(
f -> f.getProperties().get("dataset_source_id").toString()));
if (fieldsByDataset.isEmpty()) {
log.debug("[{}] No fields with dataset_source_id found, skipping IMPACTS", syncId);
return endResult(result);
}
// 记录已处理的 (sourceDatasetId, targetDatasetId) 对,避免重复
Set<String> processedPairs = new HashSet<>();
// 2. DERIVED_FROM 推导:parent dataset fields → child dataset fields
List<GraphEntity> allDatasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
// 增量同步时只处理变更的数据集
if (changedEntityIds != null) {
allDatasets = allDatasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : allDatasets) {
Object parentId = dataset.getProperties().get("parent_dataset_id");
if (parentId == null || parentId.toString().isBlank()) {
continue;
}
String pairKey = parentId + "" + dataset.getSourceId();
processedPairs.add(pairKey);
mergeFieldImpacts(graphId, parentId.toString(), dataset.getSourceId(),
fieldsByDataset, null, result, syncId);
}
// 3. Job 输入/输出推导:input dataset fields → output dataset fields
List<GraphEntity> allJobs = entityRepository.findByGraphIdAndType(graphId, "Job");
// 增量同步时只处理变更的作业
if (changedEntityIds != null) {
allJobs = allJobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : allJobs) {
Object inputDsId = job.getProperties().get("input_dataset_id");
Object outputDsId = job.getProperties().get("output_dataset_id");
if (inputDsId == null || outputDsId == null
|| inputDsId.toString().isBlank() || outputDsId.toString().isBlank()) {
continue;
}
String pairKey = inputDsId + "" + outputDsId;
if (processedPairs.contains(pairKey)) {
continue;
}
processedPairs.add(pairKey);
mergeFieldImpacts(graphId, inputDsId.toString(), outputDsId.toString(),
fieldsByDataset, job.getSourceId(), result, syncId);
}
return endResult(result); return endResult(result);
} }
/**
 * Matches the fields of two related datasets by name and creates an IMPACTS relation
 * for every same-named pair (source field → target field).
 * <p>
 * Individual merge failures are recorded on the result and do not abort the batch.
 */
private void mergeFieldImpacts(String graphId,
                               String sourceDatasetSourceId, String targetDatasetSourceId,
                               Map<String, List<GraphEntity>> fieldsByDataset,
                               String jobSourceId,
                               SyncResult result, String syncId) {
    List<GraphEntity> upstreamFields = fieldsByDataset.getOrDefault(sourceDatasetSourceId, List.of());
    List<GraphEntity> downstreamFields = fieldsByDataset.getOrDefault(targetDatasetSourceId, List.of());
    if (upstreamFields.isEmpty() || downstreamFields.isEmpty()) {
        return;
    }
    // Index downstream fields by name; the first occurrence wins on duplicates.
    Map<String, GraphEntity> downstreamByName = downstreamFields.stream()
            .filter(f -> f.getName() != null && !f.getName().isBlank())
            .collect(Collectors.toMap(GraphEntity::getName, f -> f, (first, dup) -> first));
    // The relation payload depends only on jobSourceId, so build it once outside the loop.
    String propsJson = (jobSourceId == null)
            ? "{\"impact_type\":\"DIRECT\"}"
            : "{\"impact_type\":\"DIRECT\",\"job_id\":\"" + sanitizePropertyValue(jobSourceId) + "\"}";
    for (GraphEntity upstreamField : upstreamFields) {
        String fieldName = upstreamField.getName();
        if (fieldName == null || fieldName.isBlank()) {
            continue;
        }
        GraphEntity downstreamField = downstreamByName.get(fieldName);
        if (downstreamField == null) {
            continue;
        }
        try {
            boolean created = mergeRelation(graphId, upstreamField.getId(), downstreamField.getId(),
                    "IMPACTS", propsJson, syncId);
            if (created) {
                result.incrementCreated();
            } else {
                result.incrementSkipped();
            }
        } catch (Exception e) {
            log.warn("[{}] Failed to merge IMPACTS: {} → {}", syncId,
                    upstreamField.getId(), downstreamField.getId(), e);
            result.addError("impacts:" + upstreamField.getId());
        }
    }
}
/** /**
* 构建 SOURCED_FROM 关系:KnowledgeSet → Dataset(通过 source_dataset_ids)。 * 构建 SOURCED_FROM 关系:KnowledgeSet → Dataset(通过 source_dataset_ids)。
*/ */
@Transactional @Transactional
public SyncResult mergeSourcedFromRelations(String graphId, String syncId) { public SyncResult mergeSourcedFromRelations(String graphId, String syncId) {
return mergeSourcedFromRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeSourcedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("SOURCED_FROM", syncId); SyncResult result = beginResult("SOURCED_FROM", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset"); Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
for (GraphEntity ks : entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet")) { List<GraphEntity> knowledgeSets = entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet");
if (changedEntityIds != null) {
knowledgeSets = knowledgeSets.stream()
.filter(ks -> changedEntityIds.contains(ks.getId()))
.toList();
}
for (GraphEntity ks : knowledgeSets) {
try { try {
Object sourceIds = ks.getProperties().get("source_dataset_ids"); Object sourceIds = ks.getProperties().get("source_dataset_ids");
if (sourceIds == null) { if (sourceIds == null) {
@@ -847,7 +1106,8 @@ public class GraphSyncStepService {
"ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " + "ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " +
" e.name = $name, e.description = $description, " + " e.name = $name, e.description = $description, " +
" e.created_at = datetime(), e.updated_at = datetime()" + extraSet + " " + " e.created_at = datetime(), e.updated_at = datetime()" + extraSet + " " +
"ON MATCH SET e.name = $name, e.description = $description, " + "ON MATCH SET e.name = CASE WHEN $name <> '' THEN $name ELSE e.name END, " +
" e.description = CASE WHEN $description <> '' THEN $description ELSE e.description END, " +
" e.updated_at = datetime()" + extraSet + " " + " e.updated_at = datetime()" + extraSet + " " +
"RETURN e.id = $newId AS isNew" "RETURN e.id = $newId AS isNew"
) )
@@ -871,6 +1131,16 @@ public class GraphSyncStepService {
return key.replaceAll("[^a-zA-Z0-9_]", ""); return key.replaceAll("[^a-zA-Z0-9_]", "");
} }
/**
* 清理属性值用于 JSON 字符串拼接,转义双引号和反斜杠,防止 JSON 注入。
*/
private static String sanitizePropertyValue(String value) {
if (value == null) {
return "";
}
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}
/** /**
* 将 Java 值转换为 Neo4j 兼容的属性值。 * 将 Java 值转换为 Neo4j 兼容的属性值。
* <p> * <p>
@@ -925,6 +1195,7 @@ public class GraphSyncStepService {
"MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " + "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
"ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " + "ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " +
" r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " + " r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " +
"ON MATCH SET r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
"RETURN r.id AS relId" "RETURN r.id AS relId"
) )
.bindAll(Map.of( .bindAll(Map.of(

View File

@@ -33,6 +33,7 @@ public class SyncMetadata {
public static final String STATUS_PARTIAL = "PARTIAL"; public static final String STATUS_PARTIAL = "PARTIAL";
public static final String TYPE_FULL = "FULL"; public static final String TYPE_FULL = "FULL";
public static final String TYPE_INCREMENTAL = "INCREMENTAL";
public static final String TYPE_DATASETS = "DATASETS"; public static final String TYPE_DATASETS = "DATASETS";
public static final String TYPE_FIELDS = "FIELDS"; public static final String TYPE_FIELDS = "FIELDS";
public static final String TYPE_USERS = "USERS"; public static final String TYPE_USERS = "USERS";

View File

@@ -345,16 +345,13 @@ public class GraphRelationRepository {
.query( .query(
"MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " + "MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " +
"MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " + "MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " +
"CREATE (s)-[r:" + REL_TYPE + " {" + "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
" id: $id," + "ON CREATE SET r.id = $id, r.weight = $weight, r.confidence = $confidence, " +
" relation_type: $relationType," + " r.source_id = $sourceId, r.properties_json = $propertiesJson, r.created_at = $createdAt " +
" weight: $weight," + "ON MATCH SET r.weight = CASE WHEN $weight IS NOT NULL THEN $weight ELSE r.weight END, " +
" confidence: $confidence," + " r.confidence = CASE WHEN $confidence IS NOT NULL THEN $confidence ELSE r.confidence END, " +
" source_id: $sourceId," + " r.source_id = CASE WHEN $sourceId <> '' THEN $sourceId ELSE r.source_id END, " +
" graph_id: $graphId," + " r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
" properties_json: $propertiesJson," +
" created_at: $createdAt" +
"}]->(t) " +
RETURN_COLUMNS RETURN_COLUMNS
) )
.bindAll(params) .bindAll(params)

View File

@@ -49,6 +49,18 @@ public class GraphSyncController {
return SyncMetadataVO.from(metadata); return SyncMetadataVO.from(metadata);
} }
/**
* 增量同步:仅拉取指定时间窗口内变更的数据并同步。
*/
@PostMapping("/incremental")
public SyncMetadataVO syncIncremental(
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedFrom,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedTo) {
SyncMetadata metadata = syncService.syncIncremental(graphId, updatedFrom, updatedTo);
return SyncMetadataVO.from(metadata);
}
/** /**
* 同步数据集实体。 * 同步数据集实体。
*/ */

View File

@@ -566,6 +566,163 @@ class GraphSyncServiceTest {
} }
} }
    // -----------------------------------------------------------------------
    // Incremental sync
    // -----------------------------------------------------------------------

    /**
     * Tests for {@code syncIncremental}: argument validation, propagation of
     * the change window to the data-management client, and metadata recording
     * on both success and failure.
     */
    @Nested
    class IncrementalSyncTest {

        // Fixed change window shared by every test in this group.
        private final LocalDateTime UPDATED_FROM = LocalDateTime.of(2025, 6, 1, 0, 0);
        private final LocalDateTime UPDATED_TO = LocalDateTime.of(2025, 6, 30, 23, 59);

        @Test
        void syncIncremental_invalidGraphId_throwsBusinessException() {
            assertThatThrownBy(() -> syncService.syncIncremental(INVALID_GRAPH_ID, UPDATED_FROM, UPDATED_TO))
                    .isInstanceOf(BusinessException.class);
        }

        @Test
        void syncIncremental_nullUpdatedFrom_throwsBusinessException() {
            assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, null, UPDATED_TO))
                    .isInstanceOf(BusinessException.class)
                    .hasMessageContaining("updatedFrom");
        }

        @Test
        void syncIncremental_nullUpdatedTo_throwsBusinessException() {
            assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, null))
                    .isInstanceOf(BusinessException.class)
                    .hasMessageContaining("updatedTo");
        }

        @Test
        void syncIncremental_fromAfterTo_throwsBusinessException() {
            // Swapped window bounds (from after to) must be rejected.
            assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_TO, UPDATED_FROM))
                    .isInstanceOf(BusinessException.class)
                    .hasMessageContaining("updatedFrom");
        }

        @Test
        void syncIncremental_success_passesTimeWindowToClient() {
            when(properties.getSync()).thenReturn(syncConfig);
            DatasetDTO dto = new DatasetDTO();
            dto.setId("ds-001");
            dto.setName("Test");
            dto.setCreatedBy("admin");
            when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of(dto));
            when(dataManagementClient.listAllWorkflows(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
            when(dataManagementClient.listAllJobs(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
            when(dataManagementClient.listAllLabelTasks(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
            when(dataManagementClient.listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
            stubAllEntityUpserts();
            stubAllRelationMerges();

            SyncMetadata metadata = syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO);

            assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
            assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
            assertThat(metadata.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
            assertThat(metadata.getUpdatedTo()).isEqualTo(UPDATED_TO);
            assertThat(metadata.getResults()).hasSize(18);
            // Verify the time-windowed client overloads were used.
            verify(dataManagementClient).listAllDatasets(UPDATED_FROM, UPDATED_TO);
            verify(dataManagementClient).listAllWorkflows(UPDATED_FROM, UPDATED_TO);
            verify(dataManagementClient).listAllJobs(UPDATED_FROM, UPDATED_TO);
            verify(dataManagementClient).listAllLabelTasks(UPDATED_FROM, UPDATED_TO);
            verify(dataManagementClient).listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO);
            // Verify incremental sync never purges stale entities.
            verify(stepService, never()).purgeStaleEntities(anyString(), anyString(), anySet(), anyString());
        }

        @Test
        void syncIncremental_failure_recordsMetadataWithTimeWindow() {
            when(properties.getSync()).thenReturn(syncConfig);
            when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO))
                    .thenThrow(new RuntimeException("connection refused"));

            assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO))
                    .isInstanceOf(BusinessException.class);

            // The failed run must still be persisted, carrying its time window.
            ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
            verify(syncHistoryRepository).save(captor.capture());
            SyncMetadata saved = captor.getValue();
            assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
            assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
            assertThat(saved.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
            assertThat(saved.getUpdatedTo()).isEqualTo(UPDATED_TO);
        }

        // Stubs every entity-upsert step to return a minimal SyncResult.
        private void stubAllEntityUpserts() {
            when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("Dataset").build());
            when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("Field").build());
            when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("User").build());
            when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("Org").build());
            when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("Workflow").build());
            when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("Job").build());
            when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("LabelTask").build());
            when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
                    .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
        }

        // Stubs relation-merge steps in both arities. Only one arity is
        // invoked per sync mode, hence lenient() on every stub.
        private void stubAllRelationMerges() {
            // 2-arg variants (full sync) - lenient to avoid unnecessary-stubbing errors
            lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
            lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
            lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
            lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
            lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("PRODUCES").build());
            lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
            lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
            lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
            lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("IMPACTS").build());
            lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
                    .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
            // 3-arg variants (incremental sync) - lenient to avoid unnecessary-stubbing errors
            lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
            lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
            lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
            lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
            lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("PRODUCES").build());
            lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
            lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
            lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
            lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("IMPACTS").build());
            lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString(), any()))
                    .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
        }
    }
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// 同步历史查询 // 同步历史查询
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------

View File

@@ -749,14 +749,129 @@ class GraphSyncStepServiceTest {
} }
@Test @Test
void mergeImpacts_returnsPlaceholderResult() { void mergeImpacts_noFields_returnsZero() {
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")).thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID); SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getSyncType()).isEqualTo("IMPACTS"); assertThat(result.getSyncType()).isEqualTo("IMPACTS");
assertThat(result.getCreated()).isEqualTo(0); assertThat(result.getCreated()).isEqualTo(0);
assertThat(result.isPlaceholder()).isTrue(); assertThat(result.isPlaceholder()).isFalse();
verifyNoInteractions(neo4jClient); verifyNoInteractions(neo4jClient);
verifyNoInteractions(entityRepository); }
@Test
void mergeImpacts_derivedFrom_matchingFieldNames_createsRelation() {
setupNeo4jQueryChain(String.class, "new-rel-id");
// Parent dataset (source_id = "ds-parent")
GraphEntity parentDs = GraphEntity.builder()
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
// Child dataset (source_id = "ds-child", parent_dataset_id = "ds-parent")
GraphEntity childDs = GraphEntity.builder()
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
.build();
// Fields with matching name "user_id"
GraphEntity parentField = GraphEntity.builder()
.id("field-parent-uid").name("user_id").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
.build();
GraphEntity childField = GraphEntity.builder()
.id("field-child-uid").name("user_id").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of(parentField, childField));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of(parentDs, childDs));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
.thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
verify(neo4jClient).query(cypherCaptor.capture());
assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
}
@Test
void mergeImpacts_noMatchingFieldNames_createsNoRelation() {
GraphEntity parentDs = GraphEntity.builder()
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
GraphEntity childDs = GraphEntity.builder()
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
.build();
GraphEntity parentField = GraphEntity.builder()
.id("field-parent").name("col_a").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
.build();
GraphEntity childField = GraphEntity.builder()
.id("field-child").name("col_b").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of(parentField, childField));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of(parentDs, childDs));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
.thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getCreated()).isEqualTo(0);
verifyNoInteractions(neo4jClient);
}
    @Test
    void mergeImpacts_jobInputOutput_createsRelationWithJobId() {
        // A job linking an input and an output dataset whose fields share the
        // name "tag_x" must issue a RELATED_TO relation write. Only the
        // relation type is asserted here; the job id is presumably carried in
        // the relation properties — TODO confirm against the Cypher params.
        setupNeo4jQueryChain(String.class, "new-rel-id");
        GraphEntity inputDs = GraphEntity.builder()
                .id("input-entity").sourceId("ds-in").type("Dataset").graphId(GRAPH_ID)
                .properties(new HashMap<>())
                .build();
        GraphEntity outputDs = GraphEntity.builder()
                .id("output-entity").sourceId("ds-out").type("Dataset").graphId(GRAPH_ID)
                .properties(new HashMap<>())
                .build();
        GraphEntity job = GraphEntity.builder()
                .id("job-entity").sourceId("job-001").type("Job").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of(
                        "input_dataset_id", "ds-in",
                        "output_dataset_id", "ds-out")))
                .build();
        GraphEntity inField = GraphEntity.builder()
                .id("field-in").name("tag_x").type("Field").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("dataset_source_id", "ds-in")))
                .build();
        GraphEntity outField = GraphEntity.builder()
                .id("field-out").name("tag_x").type("Field").graphId(GRAPH_ID)
                .properties(new HashMap<>(Map.of("dataset_source_id", "ds-out")))
                .build();
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
                .thenReturn(List.of(inField, outField));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
                .thenReturn(List.of(inputDs, outputDs));
        when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
                .thenReturn(List.of(job));
        SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
        assertThat(result.getSyncType()).isEqualTo("IMPACTS");
        verify(neo4jClient).query(cypherCaptor.capture());
        assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
    }
@Test @Test