You've already forked DataMate
Compare commits
3 Commits
74daed1c25
...
444f8cd015
| Author | SHA1 | Date | |
|---|---|---|---|
| 444f8cd015 | |||
| f12e4abd83 | |||
| 42069f82b3 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -189,4 +189,4 @@ Thumbs.db
|
||||
*.sublime-workspace
|
||||
|
||||
# Milvus
|
||||
deployment/docker/milvus/volumes/
|
||||
deployment/docker/milvus/volumes/
|
||||
|
||||
@@ -110,9 +110,9 @@ Thank you for your interest in this project! We warmly welcome contributions fro
|
||||
bug reports, suggesting new features, or directly participating in code development, all forms of help make the project
|
||||
better.
|
||||
|
||||
• 📮 [GitHub Issues](../../issues): Submit bugs or feature suggestions.
|
||||
• 📮 [GitHub Issues](https://github.com/ModelEngine-Group/DataMate/issues): Submit bugs or feature suggestions.
|
||||
|
||||
• 🔧 [GitHub Pull Requests](../../pulls): Contribute code improvements.
|
||||
• 🔧 [GitHub Pull Requests](https://github.com/ModelEngine-Group/DataMate/pulls): Contribute code improvements.
|
||||
|
||||
## 📄 License
|
||||
|
||||
|
||||
@@ -2,7 +2,9 @@ package com.datamate.knowledgegraph.application;
|
||||
|
||||
import com.datamate.common.infrastructure.exception.BusinessException;
|
||||
import com.datamate.common.infrastructure.exception.SystemErrorCode;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncResult;
|
||||
import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
|
||||
@@ -15,6 +17,7 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@@ -52,6 +55,7 @@ public class GraphSyncService {
|
||||
private final GraphSyncStepService stepService;
|
||||
private final DataManagementClient dataManagementClient;
|
||||
private final KnowledgeGraphProperties properties;
|
||||
private final SyncHistoryRepository syncHistoryRepository;
|
||||
|
||||
/** 同 graphId 互斥锁,防止并发同步。 */
|
||||
private final ConcurrentHashMap<String, ReentrantLock> graphLocks = new ConcurrentHashMap<>();
|
||||
@@ -60,9 +64,10 @@ public class GraphSyncService {
|
||||
// 全量同步
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
public List<SyncResult> syncAll(String graphId) {
|
||||
public SyncMetadata syncAll(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
@@ -178,10 +183,16 @@ public class GraphSyncService {
|
||||
results.stream()
|
||||
.map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
|
||||
.collect(Collectors.joining(", ")));
|
||||
return results;
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, results);
|
||||
saveSyncHistory(metadata);
|
||||
return metadata;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
|
||||
log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "全量同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -189,13 +200,116 @@ public class GraphSyncService {
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 增量同步
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* 增量同步:仅拉取指定时间窗口内变更的数据并同步到 Neo4j。
|
||||
* <p>
|
||||
* 与全量同步的区别:
|
||||
* <ul>
|
||||
* <li>通过 updatedFrom/updatedTo 过滤变更数据</li>
|
||||
* <li>不执行 purge(不删除旧实体)</li>
|
||||
* <li>在 SyncMetadata 中记录时间窗口</li>
|
||||
* </ul>
|
||||
*/
|
||||
public SyncMetadata syncIncremental(String graphId, LocalDateTime updatedFrom, LocalDateTime updatedTo) {
|
||||
validateGraphId(graphId);
|
||||
if (updatedFrom == null || updatedTo == null) {
|
||||
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "增量同步必须指定 updatedFrom 和 updatedTo");
|
||||
}
|
||||
if (updatedFrom.isAfter(updatedTo)) {
|
||||
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "updatedFrom 不能晚于 updatedTo");
|
||||
}
|
||||
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
log.info("[{}] Starting incremental sync for graphId={}, window=[{}, {}]",
|
||||
syncId, graphId, updatedFrom, updatedTo);
|
||||
|
||||
// 拉取时间窗口内变更的数据
|
||||
List<DatasetDTO> datasets = fetchWithRetry(syncId, "datasets",
|
||||
() -> dataManagementClient.listAllDatasets(updatedFrom, updatedTo));
|
||||
List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
|
||||
() -> dataManagementClient.listAllWorkflows(updatedFrom, updatedTo));
|
||||
List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
|
||||
() -> dataManagementClient.listAllJobs(updatedFrom, updatedTo));
|
||||
List<LabelTaskDTO> labelTasks = fetchWithRetry(syncId, "label-tasks",
|
||||
() -> dataManagementClient.listAllLabelTasks(updatedFrom, updatedTo));
|
||||
List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
|
||||
() -> dataManagementClient.listAllKnowledgeSets(updatedFrom, updatedTo));
|
||||
|
||||
Map<String, SyncResult> resultMap = new LinkedHashMap<>();
|
||||
|
||||
// 实体同步(仅 upsert,不 purge)
|
||||
resultMap.put("Dataset", stepService.upsertDatasetEntities(graphId, datasets, syncId));
|
||||
resultMap.put("Field", stepService.upsertFieldEntities(graphId, datasets, syncId));
|
||||
|
||||
Set<String> usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets);
|
||||
resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId));
|
||||
resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId));
|
||||
resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId));
|
||||
resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId));
|
||||
resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId));
|
||||
resultMap.put("KnowledgeSet", stepService.upsertKnowledgeSetEntities(graphId, knowledgeSets, syncId));
|
||||
|
||||
// 收集所有变更(创建或更新)的实体ID,用于增量关系构建
|
||||
Set<String> changedEntityIds = collectChangedEntityIds(datasets, workflows, jobs, labelTasks, knowledgeSets, graphId);
|
||||
|
||||
// 关系构建(MERGE 幂等)- 增量同步时只处理变更实体相关的关系
|
||||
resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("TRIGGERS", stepService.mergeTriggersRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("DEPENDS_ON", stepService.mergeDependsOnRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("IMPACTS", stepService.mergeImpactsRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("SOURCED_FROM", stepService.mergeSourcedFromRelations(graphId, syncId, changedEntityIds));
|
||||
|
||||
List<SyncResult> results = new ArrayList<>(resultMap.values());
|
||||
log.info("[{}] Incremental sync completed for graphId={}. Summary: {}", syncId, graphId,
|
||||
results.stream()
|
||||
.map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
|
||||
.collect(Collectors.joining(", ")));
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, results);
|
||||
metadata.setUpdatedFrom(updatedFrom);
|
||||
metadata.setUpdatedTo(updatedTo);
|
||||
saveSyncHistory(metadata);
|
||||
return metadata;
|
||||
} catch (BusinessException e) {
|
||||
SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
|
||||
failed.setUpdatedFrom(updatedFrom);
|
||||
failed.setUpdatedTo(updatedTo);
|
||||
saveSyncHistory(failed);
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
SyncMetadata failed = SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_INCREMENTAL, startedAt, e.getMessage());
|
||||
failed.setUpdatedFrom(updatedFrom);
|
||||
failed.setUpdatedTo(updatedTo);
|
||||
saveSyncHistory(failed);
|
||||
log.error("[{}] Incremental sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "增量同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
releaseLock(graphId, lock);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 单步同步(各自获取锁和数据)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
public SyncResult syncDatasets(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
|
||||
@@ -206,10 +320,14 @@ public class GraphSyncService {
|
||||
.collect(Collectors.toSet());
|
||||
int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId);
|
||||
result.setPurged(purged);
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
|
||||
log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "数据集同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -219,7 +337,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncFields(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
|
||||
@@ -237,10 +356,14 @@ public class GraphSyncService {
|
||||
}
|
||||
}
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
|
||||
log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "字段同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -250,7 +373,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncUsers(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
|
||||
@@ -266,10 +390,14 @@ public class GraphSyncService {
|
||||
SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId);
|
||||
Set<String> activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet());
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
|
||||
log.error("[{}] User sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "用户同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -279,13 +407,19 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncOrgs(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.upsertOrgEntities(graphId, syncId);
|
||||
SyncResult result = stepService.upsertOrgEntities(graphId, syncId);
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
|
||||
log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "组织同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -295,7 +429,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildHasFieldRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeHasFieldRelations(graphId, syncId);
|
||||
@@ -312,7 +446,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildDerivedFromRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeDerivedFromRelations(graphId, syncId);
|
||||
@@ -329,7 +463,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildBelongsToRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeBelongsToRelations(graphId, syncId);
|
||||
@@ -350,7 +484,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncWorkflows(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
|
||||
@@ -361,10 +496,14 @@ public class GraphSyncService {
|
||||
.filter(Objects::nonNull).filter(id -> !id.isBlank())
|
||||
.collect(Collectors.toSet());
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "Workflow", activeIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
|
||||
log.error("[{}] Workflow sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "工作流同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -374,7 +513,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncJobs(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
|
||||
@@ -385,10 +525,14 @@ public class GraphSyncService {
|
||||
.filter(Objects::nonNull).filter(id -> !id.isBlank())
|
||||
.collect(Collectors.toSet());
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "Job", activeIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
|
||||
log.error("[{}] Job sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "作业同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -398,7 +542,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncLabelTasks(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<LabelTaskDTO> tasks = fetchWithRetry(syncId, "label-tasks",
|
||||
@@ -409,10 +554,14 @@ public class GraphSyncService {
|
||||
.filter(Objects::nonNull).filter(id -> !id.isBlank())
|
||||
.collect(Collectors.toSet());
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "LabelTask", activeIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
|
||||
log.error("[{}] LabelTask sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "标注任务同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -422,7 +571,8 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult syncKnowledgeSets(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
|
||||
@@ -433,10 +583,14 @@ public class GraphSyncService {
|
||||
.filter(Objects::nonNull).filter(id -> !id.isBlank())
|
||||
.collect(Collectors.toSet());
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeIds, syncId));
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, List.of(result)));
|
||||
return result;
|
||||
} catch (BusinessException e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
|
||||
log.error("[{}] KnowledgeSet sync failed for graphId={}", syncId, graphId, e);
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "知识集同步失败,syncId=" + syncId);
|
||||
} finally {
|
||||
@@ -450,7 +604,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildUsesDatasetRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeUsesDatasetRelations(graphId, syncId);
|
||||
@@ -467,7 +621,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildProducesRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeProducesRelations(graphId, syncId);
|
||||
@@ -484,7 +638,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildAssignedToRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeAssignedToRelations(graphId, syncId);
|
||||
@@ -501,7 +655,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildTriggersRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeTriggersRelations(graphId, syncId);
|
||||
@@ -518,7 +672,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildDependsOnRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeDependsOnRelations(graphId, syncId);
|
||||
@@ -535,7 +689,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildImpactsRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeImpactsRelations(graphId, syncId);
|
||||
@@ -552,7 +706,7 @@ public class GraphSyncService {
|
||||
|
||||
public SyncResult buildSourcedFromRelations(String graphId) {
|
||||
validateGraphId(graphId);
|
||||
String syncId = UUID.randomUUID().toString().substring(0, 8);
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeSourcedFromRelations(graphId, syncId);
|
||||
@@ -567,6 +721,43 @@ public class GraphSyncService {
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 同步历史查询
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* 查询同步历史记录。
|
||||
*/
|
||||
public List<SyncMetadata> getSyncHistory(String graphId, String status, int limit) {
|
||||
validateGraphId(graphId);
|
||||
if (status != null && !status.isBlank()) {
|
||||
return syncHistoryRepository.findByGraphIdAndStatus(graphId, status, limit);
|
||||
}
|
||||
return syncHistoryRepository.findByGraphId(graphId, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* 按时间范围查询同步历史(分页)。
|
||||
*/
|
||||
public List<SyncMetadata> getSyncHistoryByTimeRange(String graphId,
|
||||
LocalDateTime from, LocalDateTime to,
|
||||
int page, int size) {
|
||||
validateGraphId(graphId);
|
||||
long skip = (long) page * size;
|
||||
if (skip > 2_000_000L) {
|
||||
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "分页偏移量超出允许范围");
|
||||
}
|
||||
return syncHistoryRepository.findByGraphIdAndTimeRange(graphId, from, to, skip, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 syncId 查询单条同步记录。
|
||||
*/
|
||||
public Optional<SyncMetadata> getSyncRecord(String graphId, String syncId) {
|
||||
validateGraphId(graphId);
|
||||
return syncHistoryRepository.findByGraphIdAndSyncId(graphId, syncId);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 内部方法
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -671,6 +862,85 @@ public class GraphSyncService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 持久化同步元数据,失败时仅记录日志,不影响主流程。
|
||||
*/
|
||||
private void saveSyncHistory(SyncMetadata metadata) {
|
||||
try {
|
||||
syncHistoryRepository.save(metadata);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to save sync history: {}", metadata.getSyncId(), e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 收集增量同步中变更(创建或更新)的实体ID。
|
||||
* 通过查询数据库获取这些sourceId对应的entityId。
|
||||
*/
|
||||
private Set<String> collectChangedEntityIds(List<DatasetDTO> datasets,
|
||||
List<WorkflowDTO> workflows,
|
||||
List<JobDTO> jobs,
|
||||
List<LabelTaskDTO> labelTasks,
|
||||
List<KnowledgeSetDTO> knowledgeSets,
|
||||
String graphId) {
|
||||
Set<String> entityIds = new HashSet<>();
|
||||
|
||||
// 通过数据管理客户端获取到的sourceId,需要转换为对应的entityId
|
||||
// 这里使用简化的方法:查询所有相关类型的实体并根据sourceId匹配
|
||||
try {
|
||||
// 收集所有变更的sourceId
|
||||
Set<String> changedSourceIds = new HashSet<>();
|
||||
|
||||
datasets.stream().filter(Objects::nonNull).map(DatasetDTO::getId).filter(Objects::nonNull)
|
||||
.forEach(changedSourceIds::add);
|
||||
|
||||
workflows.stream().filter(Objects::nonNull).map(WorkflowDTO::getId).filter(Objects::nonNull)
|
||||
.forEach(changedSourceIds::add);
|
||||
|
||||
jobs.stream().filter(Objects::nonNull).map(JobDTO::getId).filter(Objects::nonNull)
|
||||
.forEach(changedSourceIds::add);
|
||||
|
||||
labelTasks.stream().filter(Objects::nonNull).map(LabelTaskDTO::getId).filter(Objects::nonNull)
|
||||
.forEach(changedSourceIds::add);
|
||||
|
||||
knowledgeSets.stream().filter(Objects::nonNull).map(KnowledgeSetDTO::getId).filter(Objects::nonNull)
|
||||
.forEach(changedSourceIds::add);
|
||||
|
||||
// 添加字段的sourceId
|
||||
for (DatasetDTO dataset : datasets) {
|
||||
if (dataset != null && dataset.getTags() != null) {
|
||||
for (DataManagementClient.TagDTO tag : dataset.getTags()) {
|
||||
if (tag != null && tag.getName() != null) {
|
||||
changedSourceIds.add(dataset.getId() + ":tag:" + tag.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 查询这些sourceId对应的entityId
|
||||
if (!changedSourceIds.isEmpty()) {
|
||||
for (String sourceId : changedSourceIds) {
|
||||
// 简化处理:这里可以优化为批量查询
|
||||
String cypher = "MATCH (e:Entity {graph_id: $graphId, source_id: $sourceId}) RETURN e.id AS entityId";
|
||||
List<String> foundEntityIds = stepService.neo4jClient.query(cypher)
|
||||
.bindAll(Map.of("graphId", graphId, "sourceId", sourceId))
|
||||
.fetchAs(String.class)
|
||||
.mappedBy((ts, record) -> record.get("entityId").asString())
|
||||
.all()
|
||||
.stream().toList();
|
||||
entityIds.addAll(foundEntityIds);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to collect changed entity IDs, falling back to full relation rebuild: {}", e.getMessage());
|
||||
// 如果收集失败,返回null表示进行全量关系构建
|
||||
return null;
|
||||
}
|
||||
|
||||
log.debug("Collected {} changed entity IDs for incremental relation building", entityIds.size());
|
||||
return entityIds;
|
||||
}
|
||||
|
||||
private void validateGraphId(String graphId) {
|
||||
if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) {
|
||||
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效");
|
||||
|
||||
@@ -39,7 +39,7 @@ public class GraphSyncStepService {
|
||||
private static final String REL_TYPE = "RELATED_TO";
|
||||
|
||||
private final GraphEntityRepository entityRepository;
|
||||
private final Neo4jClient neo4jClient;
|
||||
final Neo4jClient neo4jClient; // 改为包级别访问,供GraphSyncService使用
|
||||
private final KnowledgeGraphProperties properties;
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -441,11 +441,35 @@ public class GraphSyncStepService {
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeHasFieldRelations(String graphId, String syncId) {
|
||||
return mergeHasFieldRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeHasFieldRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("HAS_FIELD", syncId);
|
||||
|
||||
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
|
||||
List<GraphEntity> fields = entityRepository.findByGraphIdAndType(graphId, "Field");
|
||||
|
||||
// 增量同步时只处理变更相关的字段
|
||||
if (changedEntityIds != null) {
|
||||
fields = fields.stream()
|
||||
.filter(field -> {
|
||||
// 包含自身变更的字段
|
||||
if (changedEntityIds.contains(field.getId())) {
|
||||
return true;
|
||||
}
|
||||
// 包含关联数据集发生变更的字段
|
||||
Object datasetSourceId = field.getProperties().get("dataset_source_id");
|
||||
if (datasetSourceId != null) {
|
||||
String datasetEntityId = datasetMap.get(datasetSourceId.toString());
|
||||
return datasetEntityId != null && changedEntityIds.contains(datasetEntityId);
|
||||
}
|
||||
return false;
|
||||
})
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity field : fields) {
|
||||
try {
|
||||
Object datasetSourceId = field.getProperties().get("dataset_source_id");
|
||||
@@ -477,11 +501,23 @@ public class GraphSyncStepService {
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeDerivedFromRelations(String graphId, String syncId) {
|
||||
return mergeDerivedFromRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeDerivedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("DERIVED_FROM", syncId);
|
||||
|
||||
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
|
||||
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
|
||||
|
||||
// 增量同步时只处理变更的数据集
|
||||
if (changedEntityIds != null) {
|
||||
datasets = datasets.stream()
|
||||
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity dataset : datasets) {
|
||||
try {
|
||||
Object parentId = dataset.getProperties().get("parent_dataset_id");
|
||||
@@ -512,6 +548,11 @@ public class GraphSyncStepService {
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeBelongsToRelations(String graphId, String syncId) {
|
||||
return mergeBelongsToRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("BELONGS_TO", syncId);
|
||||
|
||||
Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType(
|
||||
@@ -524,7 +565,13 @@ public class GraphSyncStepService {
|
||||
String orgId = defaultOrgOpt.get().getId();
|
||||
|
||||
// User → Org
|
||||
for (GraphEntity user : entityRepository.findByGraphIdAndType(graphId, "User")) {
|
||||
List<GraphEntity> users = entityRepository.findByGraphIdAndType(graphId, "User");
|
||||
if (changedEntityIds != null) {
|
||||
users = users.stream()
|
||||
.filter(user -> changedEntityIds.contains(user.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity user : users) {
|
||||
try {
|
||||
boolean created = mergeRelation(graphId, user.getId(), orgId,
|
||||
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
|
||||
@@ -536,7 +583,13 @@ public class GraphSyncStepService {
|
||||
}
|
||||
|
||||
// Dataset → Org
|
||||
for (GraphEntity dataset : entityRepository.findByGraphIdAndType(graphId, "Dataset")) {
|
||||
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
|
||||
if (changedEntityIds != null) {
|
||||
datasets = datasets.stream()
|
||||
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity dataset : datasets) {
|
||||
try {
|
||||
boolean created = mergeRelation(graphId, dataset.getId(), orgId,
|
||||
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
|
||||
@@ -559,22 +612,45 @@ public class GraphSyncStepService {
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId) {
|
||||
return mergeUsesDatasetRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeUsesDatasetRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("USES_DATASET", syncId);
|
||||
|
||||
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
|
||||
|
||||
// Job → Dataset (via input_dataset_id)
|
||||
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) {
|
||||
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
if (changedEntityIds != null) {
|
||||
jobs = jobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity job : jobs) {
|
||||
mergeEntityToDatasets(graphId, job, "input_dataset_id", datasetMap, result, syncId);
|
||||
}
|
||||
|
||||
// LabelTask → Dataset (via dataset_id)
|
||||
for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) {
|
||||
List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
|
||||
if (changedEntityIds != null) {
|
||||
tasks = tasks.stream()
|
||||
.filter(task -> changedEntityIds.contains(task.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity task : tasks) {
|
||||
mergeEntityToDatasets(graphId, task, "dataset_id", datasetMap, result, syncId);
|
||||
}
|
||||
|
||||
// Workflow → Dataset (via input_dataset_ids, multi-value)
|
||||
for (GraphEntity workflow : entityRepository.findByGraphIdAndType(graphId, "Workflow")) {
|
||||
List<GraphEntity> workflows = entityRepository.findByGraphIdAndType(graphId, "Workflow");
|
||||
if (changedEntityIds != null) {
|
||||
workflows = workflows.stream()
|
||||
.filter(workflow -> changedEntityIds.contains(workflow.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity workflow : workflows) {
|
||||
mergeEntityToDatasets(graphId, workflow, "input_dataset_ids", datasetMap, result, syncId);
|
||||
}
|
||||
|
||||
@@ -616,11 +692,23 @@ public class GraphSyncStepService {
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeProducesRelations(String graphId, String syncId) {
|
||||
return mergeProducesRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeProducesRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("PRODUCES", syncId);
|
||||
|
||||
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
|
||||
|
||||
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) {
|
||||
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
if (changedEntityIds != null) {
|
||||
jobs = jobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity job : jobs) {
|
||||
try {
|
||||
Object outputDatasetId = job.getProperties().get("output_dataset_id");
|
||||
if (outputDatasetId == null || outputDatasetId.toString().isBlank()) {
|
||||
@@ -647,17 +735,34 @@ public class GraphSyncStepService {
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeAssignedToRelations(String graphId, String syncId) {
|
||||
return mergeAssignedToRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeAssignedToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("ASSIGNED_TO", syncId);
|
||||
|
||||
Map<String, String> userMap = buildSourceIdToEntityIdMap(graphId, "User");
|
||||
|
||||
// LabelTask → User
|
||||
for (GraphEntity task : entityRepository.findByGraphIdAndType(graphId, "LabelTask")) {
|
||||
List<GraphEntity> tasks = entityRepository.findByGraphIdAndType(graphId, "LabelTask");
|
||||
if (changedEntityIds != null) {
|
||||
tasks = tasks.stream()
|
||||
.filter(task -> changedEntityIds.contains(task.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity task : tasks) {
|
||||
mergeCreatorAssignment(graphId, task, "label_task", userMap, result, syncId);
|
||||
}
|
||||
|
||||
// Job → User
|
||||
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) {
|
||||
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
if (changedEntityIds != null) {
|
||||
jobs = jobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
for (GraphEntity job : jobs) {
|
||||
mergeCreatorAssignment(graphId, job, "job", userMap, result, syncId);
|
||||
}
|
||||
|
||||
@@ -692,11 +797,23 @@ public class GraphSyncStepService {
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeTriggersRelations(String graphId, String syncId) {
|
||||
return mergeTriggersRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeTriggersRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("TRIGGERS", syncId);
|
||||
|
||||
Map<String, String> workflowMap = buildSourceIdToEntityIdMap(graphId, "Workflow");
|
||||
|
||||
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) {
|
||||
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
if (changedEntityIds != null) {
|
||||
jobs = jobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity job : jobs) {
|
||||
try {
|
||||
Object workflowId = job.getProperties().get("workflow_id");
|
||||
if (workflowId == null || workflowId.toString().isBlank()) {
|
||||
@@ -724,11 +841,23 @@ public class GraphSyncStepService {
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeDependsOnRelations(String graphId, String syncId) {
|
||||
return mergeDependsOnRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeDependsOnRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("DEPENDS_ON", syncId);
|
||||
|
||||
Map<String, String> jobMap = buildSourceIdToEntityIdMap(graphId, "Job");
|
||||
|
||||
for (GraphEntity job : entityRepository.findByGraphIdAndType(graphId, "Job")) {
|
||||
List<GraphEntity> jobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
if (changedEntityIds != null) {
|
||||
jobs = jobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity job : jobs) {
|
||||
try {
|
||||
Object depJobId = job.getProperties().get("depends_on_job_id");
|
||||
if (depJobId == null || depJobId.toString().isBlank()) {
|
||||
@@ -751,29 +880,159 @@ public class GraphSyncStepService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建 IMPACTS 关系:Field → Field。
|
||||
* 构建 IMPACTS 关系:Field → Field(字段级血缘)。
|
||||
* <p>
|
||||
* TODO: 字段影响关系来源于 LLM 抽取或规则引擎,而非简单外键关联。
|
||||
* 当前 MVP 阶段为占位实现,后续由抽取模块填充。
|
||||
* 通过两种途径推导字段间的影响关系:
|
||||
* <ol>
|
||||
* <li>DERIVED_FROM:若 Dataset B 派生自 Dataset A(parent_dataset_id),
|
||||
* 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT)。</li>
|
||||
* <li>Job 输入/输出:若 Job 使用 Dataset A 并产出 Dataset B,
|
||||
* 则 A 中与 B 同名的字段产生 IMPACTS 关系(impact_type=DIRECT, job_id=源 ID)。</li>
|
||||
* </ol>
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeImpactsRelations(String graphId, String syncId) {
|
||||
return mergeImpactsRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeImpactsRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("IMPACTS", syncId);
|
||||
result.setPlaceholder(true);
|
||||
log.debug("[{}] IMPACTS relations require extraction data, skipping in sync phase", syncId);
|
||||
|
||||
// 1. 加载所有 Field,按 dataset_source_id 分组
|
||||
List<GraphEntity> allFields = entityRepository.findByGraphIdAndType(graphId, "Field");
|
||||
Map<String, List<GraphEntity>> fieldsByDataset = allFields.stream()
|
||||
.filter(f -> f.getProperties().get("dataset_source_id") != null)
|
||||
.collect(Collectors.groupingBy(
|
||||
f -> f.getProperties().get("dataset_source_id").toString()));
|
||||
|
||||
if (fieldsByDataset.isEmpty()) {
|
||||
log.debug("[{}] No fields with dataset_source_id found, skipping IMPACTS", syncId);
|
||||
return endResult(result);
|
||||
}
|
||||
|
||||
// 记录已处理的 (sourceDatasetId, targetDatasetId) 对,避免重复
|
||||
Set<String> processedPairs = new HashSet<>();
|
||||
|
||||
// 2. DERIVED_FROM 推导:parent dataset fields → child dataset fields
|
||||
List<GraphEntity> allDatasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
|
||||
|
||||
// 增量同步时只处理变更的数据集
|
||||
if (changedEntityIds != null) {
|
||||
allDatasets = allDatasets.stream()
|
||||
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity dataset : allDatasets) {
|
||||
Object parentId = dataset.getProperties().get("parent_dataset_id");
|
||||
if (parentId == null || parentId.toString().isBlank()) {
|
||||
continue;
|
||||
}
|
||||
String pairKey = parentId + "→" + dataset.getSourceId();
|
||||
processedPairs.add(pairKey);
|
||||
mergeFieldImpacts(graphId, parentId.toString(), dataset.getSourceId(),
|
||||
fieldsByDataset, null, result, syncId);
|
||||
}
|
||||
|
||||
// 3. Job 输入/输出推导:input dataset fields → output dataset fields
|
||||
List<GraphEntity> allJobs = entityRepository.findByGraphIdAndType(graphId, "Job");
|
||||
|
||||
// 增量同步时只处理变更的作业
|
||||
if (changedEntityIds != null) {
|
||||
allJobs = allJobs.stream()
|
||||
.filter(job -> changedEntityIds.contains(job.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity job : allJobs) {
|
||||
Object inputDsId = job.getProperties().get("input_dataset_id");
|
||||
Object outputDsId = job.getProperties().get("output_dataset_id");
|
||||
if (inputDsId == null || outputDsId == null
|
||||
|| inputDsId.toString().isBlank() || outputDsId.toString().isBlank()) {
|
||||
continue;
|
||||
}
|
||||
String pairKey = inputDsId + "→" + outputDsId;
|
||||
if (processedPairs.contains(pairKey)) {
|
||||
continue;
|
||||
}
|
||||
processedPairs.add(pairKey);
|
||||
mergeFieldImpacts(graphId, inputDsId.toString(), outputDsId.toString(),
|
||||
fieldsByDataset, job.getSourceId(), result, syncId);
|
||||
}
|
||||
|
||||
return endResult(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* 对两个关联 Dataset 的字段按名称匹配,创建 IMPACTS 关系。
|
||||
*/
|
||||
private void mergeFieldImpacts(String graphId,
|
||||
String sourceDatasetSourceId, String targetDatasetSourceId,
|
||||
Map<String, List<GraphEntity>> fieldsByDataset,
|
||||
String jobSourceId,
|
||||
SyncResult result, String syncId) {
|
||||
List<GraphEntity> sourceFields = fieldsByDataset.getOrDefault(sourceDatasetSourceId, List.of());
|
||||
List<GraphEntity> targetFields = fieldsByDataset.getOrDefault(targetDatasetSourceId, List.of());
|
||||
|
||||
if (sourceFields.isEmpty() || targetFields.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 目标字段按名称索引
|
||||
Map<String, GraphEntity> targetByName = targetFields.stream()
|
||||
.filter(f -> f.getName() != null && !f.getName().isBlank())
|
||||
.collect(Collectors.toMap(GraphEntity::getName, f -> f, (a, b) -> a));
|
||||
|
||||
for (GraphEntity srcField : sourceFields) {
|
||||
if (srcField.getName() == null || srcField.getName().isBlank()) {
|
||||
continue;
|
||||
}
|
||||
GraphEntity tgtField = targetByName.get(srcField.getName());
|
||||
if (tgtField == null) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
String propsJson = jobSourceId != null
|
||||
? "{\"impact_type\":\"DIRECT\",\"job_id\":\"" + sanitizePropertyValue(jobSourceId) + "\"}"
|
||||
: "{\"impact_type\":\"DIRECT\"}";
|
||||
boolean created = mergeRelation(graphId, srcField.getId(), tgtField.getId(),
|
||||
"IMPACTS", propsJson, syncId);
|
||||
if (created) {
|
||||
result.incrementCreated();
|
||||
} else {
|
||||
result.incrementSkipped();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to merge IMPACTS: {} → {}", syncId,
|
||||
srcField.getId(), tgtField.getId(), e);
|
||||
result.addError("impacts:" + srcField.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建 SOURCED_FROM 关系:KnowledgeSet → Dataset(通过 source_dataset_ids)。
|
||||
*/
|
||||
@Transactional
|
||||
public SyncResult mergeSourcedFromRelations(String graphId, String syncId) {
|
||||
return mergeSourcedFromRelations(graphId, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeSourcedFromRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("SOURCED_FROM", syncId);
|
||||
|
||||
Map<String, String> datasetMap = buildSourceIdToEntityIdMap(graphId, "Dataset");
|
||||
|
||||
for (GraphEntity ks : entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet")) {
|
||||
List<GraphEntity> knowledgeSets = entityRepository.findByGraphIdAndType(graphId, "KnowledgeSet");
|
||||
if (changedEntityIds != null) {
|
||||
knowledgeSets = knowledgeSets.stream()
|
||||
.filter(ks -> changedEntityIds.contains(ks.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
for (GraphEntity ks : knowledgeSets) {
|
||||
try {
|
||||
Object sourceIds = ks.getProperties().get("source_dataset_ids");
|
||||
if (sourceIds == null) {
|
||||
@@ -847,7 +1106,8 @@ public class GraphSyncStepService {
|
||||
"ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " +
|
||||
" e.name = $name, e.description = $description, " +
|
||||
" e.created_at = datetime(), e.updated_at = datetime()" + extraSet + " " +
|
||||
"ON MATCH SET e.name = $name, e.description = $description, " +
|
||||
"ON MATCH SET e.name = CASE WHEN $name <> '' THEN $name ELSE e.name END, " +
|
||||
" e.description = CASE WHEN $description <> '' THEN $description ELSE e.description END, " +
|
||||
" e.updated_at = datetime()" + extraSet + " " +
|
||||
"RETURN e.id = $newId AS isNew"
|
||||
)
|
||||
@@ -871,6 +1131,16 @@ public class GraphSyncStepService {
|
||||
return key.replaceAll("[^a-zA-Z0-9_]", "");
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理属性值用于 JSON 字符串拼接,转义双引号和反斜杠,防止 JSON 注入。
|
||||
*/
|
||||
private static String sanitizePropertyValue(String value) {
|
||||
if (value == null) {
|
||||
return "";
|
||||
}
|
||||
return value.replace("\\", "\\\\").replace("\"", "\\\"");
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 Java 值转换为 Neo4j 兼容的属性值。
|
||||
* <p>
|
||||
@@ -925,6 +1195,7 @@ public class GraphSyncStepService {
|
||||
"MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
|
||||
"ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " +
|
||||
" r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " +
|
||||
"ON MATCH SET r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
|
||||
"RETURN r.id AS relId"
|
||||
)
|
||||
.bindAll(Map.of(
|
||||
|
||||
@@ -0,0 +1,194 @@
|
||||
package com.datamate.knowledgegraph.domain.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.data.annotation.Transient;
|
||||
import org.springframework.data.neo4j.core.schema.GeneratedValue;
|
||||
import org.springframework.data.neo4j.core.schema.Id;
|
||||
import org.springframework.data.neo4j.core.schema.Node;
|
||||
import org.springframework.data.neo4j.core.schema.Property;
|
||||
import org.springframework.data.neo4j.core.support.UUIDStringGenerator;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 同步操作元数据,用于记录每次同步的整体状态和统计信息。
|
||||
* <p>
|
||||
* 同时作为 Neo4j 节点持久化到图数据库,支持历史查询和问题排查。
|
||||
*/
|
||||
@Node("SyncHistory")
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SyncMetadata {
|
||||
|
||||
public static final String STATUS_SUCCESS = "SUCCESS";
|
||||
public static final String STATUS_FAILED = "FAILED";
|
||||
public static final String STATUS_PARTIAL = "PARTIAL";
|
||||
|
||||
public static final String TYPE_FULL = "FULL";
|
||||
public static final String TYPE_INCREMENTAL = "INCREMENTAL";
|
||||
public static final String TYPE_DATASETS = "DATASETS";
|
||||
public static final String TYPE_FIELDS = "FIELDS";
|
||||
public static final String TYPE_USERS = "USERS";
|
||||
public static final String TYPE_ORGS = "ORGS";
|
||||
public static final String TYPE_WORKFLOWS = "WORKFLOWS";
|
||||
public static final String TYPE_JOBS = "JOBS";
|
||||
public static final String TYPE_LABEL_TASKS = "LABEL_TASKS";
|
||||
public static final String TYPE_KNOWLEDGE_SETS = "KNOWLEDGE_SETS";
|
||||
|
||||
@Id
|
||||
@GeneratedValue(UUIDStringGenerator.class)
|
||||
private String id;
|
||||
|
||||
@Property("sync_id")
|
||||
private String syncId;
|
||||
|
||||
@Property("graph_id")
|
||||
private String graphId;
|
||||
|
||||
/** 同步类型:FULL / DATASETS / WORKFLOWS 等 */
|
||||
@Property("sync_type")
|
||||
private String syncType;
|
||||
|
||||
/** 同步状态:SUCCESS / FAILED / PARTIAL */
|
||||
@Property("status")
|
||||
private String status;
|
||||
|
||||
@Property("started_at")
|
||||
private LocalDateTime startedAt;
|
||||
|
||||
@Property("completed_at")
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
@Property("duration_millis")
|
||||
private long durationMillis;
|
||||
|
||||
@Property("total_created")
|
||||
@Builder.Default
|
||||
private int totalCreated = 0;
|
||||
|
||||
@Property("total_updated")
|
||||
@Builder.Default
|
||||
private int totalUpdated = 0;
|
||||
|
||||
@Property("total_skipped")
|
||||
@Builder.Default
|
||||
private int totalSkipped = 0;
|
||||
|
||||
@Property("total_failed")
|
||||
@Builder.Default
|
||||
private int totalFailed = 0;
|
||||
|
||||
@Property("total_purged")
|
||||
@Builder.Default
|
||||
private int totalPurged = 0;
|
||||
|
||||
/** 增量同步的时间窗口起始 */
|
||||
@Property("updated_from")
|
||||
private LocalDateTime updatedFrom;
|
||||
|
||||
/** 增量同步的时间窗口结束 */
|
||||
@Property("updated_to")
|
||||
private LocalDateTime updatedTo;
|
||||
|
||||
/** 同步失败时的错误信息 */
|
||||
@Property("error_message")
|
||||
private String errorMessage;
|
||||
|
||||
/** 各步骤的摘要,如 "Dataset(+5/~2/-0/purged:1)" */
|
||||
@Property("step_summaries")
|
||||
@Builder.Default
|
||||
private List<String> stepSummaries = new ArrayList<>();
|
||||
|
||||
/** 详细的各步骤结果(不持久化到 Neo4j,仅在返回时携带) */
|
||||
@Transient
|
||||
private List<SyncResult> results;
|
||||
|
||||
public int totalEntities() {
|
||||
return totalCreated + totalUpdated + totalSkipped + totalFailed;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 SyncResult 列表构建元数据。
|
||||
*/
|
||||
public static SyncMetadata fromResults(String syncId, String graphId, String syncType,
|
||||
LocalDateTime startedAt, List<SyncResult> results) {
|
||||
LocalDateTime completedAt = LocalDateTime.now();
|
||||
long duration = Duration.between(startedAt, completedAt).toMillis();
|
||||
|
||||
int created = 0, updated = 0, skipped = 0, failed = 0, purged = 0;
|
||||
List<String> summaries = new ArrayList<>();
|
||||
boolean hasFailures = false;
|
||||
|
||||
for (SyncResult r : results) {
|
||||
created += r.getCreated();
|
||||
updated += r.getUpdated();
|
||||
skipped += r.getSkipped();
|
||||
failed += r.getFailed();
|
||||
purged += r.getPurged();
|
||||
if (r.getFailed() > 0) {
|
||||
hasFailures = true;
|
||||
}
|
||||
summaries.add(formatStepSummary(r));
|
||||
}
|
||||
|
||||
String status = hasFailures ? STATUS_PARTIAL : STATUS_SUCCESS;
|
||||
|
||||
return SyncMetadata.builder()
|
||||
.syncId(syncId)
|
||||
.graphId(graphId)
|
||||
.syncType(syncType)
|
||||
.status(status)
|
||||
.startedAt(startedAt)
|
||||
.completedAt(completedAt)
|
||||
.durationMillis(duration)
|
||||
.totalCreated(created)
|
||||
.totalUpdated(updated)
|
||||
.totalSkipped(skipped)
|
||||
.totalFailed(failed)
|
||||
.totalPurged(purged)
|
||||
.stepSummaries(summaries)
|
||||
.results(results)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建失败的元数据。
|
||||
*/
|
||||
public static SyncMetadata failed(String syncId, String graphId, String syncType,
|
||||
LocalDateTime startedAt, String errorMessage) {
|
||||
LocalDateTime completedAt = LocalDateTime.now();
|
||||
long duration = Duration.between(startedAt, completedAt).toMillis();
|
||||
|
||||
return SyncMetadata.builder()
|
||||
.syncId(syncId)
|
||||
.graphId(graphId)
|
||||
.syncType(syncType)
|
||||
.status(STATUS_FAILED)
|
||||
.startedAt(startedAt)
|
||||
.completedAt(completedAt)
|
||||
.durationMillis(duration)
|
||||
.errorMessage(errorMessage)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static String formatStepSummary(SyncResult r) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(r.getSyncType())
|
||||
.append("(+").append(r.getCreated())
|
||||
.append("/~").append(r.getUpdated())
|
||||
.append("/-").append(r.getFailed());
|
||||
if (r.getPurged() > 0) {
|
||||
sb.append("/purged:").append(r.getPurged());
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@@ -345,16 +345,13 @@ public class GraphRelationRepository {
|
||||
.query(
|
||||
"MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " +
|
||||
"MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " +
|
||||
"CREATE (s)-[r:" + REL_TYPE + " {" +
|
||||
" id: $id," +
|
||||
" relation_type: $relationType," +
|
||||
" weight: $weight," +
|
||||
" confidence: $confidence," +
|
||||
" source_id: $sourceId," +
|
||||
" graph_id: $graphId," +
|
||||
" properties_json: $propertiesJson," +
|
||||
" created_at: $createdAt" +
|
||||
"}]->(t) " +
|
||||
"MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
|
||||
"ON CREATE SET r.id = $id, r.weight = $weight, r.confidence = $confidence, " +
|
||||
" r.source_id = $sourceId, r.properties_json = $propertiesJson, r.created_at = $createdAt " +
|
||||
"ON MATCH SET r.weight = CASE WHEN $weight IS NOT NULL THEN $weight ELSE r.weight END, " +
|
||||
" r.confidence = CASE WHEN $confidence IS NOT NULL THEN $confidence ELSE r.confidence END, " +
|
||||
" r.source_id = CASE WHEN $sourceId <> '' THEN $sourceId ELSE r.source_id END, " +
|
||||
" r.properties_json = CASE WHEN $propertiesJson <> '{}' THEN $propertiesJson ELSE r.properties_json END " +
|
||||
RETURN_COLUMNS
|
||||
)
|
||||
.bindAll(params)
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.datamate.knowledgegraph.domain.repository;
|
||||
|
||||
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
|
||||
import org.springframework.data.neo4j.repository.Neo4jRepository;
|
||||
import org.springframework.data.neo4j.repository.query.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Repository
|
||||
public interface SyncHistoryRepository extends Neo4jRepository<SyncMetadata, String> {
|
||||
|
||||
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
|
||||
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
|
||||
List<SyncMetadata> findByGraphId(
|
||||
@Param("graphId") String graphId,
|
||||
@Param("limit") int limit);
|
||||
|
||||
@Query("MATCH (h:SyncHistory {graph_id: $graphId, status: $status}) " +
|
||||
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
|
||||
List<SyncMetadata> findByGraphIdAndStatus(
|
||||
@Param("graphId") String graphId,
|
||||
@Param("status") String status,
|
||||
@Param("limit") int limit);
|
||||
|
||||
@Query("MATCH (h:SyncHistory {graph_id: $graphId, sync_id: $syncId}) RETURN h")
|
||||
Optional<SyncMetadata> findByGraphIdAndSyncId(
|
||||
@Param("graphId") String graphId,
|
||||
@Param("syncId") String syncId);
|
||||
|
||||
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
|
||||
"WHERE h.started_at >= $from AND h.started_at <= $to " +
|
||||
"RETURN h ORDER BY h.started_at DESC SKIP $skip LIMIT $limit")
|
||||
List<SyncMetadata> findByGraphIdAndTimeRange(
|
||||
@Param("graphId") String graphId,
|
||||
@Param("from") LocalDateTime from,
|
||||
@Param("to") LocalDateTime to,
|
||||
@Param("skip") long skip,
|
||||
@Param("limit") int limit);
|
||||
}
|
||||
@@ -22,7 +22,8 @@ public enum KnowledgeGraphErrorCode implements ErrorCode {
|
||||
SYNC_FAILED("knowledge_graph.0009", "数据同步失败"),
|
||||
EMPTY_SNAPSHOT_PURGE_BLOCKED("knowledge_graph.0010", "空快照保护:上游返回空列表,已阻止 purge 操作"),
|
||||
SCHEMA_INIT_FAILED("knowledge_graph.0011", "图谱 Schema 初始化失败"),
|
||||
INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "检测到默认凭据,生产环境禁止使用默认密码");
|
||||
INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "检测到默认凭据,生产环境禁止使用默认密码"),
|
||||
UNAUTHORIZED_INTERNAL_CALL("knowledge_graph.0013", "内部调用未授权:X-Internal-Token 校验失败");
|
||||
|
||||
private final String code;
|
||||
private final String message;
|
||||
|
||||
@@ -72,7 +72,20 @@ public class GraphInitializer implements ApplicationRunner {
|
||||
"CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
|
||||
|
||||
// 全文索引
|
||||
"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]"
|
||||
"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",
|
||||
|
||||
// ── SyncHistory 约束和索引 ──
|
||||
|
||||
// P1: syncId 唯一约束,防止 ID 碰撞
|
||||
"CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
|
||||
"FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
|
||||
|
||||
// P2-3: 查询优化索引
|
||||
"CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
|
||||
"FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
|
||||
|
||||
"CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
|
||||
"FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
|
||||
);
|
||||
|
||||
@Override
|
||||
|
||||
@@ -25,6 +25,24 @@ public class KnowledgeGraphProperties {
|
||||
/** 同步相关配置 */
|
||||
private Sync sync = new Sync();
|
||||
|
||||
/** 安全相关配置 */
|
||||
private Security security = new Security();
|
||||
|
||||
@Data
|
||||
public static class Security {
|
||||
|
||||
/** 内部服务调用 Token,用于校验 sync 端点的 X-Internal-Token 请求头 */
|
||||
private String internalToken;
|
||||
|
||||
/**
|
||||
* 是否跳过内部 Token 校验(默认 false,即 fail-closed)。
|
||||
* <p>
|
||||
* 仅允许在 dev/test 环境显式设置为 true 以跳过校验。
|
||||
* 生产环境必须保持 false 并配置 {@code internal-token}。
|
||||
*/
|
||||
private boolean skipTokenCheck = false;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Sync {
|
||||
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.datamate.knowledgegraph.infrastructure.security;
|
||||
|
||||
import com.datamate.common.infrastructure.common.Response;
|
||||
import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
|
||||
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.servlet.HandlerInterceptor;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* 内部服务调用 Token 校验拦截器。
|
||||
* <p>
|
||||
* 验证 {@code X-Internal-Token} 请求头,保护 sync 端点仅供内部服务/定时任务调用。
|
||||
* <p>
|
||||
* <strong>安全策略(fail-closed)</strong>:
|
||||
* <ul>
|
||||
* <li>Token 未配置且 {@code skip-token-check=false}(默认)时,直接拒绝请求</li>
|
||||
* <li>仅当 dev/test 环境显式设置 {@code skip-token-check=true} 时,才跳过校验</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class InternalTokenInterceptor implements HandlerInterceptor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(InternalTokenInterceptor.class);
|
||||
private static final String HEADER_INTERNAL_TOKEN = "X-Internal-Token";
|
||||
|
||||
private final KnowledgeGraphProperties properties;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Override
|
||||
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
|
||||
throws IOException {
|
||||
KnowledgeGraphProperties.Security security = properties.getSecurity();
|
||||
String configuredToken = security.getInternalToken();
|
||||
|
||||
if (!StringUtils.hasText(configuredToken)) {
|
||||
if (security.isSkipTokenCheck()) {
|
||||
log.warn("内部调用 Token 未配置且 skip-token-check=true,跳过校验(仅限 dev/test 环境)。");
|
||||
return true;
|
||||
}
|
||||
log.error("内部调用 Token 未配置且 skip-token-check=false(fail-closed),拒绝请求。"
|
||||
+ "请设置 KG_INTERNAL_TOKEN 环境变量或在 dev/test 环境启用 skip-token-check。");
|
||||
writeErrorResponse(response);
|
||||
return false;
|
||||
}
|
||||
|
||||
String requestToken = request.getHeader(HEADER_INTERNAL_TOKEN);
|
||||
|
||||
if (!configuredToken.equals(requestToken)) {
|
||||
writeErrorResponse(response);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void writeErrorResponse(HttpServletResponse response) throws IOException {
|
||||
Response<?> errorBody = Response.error(KnowledgeGraphErrorCode.UNAUTHORIZED_INTERNAL_CALL);
|
||||
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.getWriter().write(objectMapper.writeValueAsString(errorBody));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.datamate.knowledgegraph.infrastructure.security;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
|
||||
/**
|
||||
* 注册 {@link InternalTokenInterceptor},仅拦截 sync 端点。
|
||||
*/
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class InternalTokenWebMvcConfigurer implements WebMvcConfigurer {
|
||||
|
||||
private final InternalTokenInterceptor internalTokenInterceptor;
|
||||
|
||||
@Override
|
||||
public void addInterceptors(InterceptorRegistry registry) {
|
||||
registry.addInterceptor(internalTokenInterceptor)
|
||||
.addPathPatterns("/knowledge-graph/*/sync/**");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
package com.datamate.knowledgegraph.interfaces.dto;
|
||||
|
||||
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 同步元数据视图对象。
|
||||
* <p>
|
||||
* 包含本次同步的整体统计信息和各步骤的详细结果。
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SyncMetadataVO {
|
||||
|
||||
private String syncId;
|
||||
private String graphId;
|
||||
private String syncType;
|
||||
private String status;
|
||||
private LocalDateTime startedAt;
|
||||
private LocalDateTime completedAt;
|
||||
private long durationMillis;
|
||||
private int totalCreated;
|
||||
private int totalUpdated;
|
||||
private int totalSkipped;
|
||||
private int totalFailed;
|
||||
private int totalPurged;
|
||||
private int totalEntities;
|
||||
private LocalDateTime updatedFrom;
|
||||
private LocalDateTime updatedTo;
|
||||
private String errorMessage;
|
||||
private List<String> stepSummaries;
|
||||
/** 各步骤的详细结果(仅当前同步返回时携带,历史查询时为 null) */
|
||||
private List<SyncResultVO> results;
|
||||
|
||||
/**
|
||||
* 从 SyncMetadata 转换(包含详细步骤结果)。
|
||||
*/
|
||||
public static SyncMetadataVO from(SyncMetadata metadata) {
|
||||
List<SyncResultVO> resultVOs = null;
|
||||
if (metadata.getResults() != null) {
|
||||
resultVOs = metadata.getResults().stream()
|
||||
.map(SyncResultVO::from)
|
||||
.toList();
|
||||
}
|
||||
|
||||
return SyncMetadataVO.builder()
|
||||
.syncId(metadata.getSyncId())
|
||||
.graphId(metadata.getGraphId())
|
||||
.syncType(metadata.getSyncType())
|
||||
.status(metadata.getStatus())
|
||||
.startedAt(metadata.getStartedAt())
|
||||
.completedAt(metadata.getCompletedAt())
|
||||
.durationMillis(metadata.getDurationMillis())
|
||||
.totalCreated(metadata.getTotalCreated())
|
||||
.totalUpdated(metadata.getTotalUpdated())
|
||||
.totalSkipped(metadata.getTotalSkipped())
|
||||
.totalFailed(metadata.getTotalFailed())
|
||||
.totalPurged(metadata.getTotalPurged())
|
||||
.totalEntities(metadata.totalEntities())
|
||||
.updatedFrom(metadata.getUpdatedFrom())
|
||||
.updatedTo(metadata.getUpdatedTo())
|
||||
.errorMessage(metadata.getErrorMessage())
|
||||
.stepSummaries(metadata.getStepSummaries())
|
||||
.results(resultVOs)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,20 @@
|
||||
package com.datamate.knowledgegraph.interfaces.rest;
|
||||
|
||||
import com.datamate.knowledgegraph.application.GraphSyncService;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncResult;
|
||||
import com.datamate.knowledgegraph.interfaces.dto.SyncMetadataVO;
|
||||
import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO;
|
||||
import jakarta.validation.constraints.Max;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import jakarta.validation.constraints.Pattern;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.format.annotation.DateTimeFormat;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -16,10 +23,13 @@ import java.util.List;
|
||||
* 提供手动触发 MySQL → Neo4j 同步的 REST 端点。
|
||||
* 生产环境中也可通过定时任务自动触发。
|
||||
* <p>
|
||||
* <b>安全说明</b>:本接口仅供内部服务调用(API Gateway / 定时任务),
|
||||
* 外部请求必须经 API Gateway 鉴权后转发。
|
||||
* 生产环境建议通过 mTLS 或内部 JWT 进一步加固服务间认证。
|
||||
* 当前通过 {@code X-Internal-Token} 请求头进行简单的内部调用校验。
|
||||
* <b>安全架构</b>:
|
||||
* <ul>
|
||||
* <li>外部请求 → API Gateway (JWT 校验) → X-User-* headers → 后端服务</li>
|
||||
* <li>内部调用 → X-Internal-Token header → {@code InternalTokenInterceptor} 校验 → sync 端点</li>
|
||||
* </ul>
|
||||
* Token 校验由 {@code InternalTokenInterceptor} 拦截器统一实现,
|
||||
* 对 {@code /knowledge-graph/{graphId}/sync/} 路径前缀自动生效。
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/knowledge-graph/{graphId}/sync")
|
||||
@@ -36,10 +46,22 @@ public class GraphSyncController {
|
||||
* 全量同步:拉取所有实体并构建关系。
|
||||
*/
|
||||
@PostMapping("/full")
|
||||
public List<SyncResultVO> syncAll(
|
||||
public SyncMetadataVO syncAll(
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
|
||||
List<SyncResult> results = syncService.syncAll(graphId);
|
||||
return results.stream().map(SyncResultVO::from).toList();
|
||||
SyncMetadata metadata = syncService.syncAll(graphId);
|
||||
return SyncMetadataVO.from(metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* 增量同步:仅拉取指定时间窗口内变更的数据并同步。
|
||||
*/
|
||||
@PostMapping("/incremental")
|
||||
public SyncMetadataVO syncIncremental(
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
|
||||
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedFrom,
|
||||
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime updatedTo) {
|
||||
SyncMetadata metadata = syncService.syncIncremental(graphId, updatedFrom, updatedTo);
|
||||
return SyncMetadataVO.from(metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -211,4 +233,50 @@ public class GraphSyncController {
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
|
||||
return SyncResultVO.from(syncService.buildSourcedFromRelations(graphId));
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 同步历史查询端点
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* 查询同步历史记录。
|
||||
*
|
||||
* @param status 可选,按状态过滤(SUCCESS / FAILED / PARTIAL)
|
||||
* @param limit 返回条数上限,默认 20
|
||||
*/
|
||||
@GetMapping("/history")
|
||||
public List<SyncMetadataVO> getSyncHistory(
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
|
||||
@RequestParam(required = false) String status,
|
||||
@RequestParam(defaultValue = "20") @Min(1) @Max(200) int limit) {
|
||||
List<SyncMetadata> history = syncService.getSyncHistory(graphId, status, limit);
|
||||
return history.stream().map(SyncMetadataVO::from).toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 按时间范围查询同步历史。
|
||||
*/
|
||||
@GetMapping("/history/range")
|
||||
public List<SyncMetadataVO> getSyncHistoryByTimeRange(
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
|
||||
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from,
|
||||
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to,
|
||||
@RequestParam(defaultValue = "0") @Min(0) @Max(10000) int page,
|
||||
@RequestParam(defaultValue = "20") @Min(1) @Max(200) int size) {
|
||||
List<SyncMetadata> history = syncService.getSyncHistoryByTimeRange(graphId, from, to, page, size);
|
||||
return history.stream().map(SyncMetadataVO::from).toList();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 syncId 查询单条同步记录。
|
||||
*/
|
||||
@GetMapping("/history/{syncId}")
|
||||
public ResponseEntity<SyncMetadataVO> getSyncRecord(
|
||||
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
|
||||
@PathVariable String syncId) {
|
||||
return syncService.getSyncRecord(graphId, syncId)
|
||||
.map(SyncMetadataVO::from)
|
||||
.map(ResponseEntity::ok)
|
||||
.orElse(ResponseEntity.notFound().build());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,14 @@ datamate:
|
||||
max-nodes-per-query: ${KG_MAX_NODES:500}
|
||||
# 批量导入批次大小
|
||||
import-batch-size: ${KG_IMPORT_BATCH_SIZE:100}
|
||||
# 安全配置
|
||||
security:
|
||||
# 内部服务调用 Token(用于 sync 端点的 X-Internal-Token 校验)
|
||||
# 生产环境务必通过 KG_INTERNAL_TOKEN 环境变量设置,否则 sync 端点将拒绝所有请求(fail-closed)
|
||||
internal-token: ${KG_INTERNAL_TOKEN:}
|
||||
# 是否跳过 Token 校验(默认 false = fail-closed)
|
||||
# 仅在 dev/test 环境显式设置为 true 以跳过校验
|
||||
skip-token-check: ${KG_SKIP_TOKEN_CHECK:false}
|
||||
# MySQL → Neo4j 同步配置
|
||||
sync:
|
||||
# 数据管理服务地址
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.datamate.knowledgegraph.application;
|
||||
|
||||
import com.datamate.common.infrastructure.exception.BusinessException;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
|
||||
import com.datamate.knowledgegraph.domain.model.SyncResult;
|
||||
import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
|
||||
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
|
||||
@@ -13,12 +15,15 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -42,6 +47,9 @@ class GraphSyncServiceTest {
|
||||
@Mock
|
||||
private KnowledgeGraphProperties properties;
|
||||
|
||||
@Mock
|
||||
private SyncHistoryRepository syncHistoryRepository;
|
||||
|
||||
@InjectMocks
|
||||
private GraphSyncService syncService;
|
||||
|
||||
@@ -161,7 +169,7 @@ class GraphSyncServiceTest {
|
||||
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
|
||||
List<SyncResult> results = syncService.syncAll(GRAPH_ID);
|
||||
List<SyncResult> results = syncService.syncAll(GRAPH_ID).getResults();
|
||||
|
||||
// 8 entities + 10 relations = 18
|
||||
assertThat(results).hasSize(18);
|
||||
@@ -335,4 +343,481 @@ class GraphSyncServiceTest {
|
||||
.isInstanceOf(BusinessException.class);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 同步元数据记录
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class SyncMetadataRecordingTest {
|
||||
|
||||
@Test
|
||||
void syncAll_success_recordsMetadataWithCorrectFields() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
dto.setCreatedBy("admin");
|
||||
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
|
||||
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
|
||||
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Dataset").created(3).updated(1).build());
|
||||
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Job").build());
|
||||
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
|
||||
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
|
||||
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
|
||||
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
|
||||
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
|
||||
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
|
||||
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
|
||||
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
|
||||
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
|
||||
assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
|
||||
assertThat(metadata.getGraphId()).isEqualTo(GRAPH_ID);
|
||||
assertThat(metadata.getSyncId()).isNotNull();
|
||||
assertThat(metadata.getStartedAt()).isNotNull();
|
||||
assertThat(metadata.getCompletedAt()).isNotNull();
|
||||
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
|
||||
assertThat(metadata.getTotalCreated()).isEqualTo(3);
|
||||
assertThat(metadata.getTotalUpdated()).isEqualTo(1);
|
||||
assertThat(metadata.getResults()).hasSize(18);
|
||||
assertThat(metadata.getStepSummaries()).hasSize(18);
|
||||
assertThat(metadata.getErrorMessage()).isNull();
|
||||
|
||||
// 验证持久化被调用
|
||||
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
|
||||
verify(syncHistoryRepository).save(captor.capture());
|
||||
SyncMetadata saved = captor.getValue();
|
||||
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
|
||||
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncAll_withFailedSteps_recordsPartialStatus() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
dto.setCreatedBy("admin");
|
||||
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
|
||||
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
|
||||
|
||||
// Dataset step has failures
|
||||
SyncResult datasetResult = SyncResult.builder().syncType("Dataset").created(2).failed(1).build();
|
||||
datasetResult.setErrors(new java.util.ArrayList<>(List.of("some error")));
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(datasetResult);
|
||||
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Job").build());
|
||||
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
|
||||
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
|
||||
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
|
||||
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
|
||||
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
|
||||
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
|
||||
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
|
||||
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
|
||||
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
|
||||
assertThat(metadata.getTotalFailed()).isEqualTo(1);
|
||||
assertThat(metadata.getTotalCreated()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncAll_exceptionThrown_recordsFailedMetadata() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("connection refused"));
|
||||
|
||||
assertThatThrownBy(() -> syncService.syncAll(GRAPH_ID))
|
||||
.isInstanceOf(BusinessException.class);
|
||||
|
||||
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
|
||||
verify(syncHistoryRepository).save(captor.capture());
|
||||
SyncMetadata saved = captor.getValue();
|
||||
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
|
||||
assertThat(saved.getErrorMessage()).isNotNull();
|
||||
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
|
||||
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncDatasets_success_recordsMetadata() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Dataset").created(1).build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
|
||||
syncService.syncDatasets(GRAPH_ID);
|
||||
|
||||
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
|
||||
verify(syncHistoryRepository).save(captor.capture());
|
||||
SyncMetadata saved = captor.getValue();
|
||||
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
|
||||
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
|
||||
assertThat(saved.getTotalCreated()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncDatasets_failed_recordsFailedMetadata() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("timeout"));
|
||||
|
||||
assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID))
|
||||
.isInstanceOf(BusinessException.class);
|
||||
|
||||
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
|
||||
verify(syncHistoryRepository).save(captor.capture());
|
||||
SyncMetadata saved = captor.getValue();
|
||||
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
|
||||
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
|
||||
}
|
||||
|
||||
@Test
|
||||
void saveSyncHistory_exceptionInSave_doesNotAffectMainFlow() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Dataset").build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
// saveSyncHistory 内部异常不应影响主流程
|
||||
when(syncHistoryRepository.save(any())).thenThrow(new RuntimeException("Neo4j down"));
|
||||
|
||||
SyncResult result = syncService.syncDatasets(GRAPH_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("Dataset");
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 增量同步
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class IncrementalSyncTest {
|
||||
|
||||
private final LocalDateTime UPDATED_FROM = LocalDateTime.of(2025, 6, 1, 0, 0);
|
||||
private final LocalDateTime UPDATED_TO = LocalDateTime.of(2025, 6, 30, 23, 59);
|
||||
|
||||
@Test
|
||||
void syncIncremental_invalidGraphId_throwsBusinessException() {
|
||||
assertThatThrownBy(() -> syncService.syncIncremental(INVALID_GRAPH_ID, UPDATED_FROM, UPDATED_TO))
|
||||
.isInstanceOf(BusinessException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncIncremental_nullUpdatedFrom_throwsBusinessException() {
|
||||
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, null, UPDATED_TO))
|
||||
.isInstanceOf(BusinessException.class)
|
||||
.hasMessageContaining("updatedFrom");
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncIncremental_nullUpdatedTo_throwsBusinessException() {
|
||||
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, null))
|
||||
.isInstanceOf(BusinessException.class)
|
||||
.hasMessageContaining("updatedTo");
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncIncremental_fromAfterTo_throwsBusinessException() {
|
||||
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_TO, UPDATED_FROM))
|
||||
.isInstanceOf(BusinessException.class)
|
||||
.hasMessageContaining("updatedFrom");
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncIncremental_success_passesTimeWindowToClient() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
dto.setCreatedBy("admin");
|
||||
when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of(dto));
|
||||
when(dataManagementClient.listAllWorkflows(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllJobs(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllLabelTasks(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO)).thenReturn(List.of());
|
||||
|
||||
stubAllEntityUpserts();
|
||||
stubAllRelationMerges();
|
||||
|
||||
SyncMetadata metadata = syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO);
|
||||
|
||||
assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
|
||||
assertThat(metadata.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
|
||||
assertThat(metadata.getUpdatedTo()).isEqualTo(UPDATED_TO);
|
||||
assertThat(metadata.getResults()).hasSize(18);
|
||||
|
||||
// 验证使用了带时间窗口的 client 方法
|
||||
verify(dataManagementClient).listAllDatasets(UPDATED_FROM, UPDATED_TO);
|
||||
verify(dataManagementClient).listAllWorkflows(UPDATED_FROM, UPDATED_TO);
|
||||
verify(dataManagementClient).listAllJobs(UPDATED_FROM, UPDATED_TO);
|
||||
verify(dataManagementClient).listAllLabelTasks(UPDATED_FROM, UPDATED_TO);
|
||||
verify(dataManagementClient).listAllKnowledgeSets(UPDATED_FROM, UPDATED_TO);
|
||||
|
||||
// 验证不执行 purge
|
||||
verify(stepService, never()).purgeStaleEntities(anyString(), anyString(), anySet(), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncIncremental_failure_recordsMetadataWithTimeWindow() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.listAllDatasets(UPDATED_FROM, UPDATED_TO))
|
||||
.thenThrow(new RuntimeException("connection refused"));
|
||||
|
||||
assertThatThrownBy(() -> syncService.syncIncremental(GRAPH_ID, UPDATED_FROM, UPDATED_TO))
|
||||
.isInstanceOf(BusinessException.class);
|
||||
|
||||
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
|
||||
verify(syncHistoryRepository).save(captor.capture());
|
||||
SyncMetadata saved = captor.getValue();
|
||||
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
|
||||
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_INCREMENTAL);
|
||||
assertThat(saved.getUpdatedFrom()).isEqualTo(UPDATED_FROM);
|
||||
assertThat(saved.getUpdatedTo()).isEqualTo(UPDATED_TO);
|
||||
}
|
||||
|
||||
private void stubAllEntityUpserts() {
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Dataset").build());
|
||||
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Job").build());
|
||||
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
|
||||
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
|
||||
}
|
||||
|
||||
private void stubAllRelationMerges() {
|
||||
// 2-参数版本(全量同步)- 使用 lenient 模式避免 unnecessary stubbing 错误
|
||||
lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
|
||||
lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
|
||||
lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
|
||||
lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
|
||||
lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
|
||||
lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
|
||||
// 3-参数版本(增量同步)- 使用 lenient 模式避免 unnecessary stubbing 错误
|
||||
lenient().when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
lenient().when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
|
||||
lenient().when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
|
||||
lenient().when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
|
||||
lenient().when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
|
||||
lenient().when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
|
||||
lenient().when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 同步历史查询
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class SyncHistoryQueryTest {
|
||||
|
||||
@Test
|
||||
void getSyncHistory_invalidGraphId_throwsBusinessException() {
|
||||
assertThatThrownBy(() -> syncService.getSyncHistory(INVALID_GRAPH_ID, null, 20))
|
||||
.isInstanceOf(BusinessException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncHistory_noStatusFilter_callsFindByGraphId() {
|
||||
when(syncHistoryRepository.findByGraphId(GRAPH_ID, 20)).thenReturn(List.of());
|
||||
|
||||
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, null, 20);
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
verify(syncHistoryRepository).findByGraphId(GRAPH_ID, 20);
|
||||
verify(syncHistoryRepository, never()).findByGraphIdAndStatus(anyString(), anyString(), anyInt());
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncHistory_withStatusFilter_callsFindByGraphIdAndStatus() {
|
||||
when(syncHistoryRepository.findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10))
|
||||
.thenReturn(List.of());
|
||||
|
||||
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, "SUCCESS", 10);
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
verify(syncHistoryRepository).findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncRecord_found_returnsRecord() {
|
||||
SyncMetadata expected = SyncMetadata.builder()
|
||||
.syncId("abc12345").graphId(GRAPH_ID).build();
|
||||
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "abc12345"))
|
||||
.thenReturn(Optional.of(expected));
|
||||
|
||||
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "abc12345");
|
||||
|
||||
assertThat(result).isPresent();
|
||||
assertThat(result.get().getSyncId()).isEqualTo("abc12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncRecord_notFound_returnsEmpty() {
|
||||
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "notexist"))
|
||||
.thenReturn(Optional.empty());
|
||||
|
||||
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "notexist");
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncHistoryByTimeRange_delegatesToRepository() {
|
||||
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
|
||||
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
|
||||
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20))
|
||||
.thenReturn(List.of());
|
||||
|
||||
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 0, 20);
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncHistoryByTimeRange_pagination_computesSkipCorrectly() {
|
||||
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
|
||||
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
|
||||
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20))
|
||||
.thenReturn(List.of());
|
||||
|
||||
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 2, 20);
|
||||
|
||||
assertThat(result).isEmpty();
|
||||
// page=2, size=20 → skip=40
|
||||
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20);
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSyncHistoryByTimeRange_skipOverflow_throwsBusinessException() {
|
||||
// 模拟绕过 Controller 校验直接调用 Service 的场景
|
||||
assertThatThrownBy(() -> syncService.getSyncHistoryByTimeRange(
|
||||
GRAPH_ID,
|
||||
LocalDateTime.of(2025, 1, 1, 0, 0),
|
||||
LocalDateTime.of(2025, 12, 31, 23, 59),
|
||||
20000, 200))
|
||||
.isInstanceOf(BusinessException.class)
|
||||
.hasMessageContaining("分页偏移量");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -749,14 +749,129 @@ class GraphSyncStepServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeImpacts_returnsPlaceholderResult() {
|
||||
void mergeImpacts_noFields_returnsZero() {
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field")).thenReturn(List.of());
|
||||
|
||||
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
|
||||
assertThat(result.getCreated()).isEqualTo(0);
|
||||
assertThat(result.isPlaceholder()).isTrue();
|
||||
assertThat(result.isPlaceholder()).isFalse();
|
||||
verifyNoInteractions(neo4jClient);
|
||||
verifyNoInteractions(entityRepository);
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeImpacts_derivedFrom_matchingFieldNames_createsRelation() {
|
||||
setupNeo4jQueryChain(String.class, "new-rel-id");
|
||||
|
||||
// Parent dataset (source_id = "ds-parent")
|
||||
GraphEntity parentDs = GraphEntity.builder()
|
||||
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>())
|
||||
.build();
|
||||
// Child dataset (source_id = "ds-child", parent_dataset_id = "ds-parent")
|
||||
GraphEntity childDs = GraphEntity.builder()
|
||||
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
|
||||
.build();
|
||||
|
||||
// Fields with matching name "user_id"
|
||||
GraphEntity parentField = GraphEntity.builder()
|
||||
.id("field-parent-uid").name("user_id").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
|
||||
.build();
|
||||
GraphEntity childField = GraphEntity.builder()
|
||||
.id("field-child-uid").name("user_id").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
|
||||
.thenReturn(List.of(parentField, childField));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of(parentDs, childDs));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
|
||||
.thenReturn(List.of());
|
||||
|
||||
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
|
||||
verify(neo4jClient).query(cypherCaptor.capture());
|
||||
assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeImpacts_noMatchingFieldNames_createsNoRelation() {
|
||||
GraphEntity parentDs = GraphEntity.builder()
|
||||
.id("parent-entity").sourceId("ds-parent").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>())
|
||||
.build();
|
||||
GraphEntity childDs = GraphEntity.builder()
|
||||
.id("child-entity").sourceId("ds-child").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("parent_dataset_id", "ds-parent")))
|
||||
.build();
|
||||
|
||||
GraphEntity parentField = GraphEntity.builder()
|
||||
.id("field-parent").name("col_a").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-parent")))
|
||||
.build();
|
||||
GraphEntity childField = GraphEntity.builder()
|
||||
.id("field-child").name("col_b").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-child")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
|
||||
.thenReturn(List.of(parentField, childField));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of(parentDs, childDs));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
|
||||
.thenReturn(List.of());
|
||||
|
||||
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
|
||||
|
||||
assertThat(result.getCreated()).isEqualTo(0);
|
||||
verifyNoInteractions(neo4jClient);
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeImpacts_jobInputOutput_createsRelationWithJobId() {
|
||||
setupNeo4jQueryChain(String.class, "new-rel-id");
|
||||
|
||||
GraphEntity inputDs = GraphEntity.builder()
|
||||
.id("input-entity").sourceId("ds-in").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>())
|
||||
.build();
|
||||
GraphEntity outputDs = GraphEntity.builder()
|
||||
.id("output-entity").sourceId("ds-out").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>())
|
||||
.build();
|
||||
GraphEntity job = GraphEntity.builder()
|
||||
.id("job-entity").sourceId("job-001").type("Job").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of(
|
||||
"input_dataset_id", "ds-in",
|
||||
"output_dataset_id", "ds-out")))
|
||||
.build();
|
||||
|
||||
GraphEntity inField = GraphEntity.builder()
|
||||
.id("field-in").name("tag_x").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-in")))
|
||||
.build();
|
||||
GraphEntity outField = GraphEntity.builder()
|
||||
.id("field-out").name("tag_x").type("Field").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("dataset_source_id", "ds-out")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Field"))
|
||||
.thenReturn(List.of(inField, outField));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of(inputDs, outputDs));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Job"))
|
||||
.thenReturn(List.of(job));
|
||||
|
||||
SyncResult result = stepService.mergeImpactsRelations(GRAPH_ID, SYNC_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("IMPACTS");
|
||||
verify(neo4jClient).query(cypherCaptor.capture());
|
||||
assertThat(cypherCaptor.getValue()).contains("RELATED_TO");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
package com.datamate.knowledgegraph.domain.model;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class SyncMetadataTest {
|
||||
|
||||
@Test
|
||||
void fromResults_aggregatesCountsCorrectly() {
|
||||
LocalDateTime startedAt = LocalDateTime.of(2025, 6, 1, 10, 0, 0);
|
||||
|
||||
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(1).purged(3).build();
|
||||
SyncResult r2 = SyncResult.builder().syncType("Field").created(10).updated(0).skipped(2).build();
|
||||
SyncResult r3 = SyncResult.builder().syncType("HAS_FIELD").created(8).build();
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults("abc123", "graph-id", "FULL", startedAt, List.of(r1, r2, r3));
|
||||
|
||||
assertThat(metadata.getSyncId()).isEqualTo("abc123");
|
||||
assertThat(metadata.getGraphId()).isEqualTo("graph-id");
|
||||
assertThat(metadata.getSyncType()).isEqualTo("FULL");
|
||||
assertThat(metadata.getTotalCreated()).isEqualTo(23); // 5 + 10 + 8
|
||||
assertThat(metadata.getTotalUpdated()).isEqualTo(2); // 2 + 0 + 0
|
||||
assertThat(metadata.getTotalSkipped()).isEqualTo(2); // 0 + 2 + 0
|
||||
assertThat(metadata.getTotalFailed()).isEqualTo(1); // 1 + 0 + 0
|
||||
assertThat(metadata.getTotalPurged()).isEqualTo(3); // 3 + 0 + 0
|
||||
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
|
||||
assertThat(metadata.getCompletedAt()).isNotNull();
|
||||
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
|
||||
assertThat(metadata.getResults()).hasSize(3);
|
||||
assertThat(metadata.getStepSummaries()).hasSize(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fromResults_noFailures_statusIsSuccess() {
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).build();
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
|
||||
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fromResults_withFailures_statusIsPartial() {
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).failed(2).build();
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
|
||||
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
|
||||
assertThat(metadata.getTotalFailed()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void failed_createsFailedMetadata() {
|
||||
LocalDateTime startedAt = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.failed("abc", "g1", "FULL", startedAt, "connection refused");
|
||||
|
||||
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
|
||||
assertThat(metadata.getErrorMessage()).isEqualTo("connection refused");
|
||||
assertThat(metadata.getSyncId()).isEqualTo("abc");
|
||||
assertThat(metadata.getGraphId()).isEqualTo("g1");
|
||||
assertThat(metadata.getSyncType()).isEqualTo("FULL");
|
||||
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
|
||||
assertThat(metadata.getCompletedAt()).isNotNull();
|
||||
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
|
||||
assertThat(metadata.getTotalCreated()).isEqualTo(0);
|
||||
assertThat(metadata.getTotalUpdated()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void totalEntities_returnsSum() {
|
||||
SyncMetadata metadata = SyncMetadata.builder()
|
||||
.totalCreated(10).totalUpdated(5).totalSkipped(3).totalFailed(2)
|
||||
.build();
|
||||
|
||||
assertThat(metadata.totalEntities()).isEqualTo(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
void stepSummaries_formatIncludesPurgedWhenNonZero() {
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(0).purged(3).build();
|
||||
SyncResult r2 = SyncResult.builder().syncType("Field").created(1).updated(0).failed(0).purged(0).build();
|
||||
|
||||
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1, r2));
|
||||
|
||||
assertThat(metadata.getStepSummaries().get(0)).isEqualTo("Dataset(+5/~2/-0/purged:3)");
|
||||
assertThat(metadata.getStepSummaries().get(1)).isEqualTo("Field(+1/~0/-0)");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package com.datamate.knowledgegraph.infrastructure.security;
|
||||
|
||||
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.mock.web.MockHttpServletRequest;
|
||||
import org.springframework.mock.web.MockHttpServletResponse;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class InternalTokenInterceptorTest {
|
||||
|
||||
private static final String VALID_TOKEN = "test-secret-token";
|
||||
|
||||
private KnowledgeGraphProperties properties;
|
||||
private InternalTokenInterceptor interceptor;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
properties = new KnowledgeGraphProperties();
|
||||
interceptor = new InternalTokenInterceptor(properties, new ObjectMapper());
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// fail-closed:Token 未配置 + skipTokenCheck=false → 拒绝
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void tokenNotConfigured_skipFalse_rejects() throws Exception {
|
||||
properties.getSecurity().setInternalToken(null);
|
||||
properties.getSecurity().setSkipTokenCheck(false);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isFalse();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
assertThat(response.getContentAsString()).contains("knowledge_graph.0013");
|
||||
}
|
||||
|
||||
@Test
|
||||
void tokenEmpty_skipFalse_rejects() throws Exception {
|
||||
properties.getSecurity().setInternalToken("");
|
||||
properties.getSecurity().setSkipTokenCheck(false);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isFalse();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// dev/test 放行:Token 未配置 + skipTokenCheck=true → 放行
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void tokenNotConfigured_skipTrue_allows() throws Exception {
|
||||
properties.getSecurity().setInternalToken(null);
|
||||
properties.getSecurity().setSkipTokenCheck(true);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isTrue();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_OK);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 正常校验:Token 已配置 + 请求头匹配 → 放行
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void validToken_allows() throws Exception {
|
||||
properties.getSecurity().setInternalToken(VALID_TOKEN);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
request.addHeader("X-Internal-Token", VALID_TOKEN);
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isTrue();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_OK);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 401:Token 已配置 + 请求头不匹配 → 拒绝
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void invalidToken_rejects() throws Exception {
|
||||
properties.getSecurity().setInternalToken(VALID_TOKEN);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
request.addHeader("X-Internal-Token", "wrong-token");
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isFalse();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
assertThat(response.getContentType()).startsWith("application/json");
|
||||
assertThat(response.getContentAsString()).contains("knowledge_graph.0013");
|
||||
}
|
||||
|
||||
@Test
|
||||
void missingTokenHeader_rejects() throws Exception {
|
||||
properties.getSecurity().setInternalToken(VALID_TOKEN);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
// No X-Internal-Token header
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
boolean result = interceptor.preHandle(request, response, new Object());
|
||||
|
||||
assertThat(result).isFalse();
|
||||
assertThat(response.getStatus()).isEqualTo(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 错误响应格式:应使用 Response 体系
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Test
|
||||
void errorResponse_usesResponseFormat() throws Exception {
|
||||
properties.getSecurity().setInternalToken(VALID_TOKEN);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest();
|
||||
request.addHeader("X-Internal-Token", "wrong");
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
|
||||
interceptor.preHandle(request, response, new Object());
|
||||
|
||||
String body = response.getContentAsString();
|
||||
assertThat(body).contains("\"code\"");
|
||||
assertThat(body).contains("\"message\"");
|
||||
// Response.error() 包含 data 字段(值为 null)
|
||||
assertThat(body).contains("\"data\"");
|
||||
}
|
||||
}
|
||||
@@ -7,8 +7,14 @@ import org.springframework.security.config.annotation.web.configuration.EnableWe
|
||||
import org.springframework.security.web.SecurityFilterChain;
|
||||
|
||||
/**
|
||||
* 安全配置 - 暂时禁用所有认证
|
||||
* 开发阶段使用,生产环境需要启用认证
|
||||
* Spring Security 配置。
|
||||
* <p>
|
||||
* 安全架构采用双层防护:
|
||||
* <ul>
|
||||
* <li><b>Gateway 层</b>:API Gateway 负责 JWT 校验,通过后透传 X-User-* headers 到后端服务</li>
|
||||
* <li><b>服务层</b>:内部 sync 端点通过 {@code InternalTokenInterceptor} 校验 X-Internal-Token</li>
|
||||
* </ul>
|
||||
* 当前 SecurityFilterChain 配置为 permitAll,HTTP 级别的访问控制由 Gateway 和业务拦截器共同完成。
|
||||
*/
|
||||
@Configuration
|
||||
@EnableWebSecurity
|
||||
|
||||
@@ -3,12 +3,6 @@ spring:
|
||||
application:
|
||||
name: datamate
|
||||
|
||||
# 暂时排除Spring Security自动配置(开发阶段使用)
|
||||
autoconfigure:
|
||||
exclude:
|
||||
- org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
|
||||
- org.springframework.boot.autoconfigure.security.servlet.UserDetailsServiceAutoConfiguration
|
||||
|
||||
# 数据源配置
|
||||
datasource:
|
||||
driver-class-name: com.mysql.cj.jdbc.Driver
|
||||
|
||||
Reference in New Issue
Block a user