Compare commits

..

3 Commits

Author SHA1 Message Date
444f8cd015 fix: resolve P0/P1/P2/P3 issues in the knowledge graph module
[P0 - Security fixes]
- InternalTokenInterceptor: fail-open → fail-closed
  - Requests are rejected (401) when no token is configured
  - Validation may be skipped explicitly only in dev/test environments
- KnowledgeGraphProperties: new skipTokenCheck configuration option
- application-knowledgegraph.yml: new skip-token-check setting

[P1 - Documentation version control]
- .gitignore: removed the docs/knowledge-graph/ ignore rule
- schema docs are now under version control

[P2 - Code quality]
- InternalTokenInterceptor: error responses now use the Response.error() format
- Added InternalTokenInterceptorTest.java (7 test cases)
  - fail-closed behavior
  - token validation logic
  - error response format

[P3 - Documentation consistency]
- README.md: relative links replaced with explicit GitHub links

[Verification]
- Build compiles
- All 198 tests pass (0 failures)
2026-02-19 13:03:42 +08:00
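The fail-open → fail-closed change described above can be sketched as follows. This is a minimal illustration of the rule, not the actual InternalTokenInterceptor API; the class and method names are hypothetical.

```java
public class TokenPolicy {
    /**
     * Fail-closed token check: when no expected token is configured, reject
     * the request unless a dev/test profile has explicitly opted out via
     * skipTokenCheck. A configured token must match exactly.
     */
    public static boolean allow(String expectedToken, String presentedToken,
                                String profile, boolean skipTokenCheck) {
        if (expectedToken == null || expectedToken.isBlank()) {
            // Fail closed: missing configuration no longer means "allow all".
            return skipTokenCheck && ("dev".equals(profile) || "test".equals(profile));
        }
        return expectedToken.equals(presentedToken);
    }

    public static void main(String[] args) {
        // An unconfigured token in prod is rejected (the old fail-open path allowed it).
        System.out.println(allow(null, "anything", "prod", false)); // false
    }
}
```

With this shape, the only way to run without a token is an explicit dev/test opt-in, which matches the commit's "reject with 401 unless dev/test skips validation" description.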
f12e4abd83 fix(kg): address knowledge graph sync issues from Codex review feedback
Fixes:
1. [P1] Incorrect job_id sanitization
   - Added a sanitizePropertyValue() method to handle property values safely
   - Closed a job_id JSON-injection risk in IMPACTS relations

2. [P2] Incremental sync recomputed all relations
   - All relation-building methods now accept a changedEntityIds parameter
   - Incremental sync only processes relations touching changed entities, improving performance

3. [P2] MERGE ON MATCH overwrote properties
   - Entity upserts preserve existing non-empty name/description values
   - Relation MERGE preserves existing non-empty properties_json values
   - Refined the conditional-overwrite logic in GraphRelationRepository

4. Test mock stub signature mismatches
   - Stubs now cover both the 2-argument and 3-argument relation methods
   - lenient() mode avoids unnecessary-stubbing errors

Co-Authored-By: Claude Sonnet 4 <noreply@anthropic.com>
2026-02-19 09:56:16 +08:00
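The job_id fix above boils down to escaping property values so they cannot break out of a JSON string. A minimal sketch of such an escaper follows; the real sanitizePropertyValue() implementation may differ, and this class name is illustrative only.

```java
public class PropertySanitizer {
    /**
     * Escape characters that could terminate or corrupt a JSON string
     * literal, preventing injection through values such as job_id.
     */
    public static String sanitize(String value) {
        if (value == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become \u00XX escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A malicious id can no longer inject extra JSON keys.
        System.out.println(sanitize("job-42\",\"admin\":true"));
    }
}
```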
42069f82b3 feat(kg): P0-04 sync result metadata enhancements
Implements sync history and metadata:

New features:
- SyncHistory nodes record sync history
- /history and /history/range APIs query sync history
- /full API returns the complete sync result (with metadata)

Fixes:
- [P1] syncId is now a full 36-character UUID, with a (graph_id, sync_id) unique constraint
- [P2-1] /history limit is bounded with @Min(1) @Max(200)
- [P2-2] /history/range is paginated (page, size) with skip overflow protection (>2M)
- [P2-3] Added SyncHistory indexes: (graph_id, started_at), (graph_id, status, started_at)

Tests:
- 182 tests pass (2 new)
- GraphSyncServiceTest, GraphInitializerTest, SyncMetadataTest all pass

Changes: +521 lines, -27 lines
New files: 4 (SyncMetadata, SyncHistoryRepository, SyncMetadataVO, SyncMetadataTest)
Modified files: 5
2026-02-18 16:55:03 +08:00
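The /history/range bounds above (size limited to 200, skip capped at 2M) reduce to a small offset computation, sketched here under illustrative names:

```java
public class HistoryPaging {
    static final int MAX_SIZE = 200;         // mirrors the @Max(200) bound
    static final long MAX_SKIP = 2_000_000L; // skip overflow guard (>2M rejected)

    /** Compute the query offset from (page, size), enforcing both bounds. */
    public static long skipFor(int page, int size) {
        if (page < 0 || size < 1 || size > MAX_SIZE) {
            throw new IllegalArgumentException("invalid page/size");
        }
        long skip = (long) page * size; // long arithmetic avoids int overflow
        if (skip > MAX_SKIP) {
            throw new IllegalArgumentException("pagination offset out of allowed range");
        }
        return skip;
    }

    public static void main(String[] args) {
        System.out.println(skipFor(3, 50)); // 150
    }
}
```

Casting to long before multiplying matters: `page * size` as int arithmetic could overflow before the bound check runs.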
21 changed files with 1976 additions and 74 deletions

View File

@@ -110,9 +110,9 @@ Thank you for your interest in this project! We warmly welcome contributions from
 bug reports, suggesting new features, or directly participating in code development, all forms of help make the project
 better.
-• 📮 [GitHub Issues](../../issues): Submit bugs or feature suggestions.
-• 🔧 [GitHub Pull Requests](../../pulls): Contribute code improvements.
+• 📮 [GitHub Issues](https://github.com/ModelEngine-Group/DataMate/issues): Submit bugs or feature suggestions.
+• 🔧 [GitHub Pull Requests](https://github.com/ModelEngine-Group/DataMate/pulls): Contribute code improvements.
 ## 📄 License

View File

@@ -2,7 +2,9 @@ package com.datamate.knowledgegraph.application;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import com.datamate.knowledgegraph.domain.model.SyncMetadata;
 import com.datamate.knowledgegraph.domain.model.SyncResult;
+import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
@@ -15,6 +17,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
@@ -52,6 +55,7 @@ public class GraphSyncService {
     private final GraphSyncStepService stepService;
     private final DataManagementClient dataManagementClient;
    private final KnowledgeGraphProperties properties;
+    private final SyncHistoryRepository syncHistoryRepository;

     /** Per-graphId mutex to prevent concurrent syncs. */
     private final ConcurrentHashMap<String, ReentrantLock> graphLocks = new ConcurrentHashMap<>();
@@ -60,9 +64,10 @@ public class GraphSyncService {
// 全量同步 // 全量同步
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
public List<SyncResult> syncAll(String graphId) { public SyncMetadata syncAll(String graphId) {
validateGraphId(graphId); validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8); String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId); ReentrantLock lock = acquireLock(graphId, syncId);
try { try {
@@ -178,10 +183,16 @@ public class GraphSyncService {
                 results.stream()
                         .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
                         .collect(Collectors.joining(", ")));
-            return results;
+            SyncMetadata metadata = SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, results);
+            saveSyncHistory(metadata);
+            return metadata;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
             log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Full sync failed, syncId=" + syncId);
         } finally {
@@ -189,13 +200,116 @@ public class GraphSyncService {
         }
     }

+    // -----------------------------------------------------------------------
+    // Incremental sync
+    // -----------------------------------------------------------------------
+
+    /**
+     * Incremental sync: fetch only the data changed within the given time
+     * window and sync it to Neo4j.
+     * <p>
+     * Differences from a full sync:
+     * <ul>
+     *   <li>Changed data is filtered by updatedFrom/updatedTo</li>
+     *   <li>No purge is performed (stale entities are not deleted)</li>
+     *   <li>The time window is recorded in SyncMetadata</li>
+     * </ul>
+     */
+    public SyncMetadata syncIncremental(String graphId, LocalDateTime updatedFrom, LocalDateTime updatedTo) {
+        validateGraphId(graphId);
+        if (updatedFrom == null || updatedTo == null) {
+            throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "Incremental sync requires both updatedFrom and updatedTo");
+        }
+        if (updatedFrom.isAfter(updatedTo)) {
+            throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "updatedFrom must not be later than updatedTo");
+        }
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
+        ReentrantLock lock = acquireLock(graphId, syncId);
+        try {
+            log.info("[{}] Starting incremental sync for graphId={}, window=[{}, {}]",
+                    syncId, graphId, updatedFrom, updatedTo);
+            // Fetch data changed within the time window
+            List<DatasetDTO> datasets = fetchWithRetry(syncId, "datasets",
+                    () -> dataManagementClient.listAllDatasets(updatedFrom, updatedTo));
+            List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
+                    () -> dataManagementClient.listAllWorkflows(updatedFrom, updatedTo));
+            List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
+                    () -> dataManagementClient.listAllJobs(updatedFrom, updatedTo));
+            List<LabelTaskDTO> labelTasks = fetchWithRetry(syncId, "label-tasks",
+                    () -> dataManagementClient.listAllLabelTasks(updatedFrom, updatedTo));
+            List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
+                    () -> dataManagementClient.listAllKnowledgeSets(updatedFrom, updatedTo));
+
+            Map<String, SyncResult> resultMap = new LinkedHashMap<>();
+            // Entity sync (upsert only, no purge)
+            resultMap.put("Dataset", stepService.upsertDatasetEntities(graphId, datasets, syncId));
+            resultMap.put("Field", stepService.upsertFieldEntities(graphId, datasets, syncId));
+            Set<String> usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets);
+            resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId));
+            resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId));
+            resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId));
+            resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId));
+            resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId));
+            resultMap.put("KnowledgeSet", stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId));
+
+            // Collect the IDs of all changed (created or updated) entities for incremental relation building
+            Set<String> changedEntityIds = collectChangedEntityIds(datasets, workflows, jobs, labelTasks, knowledgeSets, graphId);
+
+            // Relation building (idempotent MERGE) - incremental sync only touches relations of changed entities
+            resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("TRIGGERS", stepService.mergeTriggersRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("DEPENDS_ON", stepService.mergeDependsOnRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("IMPACTS", stepService.mergeImpactsRelations(graphId, syncId, changedEntityIds));
+            resultMap.put("SOURCED_FROM", stepService.mergeSourcedFromRelations(graphId, syncId, changedEntityIds));
+
+            List<SyncResult> results = new ArrayList<>(resultMap.values());
+            log.info("[{}] Incremental sync completed for graphId={}. Summary: {}", syncId, graphId,
+                    results.stream()
+                            .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
+                            .collect(Collectors.joining(", ")));
+            SyncMetadata metadata = SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, results);
+            metadata.setUpdatedFrom(updatedFrom);
+            metadata.setUpdatedTo(updatedTo);
+            saveSyncHistory(metadata);
+            return metadata;
+        } catch (BusinessException e) {
+            SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
+            failed.setUpdatedFrom(updatedFrom);
+            failed.setUpdatedTo(updatedTo);
+            saveSyncHistory(failed);
+            throw e;
+        } catch (Exception e) {
+            SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
+            failed.setUpdatedFrom(updatedFrom);
+            failed.setUpdatedTo(updatedTo);
+            saveSyncHistory(failed);
+            log.error("[{}] Incremental sync failed for graphId={}", syncId, graphId, e);
+            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Incremental sync failed, syncId=" + syncId);
+        } finally {
+            releaseLock(graphId, lock);
+        }
+    }
+
     // -----------------------------------------------------------------------
     // Single-step sync (each acquires its own lock and data)
     // -----------------------------------------------------------------------
     public SyncResult syncDatasets(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -206,10 +320,14 @@ public class GraphSyncService {
                     .collect(Collectors.toSet());
             int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId);
             result.setPurged(purged);
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
             log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Dataset sync failed, syncId=" + syncId);
         } finally {
@@ -219,7 +337,8 @@ public class GraphSyncService {
     public SyncResult syncFields(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -237,10 +356,14 @@ public class GraphSyncService {
                 }
             }
             result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
             log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Field sync failed, syncId=" + syncId);
         } finally {
@@ -250,7 +373,8 @@ public class GraphSyncService {
     public SyncResult syncUsers(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -266,10 +390,14 @@ public class GraphSyncService {
             SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId);
             Set<String> activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet());
             result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
             log.error("[{}] User sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "User sync failed, syncId=" + syncId);
         } finally {
@@ -279,13 +407,19 @@ public class GraphSyncService {
     public SyncResult syncOrgs(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
-            return stepService.upsertOrgEntities(graphId, syncId);
+            SyncResult result = stepService.upsertOrgEntities(graphId, syncId);
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result)));
+            return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
             log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Org sync failed, syncId=" + syncId);
         } finally {
@@ -295,7 +429,7 @@ public class GraphSyncService {
     public SyncResult buildHasFieldRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeHasFieldRelations(graphId, syncId);
@@ -312,7 +446,7 @@ public class GraphSyncService {
     public SyncResult buildDerivedFromRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeDerivedFromRelations(graphId, syncId);
@@ -329,7 +463,7 @@ public class GraphSyncService {
     public SyncResult buildBelongsToRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeBelongsToRelations(graphId, syncId);
@@ -350,7 +484,8 @@ public class GraphSyncService {
     public SyncResult syncWorkflows(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
@@ -361,10 +496,14 @@ public class GraphSyncService {
                     .filter(Objects::nonNull).filter(id -> !id.isBlank())
                     .collect(Collectors.toSet());
             result.setPurged(stepService.purgeStaleEntities(graphId, "Workflow", activeIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
             log.error("[{}] Workflow sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Workflow sync failed, syncId=" + syncId);
         } finally {
@@ -374,7 +513,8 @@ public class GraphSyncService {
     public SyncResult syncJobs(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
@@ -385,10 +525,14 @@ public class GraphSyncService {
                     .filter(Objects::nonNull).filter(id -> !id.isBlank())
                     .collect(Collectors.toSet());
             result.setPurged(stepService.purgeStaleEntities(graphId, "Job", activeIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
             log.error("[{}] Job sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "Job sync failed, syncId=" + syncId);
         } finally {
@@ -398,7 +542,8 @@ public class GraphSyncService {
     public SyncResult syncLabelTasks(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<LabelTaskDTO> tasks = fetchWithRetry(syncId, "label-tasks",
@@ -409,10 +554,14 @@ public class GraphSyncService {
                     .filter(Objects::nonNull).filter(id -> !id.isBlank())
                     .collect(Collectors.toSet());
             result.setPurged(stepService.purgeStaleEntities(graphId, "LabelTask", activeIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
             log.error("[{}] LabelTask sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "LabelTask sync failed, syncId=" + syncId);
         } finally {
@@ -422,7 +571,8 @@ public class GraphSyncService {
     public SyncResult syncKnowledgeSets(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
+        LocalDateTime startedAt = LocalDateTime.now();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
@@ -433,10 +583,14 @@ public class GraphSyncService {
                     .filter(Objects::nonNull).filter(id -> !id.isBlank())
                     .collect(Collectors.toSet());
             result.setPurged(stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeIds, syncId));
+            saveSyncHistory(SyncMetadata.fromResults(
+                    syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, List.of(result)));
             return result;
         } catch (BusinessException e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
             throw e;
         } catch (Exception e) {
+            saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
             log.error("[{}] KnowledgeSet sync failed for graphId={}", syncId, graphId, e);
             throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "KnowledgeSet sync failed, syncId=" + syncId);
         } finally {
@@ -450,7 +604,7 @@ public class GraphSyncService {
     public SyncResult buildUsesDatasetRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeUsesDatasetRelations(graphId, syncId);
@@ -467,7 +621,7 @@ public class GraphSyncService {
     public SyncResult buildProducesRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeProducesRelations(graphId, syncId);
@@ -484,7 +638,7 @@ public class GraphSyncService {
     public SyncResult buildAssignedToRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeAssignedToRelations(graphId, syncId);
@@ -501,7 +655,7 @@ public class GraphSyncService {
     public SyncResult buildTriggersRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeTriggersRelations(graphId, syncId);
@@ -518,7 +672,7 @@ public class GraphSyncService {
     public SyncResult buildDependsOnRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeDependsOnRelations(graphId, syncId);
@@ -535,7 +689,7 @@ public class GraphSyncService {
     public SyncResult buildImpactsRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeImpactsRelations(graphId, syncId);
@@ -552,7 +706,7 @@ public class GraphSyncService {
     public SyncResult buildSourcedFromRelations(String graphId) {
         validateGraphId(graphId);
-        String syncId = UUID.randomUUID().toString().substring(0, 8);
+        String syncId = UUID.randomUUID().toString();
         ReentrantLock lock = acquireLock(graphId, syncId);
         try {
             return stepService.mergeSourcedFromRelations(graphId, syncId);
@@ -567,6 +721,43 @@ public class GraphSyncService {
}
}
// -----------------------------------------------------------------------
// Sync history queries
// -----------------------------------------------------------------------
/**
* Query sync history records.
*/
public List<SyncMetadata> getSyncHistory(String graphId, String status, int limit) {
validateGraphId(graphId);
if (status != null && !status.isBlank()) {
return syncHistoryRepository.findByGraphIdAndStatus(graphId, status, limit);
}
return syncHistoryRepository.findByGraphId(graphId, limit);
}
/**
* Query sync history by time range (paginated).
*/
public List<SyncMetadata> getSyncHistoryByTimeRange(String graphId,
LocalDateTime from, LocalDateTime to,
int page, int size) {
validateGraphId(graphId);
long skip = (long) page * size;
if (skip > 2_000_000L) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "分页偏移量超出允许范围");
}
return syncHistoryRepository.findByGraphIdAndTimeRange(graphId, from, to, skip, size);
}
/**
* Look up a single sync record by syncId.
*/
public Optional<SyncMetadata> getSyncRecord(String graphId, String syncId) {
validateGraphId(graphId);
return syncHistoryRepository.findByGraphIdAndSyncId(graphId, syncId);
}
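The paging guard in `getSyncHistoryByTimeRange` widens `page * size` to `long` before multiplying and rejects offsets past 2,000,000. A minimal standalone sketch of that guard (hypothetical class and exception type, not the committed code):

```java
public class SkipGuardSketch {
    static final long MAX_SKIP = 2_000_000L;

    // Widen to long BEFORE multiplying, so a large page cannot overflow int math,
    // then reject offsets beyond the allowed window.
    static long computeSkip(int page, int size) {
        long skip = (long) page * size;
        if (skip > MAX_SKIP) {
            throw new IllegalArgumentException("paging offset out of allowed range");
        }
        return skip;
    }

    public static void main(String[] args) {
        System.out.println(computeSkip(3, 50));      // small offset, allowed
        System.out.println(computeSkip(40_000, 50)); // exactly 2,000,000, still allowed
        try {
            computeSkip(Integer.MAX_VALUE, 1000);    // would overflow int math without the cast
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```

Without the `(long)` cast, `Integer.MAX_VALUE * 1000` would silently wrap and could pass the guard with a nonsense offset.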
// -----------------------------------------------------------------------
// Internal methods
// -----------------------------------------------------------------------
@@ -671,6 +862,85 @@ public class GraphSyncService {
}
}
/**
* Persist sync metadata; failures are only logged and never break the main flow.
*/
private void saveSyncHistory(SyncMetadata metadata) {
try {
syncHistoryRepository.save(metadata);
} catch (Exception e) {
log.warn("[{}] Failed to save sync history: {}", metadata.getSyncId(), e.getMessage());
}
}
/**
* Collect the IDs of entities changed (created or updated) during an incremental sync.
* The entityId for each sourceId is resolved by querying the database.
*/
private Set<String> collectChangedEntityIds(List<DatasetDTO> datasets,
List<WorkflowDTO> workflows,
List<JobDTO> jobs,
List<LabelTaskDTO> labelTasks,
List<KnowledgeSetDTO> knowledgeSets,
String graphId) {
Set<String> entityIds = new HashSet<>();
// The sourceIds returned by the data-management client must be mapped to entityIds.
// Simplified approach: query the relevant entity types and match by sourceId.
try {
// Collect all changed sourceIds
Set<String> changedSourceIds = new HashSet<>();
datasets.stream().filter(Objects::nonNull).map(DatasetDTO::getId).filter(Objects::nonNull)
.forEach(changedSourceIds::add);
workflows.stream().filter(Objects::nonNull).map(WorkflowDTO::getId).filter(Objects::nonNull)
.forEach(changedSourceIds::add);
jobs.stream().filter(Objects::nonNull).map(JobDTO::getId).filter(Objects::nonNull)
.forEach(changedSourceIds::add);
labelTasks.stream().filter(Objects::nonNull).map(LabelTaskDTO::getId).filter(Objects::nonNull)
.forEach(changedSourceIds::add);
knowledgeSets.stream().filter(Objects::nonNull).map(KnowledgeSetDTO::getId).filter(Objects::nonNull)
.forEach(changedSourceIds::add);
// Add the sourceIds of tag-derived fields
for (DatasetDTO dataset : datasets) {
if (dataset != null && dataset.getTags() != null) {
for (DataManagementClient.TagDTO tag : dataset.getTags()) {
if (tag != null && tag.getName() != null) {
changedSourceIds.add(dataset.getId() + ":tag:" + tag.getName());
}
}
}
}
// Resolve the entityIds for these sourceIds
if (!changedSourceIds.isEmpty()) {
for (String sourceId : changedSourceIds) {
// Simplified: could be optimized into a single batch query
String cypher = "MATCH (e:Entity {graph_id: $graphId, source_id: $sourceId}) RETURN e.id AS entityId";
List<String> foundEntityIds = stepService.neo4jClient.query(cypher)
.bindAll(Map.of("graphId", graphId, "sourceId", sourceId))
.fetchAs(String.class)
.mappedBy((ts, record) -> record.get("entityId").asString())
.all()
.stream().toList();
entityIds.addAll(foundEntityIds);
}
}
} catch (Exception e) {
log.warn("Failed to collect changed entity IDs, falling back to full relation rebuild: {}", e.getMessage());
// On collection failure, return null to signal a full relation rebuild
return null;
}
log.debug("Collected {} changed entity IDs for incremental relation building", entityIds.size());
return entityIds;
}
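The comment above notes the per-sourceId lookup loop could be batched. A hedged sketch of what a single-round-trip `UNWIND` query might look like (query text and parameter names are assumptions for illustration, not the committed code):

```java
import java.util.List;
import java.util.Map;

public class BatchLookupSketch {
    // One UNWIND query resolves every sourceId in a single round-trip,
    // replacing the N per-sourceId MATCH queries in the loop.
    static final String BATCH_CYPHER =
            "UNWIND $sourceIds AS sid " +
            "MATCH (e:Entity {graph_id: $graphId, source_id: sid}) " +
            "RETURN e.id AS entityId";

    static Map<String, Object> params(String graphId, List<String> sourceIds) {
        return Map.of("graphId", graphId, "sourceIds", sourceIds);
    }

    public static void main(String[] args) {
        Map<String, Object> p = params("g-1", List.of("ds-1", "ds-2"));
        System.out.println(BATCH_CYPHER);
        System.out.println(p.get("sourceIds"));
    }
}
```

The parameter map would then be passed to `neo4jClient.query(BATCH_CYPHER).bindAll(...)` exactly like the existing single-id query.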
private void validateGraphId(String graphId) {
if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效");

View File

@@ -39,7 +39,7 @@ public class GraphSyncStepService {
private static final String REL_TYPE = "RELATED_TO";
private final GraphEntityRepository entityRepository;
final Neo4jClient neo4jClient; // package-private so GraphSyncService can reuse it
private final KnowledgeGraphProperties properties;
// -----------------------------------------------------------------------
@@ -441,11 +441,35 @@ public class GraphSyncStepService {
@Transactional
public SyncResult mergeHasFieldRelations(String graphId, String syncId) {
return mergeHasFieldRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeHasFieldRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("HAS_FIELD", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> fields = entityRepository.findByGraphIdAndType(graphId, "Field");
// Incremental sync: only process fields related to changed entities
if (changedEntityIds != null) {
fields = fields.stream()
.filter(field -> {
// Include fields that changed themselves
if (changedEntityIds.contains(field.getId())) {
return true;
}
// Include fields whose associated dataset changed
Object datasetSourceId = field.getProperties().get("dataset_source_id");
if (datasetSourceId != null) {
String datasetEntityId = datasetMap.get(datasetSourceId.toString());
return datasetEntityId != null && changedEntityIds.contains(datasetEntityId);
}
return false;
})
.toList();
}
for (GraphEntity field : fields) {
try {
Object datasetSourceId = field.getProperties().get("dataset_source_id");
@@ -477,11 +501,23 @@ public class GraphSyncStepService {
@Transactional
public SyncResult mergeDerivedFromRelations(String graphId, String syncId) {
return mergeDerivedFromRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeDerivedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("DERIVED_FROM", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
// Incremental sync: only process changed datasets
if (changedEntityIds != null) {
datasets = datasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : datasets) {
try {
Object parentId = dataset.getProperties().get("parent_dataset_id");
@@ -512,6 +548,11 @@ public class GraphSyncStepService {
@Transactional
public SyncResult mergeBelongsToRelations(String graphId, String syncId) {
return mergeBelongsToRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("BELONGS_TO", syncId);
Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType(
@@ -524,7 +565,13 @@ public class GraphSyncStepService {
String orgId = defaultOrgOpt.get().getId();
// User → Org
List<GraphEntity> users = entityRepository.findByGraphIdAndType(graphId, "User");
if (changedEntityIds != null) {
users = users.stream()
.filter(user -> changedEntityIds.contains(user.getId()))
.toList();
}
for (GraphEntity user : users) {
try {
boolean created = mergeRelation(graphId, user.getId(), orgId,
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
@@ -536,7 +583,13 @@ public class GraphSyncStepService {
}
// Dataset → Org
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
if (changedEntityIds != null) {
datasets = datasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : datasets) {
try {
boolean created = mergeRelation(graphId, dataset.getId(), orgId,
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
@@ -559,22 +612,45 @@ public class GraphSyncStepService {
*/
@Transactional
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) {
return mergeUsesDatasetRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("USES_DATASET", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
// Job → Dataset (via input_dataset_id)
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId);
}
// LabelTask → Dataset (via dataset_id)
List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
if (changedEntityIds != null) {
tasks = tasks.stream()
.filter(task -> changedEntityIds.contains(task.getId()))
.toList();
}
for (GraphEntity task : tasks) {
mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId);
}
// Workflow → Dataset (via input_dataset_ids, multi-value)
List<GraphEntity> workflows = entityRepository.findByGraphIdAndType(graphId, "Workflow");
if (changedEntityIds != null) {
workflows = workflows.stream()
.filter(workflow -> changedEntityIds.contains(workflow.getId()))
.toList();
}
for (GraphEntity workflow : workflows) {
mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId);
}
@@ -616,11 +692,23 @@ public class GraphSyncStepService {
*/
@Transactional
public SyncResult mergeProducesRelations(String graphId, String syncId) {
return mergeProducesRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeProducesRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("PRODUCES", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try {
Object outputDatasetId = job.getProperties().get("output_dataset_id");
if (outputDatasetId == null || outputDatasetId.toString().isBlank()) {
@@ -647,17 +735,34 @@ public class GraphSyncStepService {
*/
@Transactional
public SyncResult mergeAssignedToRelations(String graphId, String syncId) {
return mergeAssignedToRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeAssignedToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("ASSIGNED_TO", syncId);
Map<String, String> userMap = buildSourceIdToEntityIdMap(graphId, "User");
// LabelTask → User
List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
if (changedEntityIds != null) {
tasks = tasks.stream()
.filter(task -> changedEntityIds.contains(task.getId()))
.toList();
}
for (GraphEntity task : tasks) {
mergeCreatorAssignment(graphId, task, "label_task", userMap, result, syncId);
}
// Job → User
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId);
}
@@ -692,11 +797,23 @@ public class GraphSyncStepService {
*/
@Transactional
public SyncResult mergeTriggersRelations(String graphId, String syncId) {
return mergeTriggersRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeTriggersRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("TRIGGERS", syncId);
Map<String, String> workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow");
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try {
Object workflowId = job.getProperties().get("workflow_id");
if (workflowId == null || workflowId.toString().isBlank()) {
@@ -724,11 +841,23 @@ public class GraphSyncStepService {
*/
@Transactional
public SyncResult mergeDependsOnRelations(String graphId, String syncId) {
return mergeDependsOnRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeDependsOnRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("DEPENDS_ON", syncId);
Map<String, String> jobMap = buildSourceIdToEntityIdMap(graphId, "Job");
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
if (changedEntityIds != null) {
jobs = jobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : jobs) {
try {
Object depJobId = job.getProperties().get("depends_on_job_id");
if (depJobId == null || depJobId.toString().isBlank()) {
@@ -751,29 +880,159 @@ public class GraphSyncStepService {
}
/**
* Build IMPACTS relations: Field → Field (field-level lineage).
* <p>
* Field impact relations are derived in two ways:
* <ol>
* <li>DERIVED_FROM: if Dataset B derives from Dataset A (parent_dataset_id),
* same-named fields in A impact their counterparts in B (impact_type=DIRECT).</li>
* <li>Job input/output: if a Job consumes Dataset A and produces Dataset B,
* same-named fields in A impact their counterparts in B (impact_type=DIRECT, job_id=source ID).</li>
* </ol>
*/
@Transactional
public SyncResult mergeImpactsRelations(String graphId, String syncId) {
return mergeImpactsRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeImpactsRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("IMPACTS", syncId);
// 1. Load all Fields and group them by dataset_source_id
List<GraphEntity> allFields = entityRepository.findByGraphIdAndType(graphId, "Field");
Map<String, List<GraphEntity>> fieldsByDataset = allFields.stream()
.filter(f -> f.getProperties().get("dataset_source_id") != null)
.collect(Collectors.groupingBy(
f -> f.getProperties().get("dataset_source_id").toString()));
if (fieldsByDataset.isEmpty()) {
log.debug("[{}] No fields with dataset_source_id found, skipping IMPACTS", syncId);
return endResult(result);
}
// Track processed (sourceDatasetId, targetDatasetId) pairs to avoid duplicates
Set<String> processedPairs = new HashSet<>();
// 2. DERIVED_FROM derivation: parent dataset fields → child dataset fields
List<GraphEntity> allDatasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
// Incremental sync: only process changed datasets
if (changedEntityIds != null) {
allDatasets = allDatasets.stream()
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
.toList();
}
for (GraphEntity dataset : allDatasets) {
Object parentId = dataset.getProperties().get("parent_dataset_id");
if (parentId == null || parentId.toString().isBlank()) {
continue;
}
String pairKey = parentId + "→" + dataset.getSourceId();
processedPairs.add(pairKey);
mergeFieldImpacts(graphId, parentId.toString(), dataset.getSourceId(),
fieldsByDataset, null, result, syncId);
}
// 3. Job input/output derivation: input dataset fields → output dataset fields
List<GraphEntity> allJobs = entityRepository.findByGraphIdAndType(graphId, "Job");
// Incremental sync: only process changed jobs
if (changedEntityIds != null) {
allJobs = allJobs.stream()
.filter(job -> changedEntityIds.contains(job.getId()))
.toList();
}
for (GraphEntity job : allJobs) {
Object inputDsId = job.getProperties().get("input_dataset_id");
Object outputDsId = job.getProperties().get("output_dataset_id");
if (inputDsId == null || outputDsId == null
|| inputDsId.toString().isBlank() || outputDsId.toString().isBlank()) {
continue;
}
String pairKey = inputDsId + "→" + outputDsId;
if (processedPairs.contains(pairKey)) {
continue;
}
processedPairs.add(pairKey);
mergeFieldImpacts(graphId, inputDsId.toString(), outputDsId.toString(),
fieldsByDataset, job.getSourceId(), result, syncId);
}
return endResult(result);
}
/**
* Match the fields of two related Datasets by name and create IMPACTS relations.
*/
private void mergeFieldImpacts(String graphId,
String sourceDatasetSourceId, String targetDatasetSourceId,
Map<String, List<GraphEntity>> fieldsByDataset,
String jobSourceId,
SyncResult result, String syncId) {
List<GraphEntity> sourceFields = fieldsByDataset.getOrDefault(sourceDatasetSourceId, List.of());
List<GraphEntity> targetFields = fieldsByDataset.getOrDefault(targetDatasetSourceId, List.of());
if (sourceFields.isEmpty() || targetFields.isEmpty()) {
return;
}
// Index target fields by name
Map<String, GraphEntity> targetByName = targetFields.stream()
.filter(f -> f.getName() != null && !f.getName().isBlank())
.collect(Collectors.toMap(GraphEntity::getName, f -> f, (a, b) -> a));
for (GraphEntity srcField : sourceFields) {
if (srcField.getName() == null || srcField.getName().isBlank()) {
continue;
}
GraphEntity tgtField = targetByName.get(srcField.getName());
if (tgtField == null) {
continue;
}
try {
String propsJson = jobSourceId != null
? "{\"impact_type\":\"DIRECT\",\"job_id\":\"" + sanitizePropertyValue(jobSourceId) + "\"}"
: "{\"impact_type\":\"DIRECT\"}";
boolean created = mergeRelation(graphId, srcField.getId(), tgtField.getId(),
"IMPACTS", propsJson, syncId);
if (created) {
result.incrementCreated();
} else {
result.incrementSkipped();
}
} catch (Exception e) {
log.warn("[{}] Failed to merge IMPACTS: {} → {}", syncId,
srcField.getId(), tgtField.getId(), e);
result.addError("impacts:" + srcField.getId());
}
}
}
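The target-field index above uses `Collectors.toMap` with a `(a, b) -> a` merge function. A standalone sketch (hypothetical names, not the committed code) showing why that matters — duplicate field names keep the first entry instead of throwing `IllegalStateException`:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FieldNameIndexSketch {
    // First-wins merge: a duplicate key keeps the earlier value, so a rogue
    // duplicate field name can neither clobber an existing match nor abort the sync.
    static Map<String, String> indexFirstWins(List<String> names) {
        return names.stream()
                .collect(Collectors.toMap(Function.identity(), String::toUpperCase, (a, b) -> a));
    }

    public static void main(String[] args) {
        Map<String, String> byName = indexFirstWins(List.of("id", "name", "id"));
        System.out.println(byName.size()); // duplicates collapse to one entry
    }
}
```

Without the third `toMap` argument, the duplicate "id" key would throw and fail the whole IMPACTS step.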
/**
* Build SOURCED_FROM relations: KnowledgeSet → Dataset (via source_dataset_ids).
*/
@Transactional
public SyncResult mergeSourcedFromRelations(String graphId, String syncId) {
return mergeSourcedFromRelations(graphId, syncId, null);
}
@Transactional
public SyncResult mergeSourcedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
SyncResult result = beginResult("SOURCED_FROM", syncId);
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
List<GraphEntity> knowledgeSets = entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet");
if (changedEntityIds != null) {
knowledgeSets = knowledgeSets.stream()
.filter(ks -> changedEntityIds.contains(ks.getId()))
.toList();
}
for (GraphEntity ks : knowledgeSets) {
try {
Object sourceIds = ks.getProperties().get("source_dataset_ids");
if (sourceIds == null) {
@@ -847,7 +1106,8 @@ public class GraphSyncStepService {
"ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " +
" e.name = $name, e.description = $description, " +
" e.created_at = datetime(), e.updated_at = datetime()" + extraSet + " " +
"ON MATCH SET e.name = CASE WHEN $name <> '' THEN $name ELSE e.name END, " +
" e.description = CASE WHEN $description <> '' THEN $description ELSE e.description END, " +
" e.updated_at = datetime()" + extraSet + " " +
"RETURN e.id = $newId AS isNew"
)
@@ -871,6 +1131,16 @@ public class GraphSyncStepService {
return key.replaceAll("[^a-zA-Z0-9_]", "");
}
/**
* Sanitize a property value for JSON string concatenation: escape backslashes
* and double quotes to prevent JSON injection.
*/
private static String sanitizePropertyValue(String value) {
if (value == null) {
return "";
}
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}
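The escaping order in `sanitizePropertyValue` matters: backslashes must be escaped before quotes, otherwise the backslash added in front of a quote would itself get doubled. A minimal standalone sketch mirroring that logic (hypothetical class name):

```java
public class SanitizeSketch {
    // Mirrors sanitizePropertyValue: escape backslashes FIRST, then quotes,
    // so the backslash inserted before a quote is not re-escaped afterwards.
    static String sanitize(String value) {
        if (value == null) {
            return "";
        }
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // An attacker-controlled job_id can no longer break out of the JSON string.
        System.out.println("{\"job_id\":\"" + sanitize("x\",\"admin\":true") + "\"}");
        System.out.println(sanitize("a\\b"));
    }
}
```

This keeps the `propsJson` built in `mergeFieldImpacts` a single well-formed JSON object regardless of the source ID's contents.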
/**
* Convert a Java value to a Neo4j-compatible property value.
* <p>
@@ -925,6 +1195,7 @@ public class GraphSyncStepService {
"MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
"ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " +
" r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " +
"ON MATCH SET r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
"RETURN r.id AS relId"
)
.bindAll(Map.of(

View File

@@ -0,0 +1,194 @@
package com.datamate.knowledgegraph.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Transient;
import org.springframework.data.neo4j.core.schema.GeneratedValue;
import org.springframework.data.neo4j.core.schema.Id;
import org.springframework.data.neo4j.core.schema.Node;
import org.springframework.data.neo4j.core.schema.Property;
import org.springframework.data.neo4j.core.support.UUIDStringGenerator;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* Sync operation metadata recording the overall status and statistics of each sync run.
* <p>
* Also persisted to the graph database as a Neo4j node, enabling history queries and troubleshooting.
*/
@Node("SyncHistory")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncMetadata {
public static final String STATUS_SUCCESS = "SUCCESS";
public static final String STATUS_FAILED = "FAILED";
public static final String STATUS_PARTIAL = "PARTIAL";
public static final String TYPE_FULL = "FULL";
public static final String TYPE_INCREMENTAL = "INCREMENTAL";
public static final String TYPE_DATASETS = "DATASETS";
public static final String TYPE_FIELDS = "FIELDS";
public static final String TYPE_USERS = "USERS";
public static final String TYPE_ORGS = "ORGS";
public static final String TYPE_WORKFLOWS = "WORKFLOWS";
public static final String TYPE_JOBS = "JOBS";
public static final String TYPE_LABEL_TASKS = "LABEL_TASKS";
public static final String TYPE_KNOWLEDGE_SETS = "KNOWLEDGE_SETS";
@Id
@GeneratedValue(UUIDStringGenerator.class)
private String id;
@Property("sync_id")
private String syncId;
@Property("graph_id")
private String graphId;
/** Sync type: FULL / DATASETS / WORKFLOWS, etc. */
@Property("sync_type")
private String syncType;
/** Sync status: SUCCESS / FAILED / PARTIAL */
@Property("status")
private String status;
@Property("started_at")
private LocalDateTime startedAt;
@Property("completed_at")
private LocalDateTime completedAt;
@Property("duration_millis")
private long durationMillis;
@Property("total_created")
@Builder.Default
private int totalCreated = 0;
@Property("total_updated")
@Builder.Default
private int totalUpdated = 0;
@Property("total_skipped")
@Builder.Default
private int totalSkipped = 0;
@Property("total_failed")
@Builder.Default
private int totalFailed = 0;
@Property("total_purged")
@Builder.Default
private int totalPurged = 0;
/** Start of the incremental sync time window */
@Property("updated_from")
private LocalDateTime updatedFrom;
/** End of the incremental sync time window */
@Property("updated_to")
private LocalDateTime updatedTo;
/** Error message when the sync failed */
@Property("error_message")
private String errorMessage;
/** Per-step summaries, e.g. "Dataset(+5/~2/-0/purged:1)" */
@Property("step_summaries")
@Builder.Default
private List<String> stepSummaries = new ArrayList<>();
/** Detailed per-step results (not persisted to Neo4j; only carried in responses) */
@Transient
private List<SyncResult> results;
public int totalEntities() {
return totalCreated + totalUpdated + totalSkipped + totalFailed;
}
/**
* Build metadata from a list of SyncResults.
*/
public static SyncMetadata fromResults(String syncId, String graphId, String syncType,
LocalDateTime startedAt, List<SyncResult> results) {
LocalDateTime completedAt = LocalDateTime.now();
long duration = Duration.between(startedAt, completedAt).toMillis();
int created = 0, updated = 0, skipped = 0, failed = 0, purged = 0;
List<String> summaries = new ArrayList<>();
boolean hasFailures = false;
for (SyncResult r : results) {
created += r.getCreated();
updated += r.getUpdated();
skipped += r.getSkipped();
failed += r.getFailed();
purged += r.getPurged();
if (r.getFailed() > 0) {
hasFailures = true;
}
summaries.add(formatStepSummary(r));
}
String status = hasFailures ? STATUS_PARTIAL : STATUS_SUCCESS;
return SyncMetadata.builder()
.syncId(syncId)
.graphId(graphId)
.syncType(syncType)
.status(status)
.startedAt(startedAt)
.completedAt(completedAt)
.durationMillis(duration)
.totalCreated(created)
.totalUpdated(updated)
.totalSkipped(skipped)
.totalFailed(failed)
.totalPurged(purged)
.stepSummaries(summaries)
.results(results)
.build();
}
/**
* Build metadata for a failed sync.
*/
public static SyncMetadata failed(String syncId, String graphId, String syncType,
LocalDateTime startedAt, String errorMessage) {
LocalDateTime completedAt = LocalDateTime.now();
long duration = Duration.between(startedAt, completedAt).toMillis();
return SyncMetadata.builder()
.syncId(syncId)
.graphId(graphId)
.syncType(syncType)
.status(STATUS_FAILED)
.startedAt(startedAt)
.completedAt(completedAt)
.durationMillis(duration)
.errorMessage(errorMessage)
.build();
}
private static String formatStepSummary(SyncResult r) {
StringBuilder sb = new StringBuilder();
sb.append(r.getSyncType())
.append("(+").append(r.getCreated())
.append("/~").append(r.getUpdated())
.append("/-").append(r.getFailed());
if (r.getPurged() > 0) {
sb.append("/purged:").append(r.getPurged());
}
sb.append(")");
return sb.toString();
}
}
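As a sanity check on the summary shape documented on `step_summaries`, here is a minimal standalone sketch reproducing `formatStepSummary`'s output format, with the `SyncResult` counters inlined as plain ints (hypothetical class name, not the committed code):

```java
public class StepSummarySketch {
    // Reproduces the "Type(+created/~updated/-failed[/purged:n])" summary format;
    // the purged suffix appears only when something was actually purged.
    static String summary(String type, int created, int updated, int failed, int purged) {
        StringBuilder sb = new StringBuilder();
        sb.append(type)
          .append("(+").append(created)
          .append("/~").append(updated)
          .append("/-").append(failed);
        if (purged > 0) {
            sb.append("/purged:").append(purged);
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(summary("Dataset", 5, 2, 0, 1)); // matches the field's Javadoc example
        System.out.println(summary("Job", 0, 3, 1, 0));     // no purged suffix
    }
}
```

The first call reproduces the example in the `step_summaries` Javadoc exactly.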

View File

@@ -345,16 +345,13 @@ public class GraphRelationRepository {
 .query(
     "MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " +
     "MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " +
-    "CREATE (s)-[r:" + REL_TYPE + " {" +
-    " id: $id," +
-    " relation_type: $relationType," +
-    " weight: $weight," +
-    " confidence: $confidence," +
-    " source_id: $sourceId," +
-    " graph_id: $graphId," +
-    " properties_json: $propertiesJson," +
-    " created_at: $createdAt" +
-    "}]->(t) " +
+    "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
+    "ON CREATE SET r.id = $id, r.weight = $weight, r.confidence = $confidence, " +
+    " r.source_id = $sourceId, r.properties_json = $propertiesJson, r.created_at = $createdAt " +
+    "ON MATCH SET r.weight = CASE WHEN $weight IS NOT NULL THEN $weight ELSE r.weight END, " +
+    " r.confidence = CASE WHEN $confidence IS NOT NULL THEN $confidence ELSE r.confidence END, " +
+    " r.source_id = CASE WHEN $sourceId <> '' THEN $sourceId ELSE r.source_id END, " +
+    " r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
     RETURN_COLUMNS
 )
 .bindAll(params)
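The `ON MATCH SET` clauses above keep an existing property unless the incoming value is meaningful (non-null, non-empty, or not the `{}` placeholder). The same keep-unless-meaningful rule, sketched in plain Java (helper names are illustrative, not from the repository):

```java
// Java sketch of the Cypher "CASE WHEN $x <> '' THEN $x ELSE r.x END" overwrite rule:
// an existing property survives unless the incoming value is considered meaningful.
public class MergeOverwriteDemo {
    /** Mirrors: CASE WHEN $incoming <> '' THEN $incoming ELSE existing END */
    static String keepUnlessNonEmpty(String existing, String incoming) {
        return (incoming != null && !incoming.isEmpty()) ? incoming : existing;
    }

    /** Mirrors: CASE WHEN $incoming <> '{}' THEN $incoming ELSE existing END (properties_json) */
    static String keepUnlessNonEmptyJson(String existing, String incoming) {
        return (incoming != null && !incoming.equals("{}")) ? incoming : existing;
    }

    public static void main(String[] args) {
        System.out.println(keepUnlessNonEmpty("old-source", ""));        // old-source
        System.out.println(keepUnlessNonEmpty("old-source", "new-src")); // new-src
        System.out.println(keepUnlessNonEmptyJson("{\"k\":1}", "{}"));   // {"k":1}
    }
}
```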

View File

@@ -0,0 +1,43 @@
package com.datamate.knowledgegraph.domain.repository;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.data.neo4j.repository.query.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
@Repository
public interface SyncHistoryRepository extends Neo4jRepository<SyncMetadata, String> {
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
List<SyncMetadata> findByGraphId(
@Param("graphId") String graphId,
@Param("limit") int limit);
@Query("MATCH (h:SyncHistory {graph_id: $graphId, status: $status}) " +
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
List<SyncMetadata> findByGraphIdAndStatus(
@Param("graphId") String graphId,
@Param("status") String status,
@Param("limit") int limit);
@Query("MATCH (h:SyncHistory {graph_id: $graphId, sync_id: $syncId}) RETURN h")
Optional<SyncMetadata> findByGraphIdAndSyncId(
@Param("graphId") String graphId,
@Param("syncId") String syncId);
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
"WHERE h.started_at >= $from AND h.started_at <= $to " +
"RETURN h ORDER BY h.started_at DESC SKIP $skip LIMIT $limit")
List<SyncMetadata> findByGraphIdAndTimeRange(
@Param("graphId") String graphId,
@Param("from") LocalDateTime from,
@Param("to") LocalDateTime to,
@Param("skip") long skip,
@Param("limit") int limit);
}

View File

@@ -22,7 +22,8 @@ public enum KnowledgeGraphErrorCode implements ErrorCode {
 SYNC_FAILED("knowledge_graph.0009", "Data sync failed"),
 EMPTY_SNAPSHOT_PURGE_BLOCKED("knowledge_graph.0010", "Empty snapshot protection: upstream returned an empty list, purge blocked"),
 SCHEMA_INIT_FAILED("knowledge_graph.0011", "Graph schema initialization failed"),
-INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "Default credentials detected; default passwords are forbidden in production");
+INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "Default credentials detected; default passwords are forbidden in production"),
+UNAUTHORIZED_INTERNAL_CALL("knowledge_graph.0013", "Unauthorized internal call: X-Internal-Token validation failed");

 private final String code;
 private final String message;

View File

@@ -72,7 +72,20 @@ public class GraphInitializer implements ApplicationRunner {
 "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
 // Full-text index
-"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]"
+"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",
+// ── SyncHistory constraints and indexes ──
+// P1: unique (graph_id, sync_id) constraint to prevent ID collisions
+"CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
+"FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
+// P2-3: query optimization indexes
+"CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
+"FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
+"CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
+"FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
 );

 @Override

View File

@@ -25,6 +25,24 @@ public class KnowledgeGraphProperties {
 /** Sync-related configuration */
 private Sync sync = new Sync();

+/** Security-related configuration */
+private Security security = new Security();
+
+@Data
+public static class Security {
+/** Internal service token, validated against the X-Internal-Token header on sync endpoints */
+private String internalToken;
+
+/**
+ * Whether to skip internal token validation (defaults to false, i.e. fail-closed).
+ * <p>
+ * May be set to true only explicitly, and only in dev/test environments.
+ * Production must keep this false and configure {@code internal-token}.
+ */
+private boolean skipTokenCheck = false;
+}
+
 @Data
 public static class Sync {

View File

@@ -0,0 +1,74 @@
package com.datamate.knowledgegraph.infrastructure.security;
import com.datamate.common.infrastructure.common.Response;
import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.HandlerInterceptor;
import java.io.IOException;
/**
* Internal-service token validation interceptor.
* <p>
* Validates the {@code X-Internal-Token} request header, restricting the sync endpoints to internal services and scheduled tasks.
* <p>
* <strong>Security policy (fail-closed)</strong>:
* <ul>
* <li>If no token is configured and {@code skip-token-check=false} (the default), requests are rejected outright</li>
* <li>Validation is skipped only when {@code skip-token-check=true} is set explicitly in a dev/test environment</li>
* </ul>
*/
@Component
@RequiredArgsConstructor
public class InternalTokenInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory.getLogger(InternalTokenInterceptor.class);
private static final String HEADER_INTERNAL_TOKEN = "X-Internal-Token";
private final KnowledgeGraphProperties properties;
private final ObjectMapper objectMapper;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws IOException {
KnowledgeGraphProperties.Security security = properties.getSecurity();
String configuredToken = security.getInternalToken();
if (!StringUtils.hasText(configuredToken)) {
if (security.isSkipTokenCheck()) {
log.warn("Internal token not configured and skip-token-check=true; skipping validation (dev/test environments only).");
return true;
}
log.error("Internal token not configured and skip-token-check=false (fail-closed); rejecting the request. "
+ "Set the KG_INTERNAL_TOKEN environment variable, or enable skip-token-check in dev/test environments.");
writeErrorResponse(response);
return false;
}
String requestToken = request.getHeader(HEADER_INTERNAL_TOKEN);
if (!configuredToken.equals(requestToken)) {
writeErrorResponse(response);
return false;
}
return true;
}
private void writeErrorResponse(HttpServletResponse response) throws IOException {
Response<?> errorBody = Response.error(KnowledgeGraphErrorCode.UNAUTHORIZED_INTERNAL_CALL);
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.setCharacterEncoding("UTF-8");
response.getWriter().write(objectMapper.writeValueAsString(errorBody));
}
}
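The `preHandle` logic above reduces to a small decision table: no configured token means reject unless the dev/test opt-out is set; otherwise allow only on exact token match. A self-contained sketch of that decision (class and method names are illustrative, not from the repository):

```java
// Sketch of the fail-closed decision implemented by InternalTokenInterceptor.preHandle:
// no configured token => reject, unless skip-token-check is explicitly enabled (dev/test only).
public class TokenCheckDemo {
    static boolean allow(String configuredToken, boolean skipTokenCheck, String requestToken) {
        if (configuredToken == null || configuredToken.isBlank()) {
            // fail-closed: only an explicit dev/test opt-out lets the request through
            return skipTokenCheck;
        }
        // token configured: allow only on exact match
        return configuredToken.equals(requestToken);
    }

    public static void main(String[] args) {
        System.out.println(allow(null, false, "anything"));   // false (fail-closed)
        System.out.println(allow(null, true, null));          // true  (dev/test opt-out)
        System.out.println(allow("s3cret", false, "s3cret")); // true  (token matches)
        System.out.println(allow("s3cret", false, "wrong"));  // false (token mismatch)
    }
}
```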

View File

@@ -0,0 +1,22 @@
package com.datamate.knowledgegraph.infrastructure.security;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* Registers {@link InternalTokenInterceptor}; intercepts the sync endpoints only.
*/
@Configuration
@RequiredArgsConstructor
public class InternalTokenWebMvcConfigurer implements WebMvcConfigurer {
private final InternalTokenInterceptor internalTokenInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(internalTokenInterceptor)
.addPathPatterns("/knowledge-graph/*/sync/**");
}
}

View File

@@ -0,0 +1,75 @@
package com.datamate.knowledgegraph.interfaces.dto;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
/**
* Sync metadata view object.
* <p>
* Carries the overall statistics of a sync run and the detailed per-step results.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncMetadataVO {
private String syncId;
private String graphId;
private String syncType;
private String status;
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private long durationMillis;
private int totalCreated;
private int totalUpdated;
private int totalSkipped;
private int totalFailed;
private int totalPurged;
private int totalEntities;
private LocalDateTime updatedFrom;
private LocalDateTime updatedTo;
private String errorMessage;
private List<String> stepSummaries;
/** Detailed per-step results (carried only in the response of the current sync; null for history queries) */
private List<SyncResultVO> results;
/**
* Converts from SyncMetadata (including detailed step results).
*/
public static SyncMetadataVO from(SyncMetadata metadata) {
List<SyncResultVO> resultVOs = null;
if (metadata.getResults() != null) {
resultVOs = metadata.getResults().stream()
.map(SyncResultVO::from)
.toList();
}
return SyncMetadataVO.builder()
.syncId(metadata.getSyncId())
.graphId(metadata.getGraphId())
.syncType(metadata.getSyncType())
.status(metadata.getStatus())
.startedAt(metadata.getStartedAt())
.completedAt(metadata.getCompletedAt())
.durationMillis(metadata.getDurationMillis())
.totalCreated(metadata.getTotalCreated())
.totalUpdated(metadata.getTotalUpdated())
.totalSkipped(metadata.getTotalSkipped())
.totalFailed(metadata.getTotalFailed())
.totalPurged(metadata.getTotalPurged())
.totalEntities(metadata.totalEntities())
.updatedFrom(metadata.getUpdatedFrom())
.updatedTo(metadata.getUpdatedTo())
.errorMessage(metadata.getErrorMessage())
.stepSummaries(metadata.getStepSummaries())
.results(resultVOs)
.build();
}
}

View File

@@ -1,13 +1,20 @@
 package com.datamate.knowledgegraph.interfaces.rest;

 import com.datamate.knowledgegraph.application.GraphSyncService;
+import com.datamate.knowledgegraph.domain.model.SyncMetadata;
 import com.datamate.knowledgegraph.domain.model.SyncResult;
+import com.datamate.knowledgegraph.interfaces.dto.SyncMetadataVO;
 import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO;
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.Pattern;
 import lombok.RequiredArgsConstructor;
+import org.springframework.format.annotation.DateTimeFormat;
+import org.springframework.http.ResponseEntity;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
+
+import java.time.LocalDateTime;
 import java.util.List;

 /**
@@ -16,10 +23,13 @@ import java.util.List;
 * REST endpoints for manually triggering MySQL → Neo4j sync.
 * In production, sync can also be triggered automatically by scheduled tasks.
 * <p>
-* <b>Security note</b>: this API is for internal service calls only (API Gateway / scheduled tasks);
-* external requests must be authenticated by the API Gateway before being forwarded.
-* In production, mTLS or internal JWTs are recommended to further harden service-to-service authentication.
-* A simple internal-call check is currently performed via the {@code X-Internal-Token} header.
+* <b>Security architecture</b>:
+* <ul>
+* <li>External requests → API Gateway (JWT validation) → X-User-* headers → backend services</li>
+* <li>Internal calls → X-Internal-Token header → {@code InternalTokenInterceptor} validation → sync endpoints</li>
+* </ul>
+* Token validation is implemented centrally by the {@code InternalTokenInterceptor},
+* which applies automatically to the {@code /knowledge-graph/{graphId}/sync/} path prefix.
 */
 @RestController
 @RequestMapping("/knowledge-graph/{graphId}/sync")
@@ -36,10 +46,22 @@ public class GraphSyncController {
 * Full sync: fetch all entities and build relations.
 */
 @PostMapping("/full")
-public List<SyncResultVO> syncAll(
+public SyncMetadataVO syncAll(
     @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId) {
-    List<SyncResult> results = syncService.syncAll(graphId);
-    return results.stream().map(SyncResultVO::from).toList();
+    SyncMetadata metadata = syncService.syncAll(graphId);
+    return SyncMetadataVO.from(metadata);
+}
+
+/**
+ * Incremental sync: fetch and sync only data changed within the given time window.
+ */
+@PostMapping("/incremental")
+public SyncMetadataVO syncIncremental(
+    @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId,
+    @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedFrom,
+    @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedTo) {
+    SyncMetadata metadata = syncService.syncIncremental(graphId, updatedFrom, updatedTo);
+    return SyncMetadataVO.from(metadata);
 }

 /**
@@ -211,4 +233,50 @@ public class GraphSyncController {
     @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId) {
     return SyncResultVO.from(syncService.buildSourcedFromRelations(graphId));
 }
+// -----------------------------------------------------------------------
+// Sync history query endpoints
+// -----------------------------------------------------------------------
+
+/**
+ * Queries sync history records.
+ *
+ * @param status optional filter by status (SUCCESS / FAILED / PARTIAL)
+ * @param limit  maximum number of records to return, default 20
+ */
+@GetMapping("/history")
+public List<SyncMetadataVO> getSyncHistory(
+    @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId,
+    @RequestParam(required = false) String status,
+    @RequestParam(defaultValue = "20") @Min(1) @Max(200) int limit) {
+    List<SyncMetadata> history = syncService.getSyncHistory(graphId, status, limit);
+    return history.stream().map(SyncMetadataVO::from).toList();
+}
+
+/**
+ * Queries sync history within a time range.
+ */
+@GetMapping("/history/range")
+public List<SyncMetadataVO> getSyncHistoryByTimeRange(
+    @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId,
+    @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from,
+    @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to,
+    @RequestParam(defaultValue = "0") @Min(0) @Max(10000) int page,
+    @RequestParam(defaultValue = "20") @Min(1) @Max(200) int size) {
+    List<SyncMetadata> history = syncService.getSyncHistoryByTimeRange(graphId, from, to, page, size);
+    return history.stream().map(SyncMetadataVO::from).toList();
+}
+
+/**
+ * Fetches a single sync record by syncId.
+ */
+@GetMapping("/history/{syncId}")
+public ResponseEntity<SyncMetadataVO> getSyncRecord(
+    @PathVariable @Pattern(regexp = UUID_REGEX, message = "invalid graphId format") String graphId,
+    @PathVariable String syncId) {
+    return syncService.getSyncRecord(graphId, syncId)
+        .map(SyncMetadataVO::from)
+        .map(ResponseEntity::ok)
+        .orElse(ResponseEntity.notFound().build());
+}
 }
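The /history/range bounds above cap page at 10000 and size at 200, so the Neo4j SKIP (page × size) never exceeds the 2,000,000 out-of-range protection mentioned in the commit message. A sketch of that skip computation (class and method names are illustrative):

```java
// Sketch of the SKIP computation implied by the /history/range bounds:
// page in [0, 10000], size in [1, 200], so skip = page * size never exceeds 2,000,000.
public class HistoryPagingDemo {
    static long skipFor(int page, int size) {
        if (page < 0 || page > 10_000) throw new IllegalArgumentException("page out of range");
        if (size < 1 || size > 200) throw new IllegalArgumentException("size out of range");
        return (long) page * size; // long math: safe even at the extremes
    }

    public static void main(String[] args) {
        System.out.println(skipFor(0, 20));       // 0
        System.out.println(skipFor(10_000, 200)); // 2000000
    }
}
```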

View File

@@ -23,6 +23,14 @@ datamate:
   max-nodes-per-query: ${KG_MAX_NODES:500}
   # Batch size for bulk imports
   import-batch-size: ${KG_IMPORT_BATCH_SIZE:100}
+  # Security configuration
+  security:
+    # Internal service token (checked against the X-Internal-Token header on sync endpoints)
+    # In production this MUST be set via the KG_INTERNAL_TOKEN environment variable; otherwise the sync endpoints reject all requests (fail-closed)
+    internal-token: ${KG_INTERNAL_TOKEN:}
+    # Whether to skip token validation (default false = fail-closed)
+    # Set to true explicitly only in dev/test environments
+    skip-token-check: ${KG_SKIP_TOKEN_CHECK:false}
   # MySQL → Neo4j sync configuration
   sync:
     # Data management service address

View File

@@ -1,7 +1,9 @@
 package com.datamate.knowledgegraph.application;

 import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.knowledgegraph.domain.model.SyncMetadata;
 import com.datamate.knowledgegraph.domain.model.SyncResult;
+import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
 import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;

@@ -13,12 +15,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;

+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -42,6 +47,9 @@ class GraphSyncServiceTest {
 @Mock
 private KnowledgeGraphProperties properties;

+@Mock
+private SyncHistoryRepository syncHistoryRepository;
+
 @InjectMocks
 private GraphSyncService syncService;

@@ -161,7 +169,7 @@ class GraphSyncServiceTest {
 when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
     .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());

-List<SyncResult> results = syncService.syncAll(GRAPH_ID);
+List<SyncResult> results = syncService.syncAll(GRAPH_ID).getResults();

 // 8 entities + 10 relations = 18
 assertThat(results).hasSize(18);

@@ -335,4 +343,481 @@ class GraphSyncServiceTest {
     .isInstanceOf(BusinessException.class);
 }
 }
// -----------------------------------------------------------------------
// Sync metadata recording
// -----------------------------------------------------------------------
@Nested
class SyncMetadataRecordingTest {
@Test
void syncAll_success_recordsMetadataWithCorrectFields() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").created(3).updated(1).build());
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
.thenReturn(0);
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
assertThat(metadata.getGraphId()).isEqualTo(GRAPH_ID);
assertThat(metadata.getSyncId()).isNotNull();
assertThat(metadata.getStartedAt()).isNotNull();
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getTotalCreated()).isEqualTo(3);
assertThat(metadata.getTotalUpdated()).isEqualTo(1);
assertThat(metadata.getResults()).hasSize(18);
assertThat(metadata.getStepSummaries()).hasSize(18);
assertThat(metadata.getErrorMessage()).isNull();
// verify persistence was invoked
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
}
@Test
void syncAll_withFailedSteps_recordsPartialStatus() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
// Dataset step has failures
SyncResult datasetResult = SyncResult.builder().syncType("Dataset").created(2).failed(1).build();
datasetResult.setErrors(new java.util.ArrayList<>(List.of("some error")));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(datasetResult);
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
.thenReturn(0);
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
assertThat(metadata.getTotalFailed()).isEqualTo(1);
assertThat(metadata.getTotalCreated()).isEqualTo(2);
}
@Test
void syncAll_exceptionThrown_recordsFailedMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("connection refused"));
assertThatThrownBy(() -> syncService.syncAll(GRAPH_ID))
.isInstanceOf(BusinessException.class);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(saved.getErrorMessage()).isNotNull();
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
}
@Test
void syncDatasets_success_recordsMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").created(1).build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
.thenReturn(0);
syncService.syncDatasets(GRAPH_ID);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
assertThat(saved.getTotalCreated()).isEqualTo(1);
}
@Test
void syncDatasets_failed_recordsFailedMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("timeout"));
assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID))
.isInstanceOf(BusinessException.class);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
}
@Test
void saveSyncHistory_exceptionInSave_doesNotAffectMainFlow() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
.thenReturn(0);
// an exception inside saveSyncHistory must not affect the main flow
when(syncHistoryRepository.save(any())).thenThrow(new RuntimeException("Neo4j down"));
SyncResult result = syncService.syncDatasets(GRAPH_ID);
assertThat(result.getSyncType()).isEqualTo("Dataset");
}
}
// -----------------------------------------------------------------------
// Incremental sync
// -----------------------------------------------------------------------
@Nested
class IncrementalSyncTest {
private final LocalDateTime UPDATED_FROM = LocalDateTime.of(2025, 6, 1, 0, 0);
private final LocalDateTime UPDATED_TO = LocalDateTime.of(2025, 6, 30, 23, 59);
@Test
void syncIncremental_invalidGraphId_throwsBusinessException() {
assertThatThrownBy(() -> syncService.syncIncremental(INVALID_GRAPH_ID, UPDATED_FROM, UPDATED_TO))
.isInstanceOf(BusinessException.class);
}
@Test
void syncIncremental_nullUpdatedFrom_throwsBusinessException() {
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, null, UPDATED_TO))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("updatedFrom");
}
@Test
void syncIncremental_nullUpdatedTo_throwsBusinessException() {
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, null))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("updatedTo");
}
@Test
void syncIncremental_fromAfterTo_throwsBusinessException() {
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_TO, UPDATED_FROM))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("updatedFrom");
}
@Test
void syncIncremental_success_passesTimeWindowToClient() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
when(dataManagementClient.listAllJobs(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
stubAllEntityUpserts();
stubAllRelationMerges();
SyncMetadata metadata = syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO);
assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(metadata.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
assertThat(metadata.getUpdatedTo()).isEqualTo(UPDATED_TO);
assertThat(metadata.getResults()).hasSize(18);
// verify the time-windowed client methods were used
verify(dataManagementClient).listAllDatasets(UPDATED_FROM, UPDATED_TO);
verify(dataManagementClient).listAllWorkflows(UPDATED_FROM, UPDATED_TO);
verify(dataManagementClient).listAllJobs(UPDATED_FROM, UPDATED_TO);
verify(dataManagementClient).listAllLabelTasks(UPDATED_FROM, UPDATED_TO);
verify(dataManagementClient).listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO);
// Verify purge is not executed during incremental sync
verify(stepService, never()).purgeStaleEntities(anyString(), anyString(), anySet(), anyString());
}
@Test
void syncIncremental_failure_recordsMetadataWithTimeWindow() {
when(properties.getSync()).thenReturn(syncConfig);
when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO))
.thenThrow(new RuntimeException("connection refused"));
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO))
.isInstanceOf(BusinessException.class);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
assertThat(saved.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
assertThat(saved.getUpdatedTo()).isEqualTo(UPDATED_TO);
}
private void stubAllEntityUpserts() {
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").build());
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
}
private void stubAllRelationMerges() {
// 2-arg versions (full sync) - lenient() avoids unnecessary-stubbing errors
lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
// 3-arg versions (incremental sync) - lenient() avoids unnecessary-stubbing errors
lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString(), any()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
}
}
// -----------------------------------------------------------------------
// Sync history queries
// -----------------------------------------------------------------------
@Nested
class SyncHistoryQueryTest {
@Test
void getSyncHistory_invalidGraphId_throwsBusinessException() {
assertThatThrownBy(() -> syncService.getSyncHistory(INVALID_GRAPH_ID, null, 20))
.isInstanceOf(BusinessException.class);
}
@Test
void getSyncHistory_noStatusFilter_callsFindByGraphId() {
when(syncHistoryRepository.findByGraphId(GRAPH_ID, 20)).thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, null, 20);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphId(GRAPH_ID, 20);
verify(syncHistoryRepository, never()).findByGraphIdAndStatus(anyString(), anyString(), anyInt());
}
@Test
void getSyncHistory_withStatusFilter_callsFindByGraphIdAndStatus() {
when(syncHistoryRepository.findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, "SUCCESS", 10);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10);
}
@Test
void getSyncRecord_found_returnsRecord() {
SyncMetadata expected = SyncMetadata.builder()
.syncId("abc12345").graphId(GRAPH_ID).build();
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "abc12345"))
.thenReturn(Optional.of(expected));
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "abc12345");
assertThat(result).isPresent();
assertThat(result.get().getSyncId()).isEqualTo("abc12345");
}
@Test
void getSyncRecord_notFound_returnsEmpty() {
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "notexist"))
.thenReturn(Optional.empty());
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "notexist");
assertThat(result).isEmpty();
}
@Test
void getSyncHistoryByTimeRange_delegatesToRepository() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 0, 20);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20);
}
@Test
void getSyncHistoryByTimeRange_pagination_computesSkipCorrectly() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 2, 20);
assertThat(result).isEmpty();
// page=2, size=20 → skip=40
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20);
}
@Test
void getSyncHistoryByTimeRange_skipOverflow_throwsBusinessException() {
// Simulates calling the service directly, bypassing controller-level validation
assertThatThrownBy(() -> syncService.getSyncHistoryByTimeRange(
GRAPH_ID,
LocalDateTime.of(2025, 1, 1, 0, 0),
LocalDateTime.of(2025, 12, 31, 23, 59),
20000, 200))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("分页偏移量");
}
}
}
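The pagination tests above assume the service converts `(page, size)` into a Cypher `SKIP` offset of `page * size` (zero-based pages) and rejects offsets beyond the 2M guard mentioned in the commit message. A minimal sketch of that arithmetic, with hypothetical method and constant names:

```java
public class PaginationSketch {
    // Guard value assumed from the ">2M" skip protection noted in the commit message
    static final long MAX_SKIP = 2_000_000L;

    // page is zero-based: page=2, size=20 -> skip=40, matching the test above
    static long computeSkip(int page, int size) {
        long skip = (long) page * size;
        if (skip > MAX_SKIP) {
            throw new IllegalArgumentException("pagination offset too large: " + skip);
        }
        return skip;
    }

    public static void main(String[] args) {
        System.out.println(computeSkip(2, 20)); // 40
    }
}
```

Computing the offset as `long` before the comparison avoids int overflow for large page/size combinations; the overflow test's `page=20000, size=200` yields 4,000,000 and is rejected.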

View File

@@ -749,14 +749,129 @@ class GraphSyncStepServiceTest {
}
@Test
void mergeImpacts_noFields_returnsZero() {
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")).thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
assertThat(result.getCreated()).isEqualTo(0);
assertThat(result.isPlaceholder()).isFalse();
verifyNoInteractions(neo4jClient);
}
@Test
void mergeImpacts_derivedFrom_matchingFieldNames_createsRelation() {
setupNeo4jQueryChain(String.class, "new-rel-id");
// Parent dataset (source_id = "ds-parent")
GraphEntity parentDs = GraphEntity.builder()
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
// Child dataset (source_id = "ds-child", parent_dataset_id = "ds-parent")
GraphEntity childDs = GraphEntity.builder()
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
.build();
// Fields with matching name "user_id"
GraphEntity parentField = GraphEntity.builder()
.id("field-parent-uid").name("user_id").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
.build();
GraphEntity childField = GraphEntity.builder()
.id("field-child-uid").name("user_id").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of(parentField, childField));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of(parentDs, childDs));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
.thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
verify(neo4jClient).query(cypherCaptor.capture());
assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
}
@Test
void mergeImpacts_noMatchingFieldNames_createsNoRelation() {
GraphEntity parentDs = GraphEntity.builder()
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
GraphEntity childDs = GraphEntity.builder()
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
.build();
GraphEntity parentField = GraphEntity.builder()
.id("field-parent").name("col_a").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
.build();
GraphEntity childField = GraphEntity.builder()
.id("field-child").name("col_b").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of(parentField, childField));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of(parentDs, childDs));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
.thenReturn(List.of());
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getCreated()).isEqualTo(0);
verifyNoInteractions(neo4jClient);
}
@Test
void mergeImpacts_jobInputOutput_createsRelationWithJobId() {
setupNeo4jQueryChain(String.class, "new-rel-id");
GraphEntity inputDs = GraphEntity.builder()
.id("input-entity").sourceId("ds-in").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
GraphEntity outputDs = GraphEntity.builder()
.id("output-entity").sourceId("ds-out").type("Dataset").graphId(GRAPH_ID)
.properties(new HashMap<>())
.build();
GraphEntity job = GraphEntity.builder()
.id("job-entity").sourceId("job-001").type("Job").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of(
"input_dataset_id", "ds-in",
"output_dataset_id", "ds-out")))
.build();
GraphEntity inField = GraphEntity.builder()
.id("field-in").name("tag_x").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-in")))
.build();
GraphEntity outField = GraphEntity.builder()
.id("field-out").name("tag_x").type("Field").graphId(GRAPH_ID)
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-out")))
.build();
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
.thenReturn(List.of(inField, outField));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
.thenReturn(List.of(inputDs, outputDs));
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
.thenReturn(List.of(job));
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
verify(neo4jClient).query(cypherCaptor.capture());
assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
}
@Test
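The IMPACTS tests above pair fields across a parent/child (or job input/output) dataset by identical field name: `user_id`/`tag_x` match and produce a relation, `col_a`/`col_b` do not. The pairing rule can be sketched as a set intersection (the helper name is hypothetical, not the service API):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FieldMatchSketch {
    // Field names present in both datasets; only these produce an IMPACTS relation
    static Set<String> matchingFieldNames(List<String> parentFields, List<String> childFields) {
        Set<String> matches = new HashSet<>(parentFields);
        matches.retainAll(new HashSet<>(childFields));
        return matches;
    }

    public static void main(String[] args) {
        // "user_id" matches across both sides; "col_a"/"col_b" do not
        System.out.println(matchingFieldNames(
                List.of("user_id", "col_a"),
                List.of("user_id", "col_b")));
    }
}
```

An empty intersection explains why `mergeImpacts_noMatchingFieldNames_createsNoRelation` expects zero created relations and no Neo4j interaction.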

View File

@@ -0,0 +1,96 @@
package com.datamate.knowledgegraph.domain.model;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
class SyncMetadataTest {
@Test
void fromResults_aggregatesCountsCorrectly() {
LocalDateTime startedAt = LocalDateTime.of(2025, 6, 1, 10, 0, 0);
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(1).purged(3).build();
SyncResult r2 = SyncResult.builder().syncType("Field").created(10).updated(0).skipped(2).build();
SyncResult r3 = SyncResult.builder().syncType("HAS_FIELD").created(8).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc123", "graph-id", "FULL", startedAt, List.of(r1, r2, r3));
assertThat(metadata.getSyncId()).isEqualTo("abc123");
assertThat(metadata.getGraphId()).isEqualTo("graph-id");
assertThat(metadata.getSyncType()).isEqualTo("FULL");
assertThat(metadata.getTotalCreated()).isEqualTo(23); // 5 + 10 + 8
assertThat(metadata.getTotalUpdated()).isEqualTo(2); // 2 + 0 + 0
assertThat(metadata.getTotalSkipped()).isEqualTo(2); // 0 + 2 + 0
assertThat(metadata.getTotalFailed()).isEqualTo(1); // 1 + 0 + 0
assertThat(metadata.getTotalPurged()).isEqualTo(3); // 3 + 0 + 0
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getResults()).hasSize(3);
assertThat(metadata.getStepSummaries()).hasSize(3);
}
@Test
void fromResults_noFailures_statusIsSuccess() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
}
@Test
void fromResults_withFailures_statusIsPartial() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).failed(2).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
assertThat(metadata.getTotalFailed()).isEqualTo(2);
}
@Test
void failed_createsFailedMetadata() {
LocalDateTime startedAt = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
SyncMetadata metadata = SyncMetadata.failed("abc", "g1", "FULL", startedAt, "connection refused");
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(metadata.getErrorMessage()).isEqualTo("connection refused");
assertThat(metadata.getSyncId()).isEqualTo("abc");
assertThat(metadata.getGraphId()).isEqualTo("g1");
assertThat(metadata.getSyncType()).isEqualTo("FULL");
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getTotalCreated()).isEqualTo(0);
assertThat(metadata.getTotalUpdated()).isEqualTo(0);
}
@Test
void totalEntities_returnsSum() {
SyncMetadata metadata = SyncMetadata.builder()
.totalCreated(10).totalUpdated(5).totalSkipped(3).totalFailed(2)
.build();
assertThat(metadata.totalEntities()).isEqualTo(20);
}
@Test
void stepSummaries_formatIncludesPurgedWhenNonZero() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(0).purged(3).build();
SyncResult r2 = SyncResult.builder().syncType("Field").created(1).updated(0).failed(0).purged(0).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1, r2));
assertThat(metadata.getStepSummaries().get(0)).isEqualTo("Dataset(+5/~2/-0/purged:3)");
assertThat(metadata.getStepSummaries().get(1)).isEqualTo("Field(+1/~0/-0)");
}
}
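The two format assertions above imply a per-step summary of `Type(+created/~updated/-failed)` with a `/purged:n` segment appended only when purged is non-zero. A sketch of that formatting rule (the helper is hypothetical; the real logic lives in `SyncMetadata.fromResults`):

```java
public class StepSummarySketch {
    // Builds "Dataset(+5/~2/-0/purged:3)" or "Field(+1/~0/-0)" depending on purged
    static String summarize(String type, int created, int updated, int failed, int purged) {
        StringBuilder sb = new StringBuilder()
                .append(type).append("(+").append(created)
                .append("/~").append(updated)
                .append("/-").append(failed);
        if (purged > 0) {
            sb.append("/purged:").append(purged);
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        System.out.println(summarize("Dataset", 5, 2, 0, 3)); // Dataset(+5/~2/-0/purged:3)
        System.out.println(summarize("Field", 1, 0, 0, 0));   // Field(+1/~0/-0)
    }
}
```

Omitting the purged segment when it is zero keeps the common case compact while still surfacing deletions when a full sync purges stale entities.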

View File

@@ -0,0 +1,152 @@
package com.datamate.knowledgegraph.infrastructure.security;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.http.HttpServletResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.mock.web.MockHttpServletResponse;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(MockitoExtension.class)
class InternalTokenInterceptorTest {
private static final String VALID_TOKEN = "test-secret-token";
private KnowledgeGraphProperties properties;
private InternalTokenInterceptor interceptor;
@BeforeEach
void setUp() {
properties = new KnowledgeGraphProperties();
interceptor = new InternalTokenInterceptor(properties, new ObjectMapper());
}
// -----------------------------------------------------------------------
// fail-closed: token not configured + skipTokenCheck=false → reject
// -----------------------------------------------------------------------
@Test
void tokenNotConfigured_skipFalse_rejects() throws Exception {
properties.getSecurity().setInternalToken(null);
properties.getSecurity().setSkipTokenCheck(false);
MockHttpServletRequest request = new MockHttpServletRequest();
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isFalse();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
assertThat(response.getContentAsString()).contains("knowledge_graph.0013");
}
@Test
void tokenEmpty_skipFalse_rejects() throws Exception {
properties.getSecurity().setInternalToken("");
properties.getSecurity().setSkipTokenCheck(false);
MockHttpServletRequest request = new MockHttpServletRequest();
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isFalse();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
}
// -----------------------------------------------------------------------
// dev/test bypass: token not configured + skipTokenCheck=true → allow
// -----------------------------------------------------------------------
@Test
void tokenNotConfigured_skipTrue_allows() throws Exception {
properties.getSecurity().setInternalToken(null);
properties.getSecurity().setSkipTokenCheck(true);
MockHttpServletRequest request = new MockHttpServletRequest();
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isTrue();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_OK);
}
// -----------------------------------------------------------------------
// normal validation: token configured + matching header → allow
// -----------------------------------------------------------------------
@Test
void validToken_allows() throws Exception {
properties.getSecurity().setInternalToken(VALID_TOKEN);
MockHttpServletRequest request = new MockHttpServletRequest();
request.addHeader("X-Internal-Token", VALID_TOKEN);
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isTrue();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_OK);
}
// -----------------------------------------------------------------------
// 401: token configured + non-matching header → reject
// -----------------------------------------------------------------------
@Test
void invalidToken_rejects() throws Exception {
properties.getSecurity().setInternalToken(VALID_TOKEN);
MockHttpServletRequest request = new MockHttpServletRequest();
request.addHeader("X-Internal-Token", "wrong-token");
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isFalse();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
assertThat(response.getContentType()).startsWith("application/json");
assertThat(response.getContentAsString()).contains("knowledge_graph.0013");
}
@Test
void missingTokenHeader_rejects() throws Exception {
properties.getSecurity().setInternalToken(VALID_TOKEN);
MockHttpServletRequest request = new MockHttpServletRequest();
// No X-Internal-Token header
MockHttpServletResponse response = new MockHttpServletResponse();
boolean result = interceptor.preHandle(request, response, new Object());
assertThat(result).isFalse();
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
}
// -----------------------------------------------------------------------
// error response format: must use the Response wrapper
// -----------------------------------------------------------------------
@Test
void errorResponse_usesResponseFormat() throws Exception {
properties.getSecurity().setInternalToken(VALID_TOKEN);
MockHttpServletRequest request = new MockHttpServletRequest();
request.addHeader("X-Internal-Token", "wrong");
MockHttpServletResponse response = new MockHttpServletResponse();
interceptor.preHandle(request, response, new Object());
String body = response.getContentAsString();
assertThat(body).contains("\"code\"");
assertThat(body).contains("\"message\"");
// Response.error() includes a data field (value is null)
assertThat(body).contains("\"data\"");
}
}
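The decision matrix these tests cover — fail-closed when no token is configured, an explicit dev/test bypass, otherwise strict header comparison — condenses into a few lines (a sketch of the interceptor's logic, not the actual class):

```java
public class FailClosedSketch {
    // true = allow the request, false = answer 401 with Response.error()
    static boolean allow(String configuredToken, boolean skipTokenCheck, String headerToken) {
        if (configuredToken == null || configuredToken.isBlank()) {
            // fail-closed: an unconfigured token only passes with an explicit dev/test opt-out
            return skipTokenCheck;
        }
        return configuredToken.equals(headerToken);
    }

    public static void main(String[] args) {
        System.out.println(allow(null, false, null));         // false - rejected
        System.out.println(allow(null, true, null));          // true  - dev/test bypass
        System.out.println(allow("secret", false, "secret")); // true
        System.out.println(allow("secret", false, "wrong"));  // false
    }
}
```

A production implementation would typically also use a constant-time comparison (e.g. `MessageDigest.isEqual`) for the token check to avoid timing side channels.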

View File

@@ -7,8 +7,14 @@ import org.springframework.security.config.annotation.web.configuration.EnableWe
import org.springframework.security.web.SecurityFilterChain;
/**
 * Spring Security configuration.
 * <p>
 * The security architecture uses two layers of protection:
 * <ul>
 * <li><b>Gateway layer</b>: the API Gateway performs JWT validation and, on success, forwards X-User-* headers to backend services</li>
 * <li><b>Service layer</b>: internal sync endpoints validate X-Internal-Token via {@code InternalTokenInterceptor}</li>
 * </ul>
 * The current SecurityFilterChain is configured as permitAll; HTTP-level access control is handled jointly by the Gateway and business interceptors.
 */
@Configuration
@EnableWebSecurity

View File

@@ -3,12 +3,6 @@ spring:
application:
name: datamate
# Temporarily exclude Spring Security auto-configuration (development phase only)
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
- org.springframework.boot.autoconfigure.security.servlet.UserDetailsServiceAutoConfiguration
# Datasource configuration
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver