diff --git a/.gitignore b/.gitignore index c0906e8..dcf5d0d 100644 --- a/.gitignore +++ b/.gitignore @@ -189,4 +189,6 @@ Thumbs.db *.sublime-workspace # Milvus -deployment/docker/milvus/volumes/ \ No newline at end of file +deployment/docker/milvus/volumes/ +# Local documentation +docs/knowledge-graph/ diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java index 0323fc8..ef7693b 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java @@ -2,7 +2,9 @@ package com.datamate.knowledgegraph.application; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.knowledgegraph.domain.model.SyncMetadata; import com.datamate.knowledgegraph.domain.model.SyncResult; +import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; @@ -15,6 +17,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -52,6 +55,7 @@ public class GraphSyncService { private final GraphSyncStepService stepService; private final DataManagementClient dataManagementClient; private final KnowledgeGraphProperties properties; + private final SyncHistoryRepository syncHistoryRepository; /** 同 graphId 互斥锁,防止并发同步。 */ private final ConcurrentHashMap graphLocks = new ConcurrentHashMap<>(); @@ -60,9 +64,10 @@ public class GraphSyncService { // 全量同步 // ----------------------------------------------------------------------- - public List syncAll(String graphId) { + public SyncMetadata syncAll(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { @@ -178,10 +183,16 @@ public class GraphSyncService { results.stream() .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")") .collect(Collectors.joining(", "))); - return results; + + SyncMetadata metadata = SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, results); + saveSyncHistory(metadata); + return metadata; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage())); log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "全量同步失败,syncId=" + syncId); } finally { @@ -195,7 +206,8 @@ public class GraphSyncService { public SyncResult syncDatasets(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List datasets = fetchDatasetsWithRetry(syncId); @@ -206,10 +218,14 @@ public class GraphSyncService { .collect(Collectors.toSet()); int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId); result.setPurged(purged); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage())); log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "数据集同步失败,syncId=" + syncId); } finally { @@ -219,7 +235,8 @@ public class GraphSyncService { public SyncResult syncFields(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List datasets = fetchDatasetsWithRetry(syncId); @@ -237,10 +254,14 @@ public class GraphSyncService { } } result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage())); log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "字段同步失败,syncId=" + syncId); } finally { @@ -250,7 +271,8 @@ public class GraphSyncService { public SyncResult syncUsers(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List datasets = fetchDatasetsWithRetry(syncId); @@ -266,10 +288,14 @@ public class GraphSyncService { SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId); Set activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage())); log.error("[{}] User sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "用户同步失败,syncId=" + syncId); } finally { @@ -279,13 +305,19 @@ public class GraphSyncService { public SyncResult syncOrgs(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { - return stepService.upsertOrgEntities(graphId, syncId); + SyncResult result = stepService.upsertOrgEntities(graphId, syncId); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result))); + return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage())); log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "组织同步失败,syncId=" + syncId); } finally { @@ -295,7 +327,7 @@ public class GraphSyncService { public SyncResult buildHasFieldRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeHasFieldRelations(graphId, syncId); @@ -312,7 +344,7 @@ public class GraphSyncService { public SyncResult buildDerivedFromRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeDerivedFromRelations(graphId, syncId); @@ -329,7 +361,7 @@ public class GraphSyncService { public SyncResult buildBelongsToRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeBelongsToRelations(graphId, syncId); @@ -350,7 +382,8 @@ public class GraphSyncService { public SyncResult syncWorkflows(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List workflows = fetchWithRetry(syncId, "workflows", @@ -361,10 +394,14 @@ public class GraphSyncService { .filter(Objects::nonNull).filter(id -> !id.isBlank()) .collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "Workflow", activeIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage())); log.error("[{}] Workflow sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "工作流同步失败,syncId=" + syncId); } finally { @@ -374,7 +411,8 @@ public class GraphSyncService { public SyncResult syncJobs(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List jobs = fetchWithRetry(syncId, "jobs", @@ -385,10 +423,14 @@ public class GraphSyncService { .filter(Objects::nonNull).filter(id -> !id.isBlank()) .collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "Job", activeIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage())); log.error("[{}] Job sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "作业同步失败,syncId=" + syncId); } finally { @@ -398,7 +440,8 @@ public class GraphSyncService { public SyncResult syncLabelTasks(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List tasks = fetchWithRetry(syncId, "label-tasks", @@ -409,10 +452,14 @@ public class GraphSyncService { .filter(Objects::nonNull).filter(id -> !id.isBlank()) .collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "LabelTask", activeIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage())); log.error("[{}] LabelTask sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "标注任务同步失败,syncId=" + syncId); } finally { @@ -422,7 +469,8 @@ public class GraphSyncService { public SyncResult syncKnowledgeSets(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); + LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { List knowledgeSets = fetchWithRetry(syncId, "knowledge-sets", @@ -433,10 +481,14 @@ public class GraphSyncService { .filter(Objects::nonNull).filter(id -> !id.isBlank()) .collect(Collectors.toSet()); result.setPurged(stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeIds, syncId)); + saveSyncHistory(SyncMetadata.fromResults( + syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, List.of(result))); return result; } catch (BusinessException e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage())); throw e; } catch (Exception e) { + saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage())); log.error("[{}] KnowledgeSet sync failed for graphId={}", syncId, graphId, e); throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "知识集同步失败,syncId=" + syncId); } finally { @@ -450,7 +502,7 @@ public class GraphSyncService { public SyncResult buildUsesDatasetRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeUsesDatasetRelations(graphId, syncId); @@ -467,7 +519,7 @@ public class GraphSyncService { public SyncResult buildProducesRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeProducesRelations(graphId, syncId); @@ -484,7 +536,7 @@ public class GraphSyncService { public SyncResult buildAssignedToRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeAssignedToRelations(graphId, syncId); @@ -501,7 +553,7 @@ public class GraphSyncService { public SyncResult buildTriggersRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeTriggersRelations(graphId, syncId); @@ -518,7 +570,7 @@ public class GraphSyncService { public SyncResult buildDependsOnRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeDependsOnRelations(graphId, syncId); @@ -535,7 +587,7 @@ public class GraphSyncService { public SyncResult buildImpactsRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeImpactsRelations(graphId, syncId); @@ -552,7 +604,7 @@ public class GraphSyncService { public SyncResult buildSourcedFromRelations(String graphId) { validateGraphId(graphId); - String syncId = UUID.randomUUID().toString().substring(0, 8); + String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { return stepService.mergeSourcedFromRelations(graphId, syncId); @@ -567,6 +619,43 @@ public class GraphSyncService { } } + // ----------------------------------------------------------------------- + // 同步历史查询 + // ----------------------------------------------------------------------- + + /** + * 查询同步历史记录。 + */ + public List getSyncHistory(String graphId, String status, int limit) { + validateGraphId(graphId); + if (status != null && !status.isBlank()) { + return syncHistoryRepository.findByGraphIdAndStatus(graphId, status, limit); + } + return syncHistoryRepository.findByGraphId(graphId, limit); + } + + /** + * 按时间范围查询同步历史(分页)。 + */ + public List getSyncHistoryByTimeRange(String graphId, + LocalDateTime from, LocalDateTime to, + int page, int size) { + validateGraphId(graphId); + long skip = (long) page * size; + if (skip > 2_000_000L) { + throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "分页偏移量超出允许范围"); + } + return syncHistoryRepository.findByGraphIdAndTimeRange(graphId, from, to, skip, size); + } + + /** + * 根据 syncId 查询单条同步记录。 + */ + public Optional getSyncRecord(String graphId, String syncId) { + validateGraphId(graphId); + return syncHistoryRepository.findByGraphIdAndSyncId(graphId, syncId); + } + // ----------------------------------------------------------------------- // 内部方法 // ----------------------------------------------------------------------- @@ -671,6 +760,17 @@ public class GraphSyncService { } } + /** + * 持久化同步元数据,失败时仅记录日志,不影响主流程。 + */ + private void saveSyncHistory(SyncMetadata metadata) { + try { + syncHistoryRepository.save(metadata); + } catch (Exception e) { + log.warn("[{}] Failed to save sync history: {}", metadata.getSyncId(), e.getMessage()); + } + } + private void validateGraphId(String graphId) { if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效"); diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java new file mode 100644 index 0000000..69ce1c9 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncMetadata.java @@ -0,0 +1,193 @@ +package com.datamate.knowledgegraph.domain.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.annotation.Transient; +import org.springframework.data.neo4j.core.schema.GeneratedValue; +import org.springframework.data.neo4j.core.schema.Id; +import org.springframework.data.neo4j.core.schema.Node; +import org.springframework.data.neo4j.core.schema.Property; +import org.springframework.data.neo4j.core.support.UUIDStringGenerator; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * 同步操作元数据,用于记录每次同步的整体状态和统计信息。 + *

+ * 同时作为 Neo4j 节点持久化到图数据库,支持历史查询和问题排查。 + */ +@Node("SyncHistory") +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SyncMetadata { + + public static final String STATUS_SUCCESS = "SUCCESS"; + public static final String STATUS_FAILED = "FAILED"; + public static final String STATUS_PARTIAL = "PARTIAL"; + + public static final String TYPE_FULL = "FULL"; + public static final String TYPE_DATASETS = "DATASETS"; + public static final String TYPE_FIELDS = "FIELDS"; + public static final String TYPE_USERS = "USERS"; + public static final String TYPE_ORGS = "ORGS"; + public static final String TYPE_WORKFLOWS = "WORKFLOWS"; + public static final String TYPE_JOBS = "JOBS"; + public static final String TYPE_LABEL_TASKS = "LABEL_TASKS"; + public static final String TYPE_KNOWLEDGE_SETS = "KNOWLEDGE_SETS"; + + @Id + @GeneratedValue(UUIDStringGenerator.class) + private String id; + + @Property("sync_id") + private String syncId; + + @Property("graph_id") + private String graphId; + + /** 同步类型:FULL / DATASETS / WORKFLOWS 等 */ + @Property("sync_type") + private String syncType; + + /** 同步状态:SUCCESS / FAILED / PARTIAL */ + @Property("status") + private String status; + + @Property("started_at") + private LocalDateTime startedAt; + + @Property("completed_at") + private LocalDateTime completedAt; + + @Property("duration_millis") + private long durationMillis; + + @Property("total_created") + @Builder.Default + private int totalCreated = 0; + + @Property("total_updated") + @Builder.Default + private int totalUpdated = 0; + + @Property("total_skipped") + @Builder.Default + private int totalSkipped = 0; + + @Property("total_failed") + @Builder.Default + private int totalFailed = 0; + + @Property("total_purged") + @Builder.Default + private int totalPurged = 0; + + /** 增量同步的时间窗口起始 */ + @Property("updated_from") + private LocalDateTime updatedFrom; + + /** 增量同步的时间窗口结束 */ + @Property("updated_to") + private LocalDateTime updatedTo; + + /** 同步失败时的错误信息 */ + @Property("error_message") + private String errorMessage; + + /** 各步骤的摘要,如 "Dataset(+5/~2/-0/purged:1)" */ + @Property("step_summaries") + @Builder.Default + private List stepSummaries = new ArrayList<>(); + + /** 详细的各步骤结果(不持久化到 Neo4j,仅在返回时携带) */ + @Transient + private List results; + + public int totalEntities() { + return totalCreated + totalUpdated + totalSkipped + totalFailed; + } + + /** + * 从 SyncResult 列表构建元数据。 + */ + public static SyncMetadata fromResults(String syncId, String graphId, String syncType, + LocalDateTime startedAt, List results) { + LocalDateTime completedAt = LocalDateTime.now(); + long duration = Duration.between(startedAt, completedAt).toMillis(); + + int created = 0, updated = 0, skipped = 0, failed = 0, purged = 0; + List summaries = new ArrayList<>(); + boolean hasFailures = false; + + for (SyncResult r : results) { + created += r.getCreated(); + updated += r.getUpdated(); + skipped += r.getSkipped(); + failed += r.getFailed(); + purged += r.getPurged(); + if (r.getFailed() > 0) { + hasFailures = true; + } + summaries.add(formatStepSummary(r)); + } + + String status = hasFailures ? STATUS_PARTIAL : STATUS_SUCCESS; + + return SyncMetadata.builder() + .syncId(syncId) + .graphId(graphId) + .syncType(syncType) + .status(status) + .startedAt(startedAt) + .completedAt(completedAt) + .durationMillis(duration) + .totalCreated(created) + .totalUpdated(updated) + .totalSkipped(skipped) + .totalFailed(failed) + .totalPurged(purged) + .stepSummaries(summaries) + .results(results) + .build(); + } + + /** + * 构建失败的元数据。 + */ + public static SyncMetadata failed(String syncId, String graphId, String syncType, + LocalDateTime startedAt, String errorMessage) { + LocalDateTime completedAt = LocalDateTime.now(); + long duration = Duration.between(startedAt, completedAt).toMillis(); + + return SyncMetadata.builder() + .syncId(syncId) + .graphId(graphId) + .syncType(syncType) + .status(STATUS_FAILED) + .startedAt(startedAt) + .completedAt(completedAt) + .durationMillis(duration) + .errorMessage(errorMessage) + .build(); + } + + private static String formatStepSummary(SyncResult r) { + StringBuilder sb = new StringBuilder(); + sb.append(r.getSyncType()) + .append("(+").append(r.getCreated()) + .append("/~").append(r.getUpdated()) + .append("/-").append(r.getFailed()); + if (r.getPurged() > 0) { + sb.append("/purged:").append(r.getPurged()); + } + sb.append(")"); + return sb.toString(); + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/SyncHistoryRepository.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/SyncHistoryRepository.java new file mode 100644 index 0000000..8e8667b --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/SyncHistoryRepository.java @@ -0,0 +1,43 @@ +package com.datamate.knowledgegraph.domain.repository; + +import com.datamate.knowledgegraph.domain.model.SyncMetadata; +import org.springframework.data.neo4j.repository.Neo4jRepository; +import org.springframework.data.neo4j.repository.query.Query; +import org.springframework.data.repository.query.Param; +import org.springframework.stereotype.Repository; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Optional; + +@Repository +public interface SyncHistoryRepository extends Neo4jRepository { + + @Query("MATCH (h:SyncHistory {graph_id: $graphId}) " + + "RETURN h ORDER BY h.started_at DESC LIMIT $limit") + List findByGraphId( + @Param("graphId") String graphId, + @Param("limit") int limit); + + @Query("MATCH (h:SyncHistory {graph_id: $graphId, status: $status}) " + + "RETURN h ORDER BY h.started_at DESC LIMIT $limit") + List findByGraphIdAndStatus( + @Param("graphId") String graphId, + @Param("status") String status, + @Param("limit") int limit); + + @Query("MATCH (h:SyncHistory {graph_id: $graphId, sync_id: $syncId}) RETURN h") + Optional findByGraphIdAndSyncId( + @Param("graphId") String graphId, + @Param("syncId") String syncId); + + @Query("MATCH (h:SyncHistory {graph_id: $graphId}) " + + "WHERE h.started_at >= $from AND h.started_at <= $to " + + "RETURN h ORDER BY h.started_at DESC SKIP $skip LIMIT $limit") + List findByGraphIdAndTimeRange( + @Param("graphId") String graphId, + @Param("from") LocalDateTime from, + @Param("to") LocalDateTime to, + @Param("skip") long skip, + @Param("limit") int limit); +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java index ea81ad2..d938859 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java @@ -72,7 +72,20 @@ public class GraphInitializer implements ApplicationRunner { "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)", // 全文索引 - "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]" + "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]", + + // ── SyncHistory 约束和索引 ── + + // P1: syncId 唯一约束,防止 ID 碰撞 + "CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " + + "FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE", + + // P2-3: 查询优化索引 + "CREATE INDEX sync_history_graph_started IF NOT EXISTS " + + "FOR (h:SyncHistory) ON (h.graph_id, h.started_at)", + + "CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " + + "FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)" ); @Override diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncMetadataVO.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncMetadataVO.java new file mode 100644 index 0000000..3589447 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/dto/SyncMetadataVO.java @@ -0,0 +1,75 @@ +package com.datamate.knowledgegraph.interfaces.dto; + +import com.datamate.knowledgegraph.domain.model.SyncMetadata; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * 同步元数据视图对象。 + *

+ * 包含本次同步的整体统计信息和各步骤的详细结果。 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SyncMetadataVO { + + private String syncId; + private String graphId; + private String syncType; + private String status; + private LocalDateTime startedAt; + private LocalDateTime completedAt; + private long durationMillis; + private int totalCreated; + private int totalUpdated; + private int totalSkipped; + private int totalFailed; + private int totalPurged; + private int totalEntities; + private LocalDateTime updatedFrom; + private LocalDateTime updatedTo; + private String errorMessage; + private List stepSummaries; + /** 各步骤的详细结果(仅当前同步返回时携带,历史查询时为 null) */ + private List results; + + /** + * 从 SyncMetadata 转换(包含详细步骤结果)。 + */ + public static SyncMetadataVO from(SyncMetadata metadata) { + List resultVOs = null; + if (metadata.getResults() != null) { + resultVOs = metadata.getResults().stream() + .map(SyncResultVO::from) + .toList(); + } + + return SyncMetadataVO.builder() + .syncId(metadata.getSyncId()) + .graphId(metadata.getGraphId()) + .syncType(metadata.getSyncType()) + .status(metadata.getStatus()) + .startedAt(metadata.getStartedAt()) + .completedAt(metadata.getCompletedAt()) + .durationMillis(metadata.getDurationMillis()) + .totalCreated(metadata.getTotalCreated()) + .totalUpdated(metadata.getTotalUpdated()) + .totalSkipped(metadata.getTotalSkipped()) + .totalFailed(metadata.getTotalFailed()) + .totalPurged(metadata.getTotalPurged()) + .totalEntities(metadata.totalEntities()) + .updatedFrom(metadata.getUpdatedFrom()) + .updatedTo(metadata.getUpdatedTo()) + .errorMessage(metadata.getErrorMessage()) + .stepSummaries(metadata.getStepSummaries()) + .results(resultVOs) + .build(); + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java index ff4dd31..62516ab 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/interfaces/rest/GraphSyncController.java @@ -1,13 +1,20 @@ package com.datamate.knowledgegraph.interfaces.rest; import com.datamate.knowledgegraph.application.GraphSyncService; +import com.datamate.knowledgegraph.domain.model.SyncMetadata; import com.datamate.knowledgegraph.domain.model.SyncResult; +import com.datamate.knowledgegraph.interfaces.dto.SyncMetadataVO; import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.Pattern; import lombok.RequiredArgsConstructor; +import org.springframework.format.annotation.DateTimeFormat; +import org.springframework.http.ResponseEntity; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; +import java.time.LocalDateTime; import java.util.List; /** @@ -36,10 +43,10 @@ public class GraphSyncController { * 全量同步:拉取所有实体并构建关系。 */ @PostMapping("/full") - public List syncAll( + public SyncMetadataVO syncAll( @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { - List results = syncService.syncAll(graphId); - return results.stream().map(SyncResultVO::from).toList(); + SyncMetadata metadata = syncService.syncAll(graphId); + return SyncMetadataVO.from(metadata); } /** @@ -211,4 +218,50 @@ public class GraphSyncController { @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) { return SyncResultVO.from(syncService.buildSourcedFromRelations(graphId)); } + + // ----------------------------------------------------------------------- + // 同步历史查询端点 + // ----------------------------------------------------------------------- + + /** + * 查询同步历史记录。 + * + * @param status 可选,按状态过滤(SUCCESS / FAILED / PARTIAL) + * @param limit 返回条数上限,默认 20 + */ + @GetMapping("/history") + public List getSyncHistory( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId, + @RequestParam(required = false) String status, + @RequestParam(defaultValue = "20") @Min(1) @Max(200) int limit) { + List history = syncService.getSyncHistory(graphId, status, limit); + return history.stream().map(SyncMetadataVO::from).toList(); + } + + /** + * 按时间范围查询同步历史。 + */ + @GetMapping("/history/range") + public List getSyncHistoryByTimeRange( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId, + @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from, + @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to, + @RequestParam(defaultValue = "0") @Min(0) @Max(10000) int page, + @RequestParam(defaultValue = "20") @Min(1) @Max(200) int size) { + List history = syncService.getSyncHistoryByTimeRange(graphId, from, to, page, size); + return history.stream().map(SyncMetadataVO::from).toList(); + } + + /** + * 根据 syncId 查询单条同步记录。 + */ + @GetMapping("/history/{syncId}") + public ResponseEntity getSyncRecord( + @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId, + @PathVariable String syncId) { + return syncService.getSyncRecord(graphId, syncId) + .map(SyncMetadataVO::from) + .map(ResponseEntity::ok) + .orElse(ResponseEntity.notFound().build()); + } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java index e3891e9..7b6c490 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java @@ -1,7 +1,9 @@ package com.datamate.knowledgegraph.application; import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.knowledgegraph.domain.model.SyncMetadata; import com.datamate.knowledgegraph.domain.model.SyncResult; +import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO; @@ -13,12 +15,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -42,6 +47,9 @@ class GraphSyncServiceTest { @Mock private KnowledgeGraphProperties properties; + @Mock + private SyncHistoryRepository syncHistoryRepository; + @InjectMocks private GraphSyncService syncService; @@ -161,7 +169,7 @@ class GraphSyncServiceTest { when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); - List results = syncService.syncAll(GRAPH_ID); + List results = syncService.syncAll(GRAPH_ID).getResults(); // 8 entities + 10 relations = 18 assertThat(results).hasSize(18); @@ -335,4 +343,324 @@ class GraphSyncServiceTest { .isInstanceOf(BusinessException.class); } } + + // ----------------------------------------------------------------------- + // 同步元数据记录 + // ----------------------------------------------------------------------- + + @Nested + class SyncMetadataRecordingTest { + + @Test + void syncAll_success_recordsMetadataWithCorrectFields() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + dto.setCreatedBy("admin"); + when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(dataManagementClient.listAllWorkflows()).thenReturn(List.of()); + when(dataManagementClient.listAllJobs()).thenReturn(List.of()); + when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of()); + when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of()); + + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Dataset").created(3).updated(1).build()); + when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Field").build()); + when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) + .thenReturn(SyncResult.builder().syncType("User").build()); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").build()); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString())) + .thenReturn(0); + when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); + when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); + when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); + + SyncMetadata metadata = syncService.syncAll(GRAPH_ID); + + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS); + assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL); + assertThat(metadata.getGraphId()).isEqualTo(GRAPH_ID); + assertThat(metadata.getSyncId()).isNotNull(); + assertThat(metadata.getStartedAt()).isNotNull(); + assertThat(metadata.getCompletedAt()).isNotNull(); + assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0); + assertThat(metadata.getTotalCreated()).isEqualTo(3); + assertThat(metadata.getTotalUpdated()).isEqualTo(1); + assertThat(metadata.getResults()).hasSize(18); + assertThat(metadata.getStepSummaries()).hasSize(18); + assertThat(metadata.getErrorMessage()).isNull(); + + // 验证持久化被调用 + ArgumentCaptor captor = ArgumentCaptor.forClass(SyncMetadata.class); + verify(syncHistoryRepository).save(captor.capture()); + SyncMetadata saved = captor.getValue(); + assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS); + assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID); + } + + @Test + void syncAll_withFailedSteps_recordsPartialStatus() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + dto.setCreatedBy("admin"); + when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(dataManagementClient.listAllWorkflows()).thenReturn(List.of()); + when(dataManagementClient.listAllJobs()).thenReturn(List.of()); + when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of()); + when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of()); + + // Dataset step has failures + SyncResult datasetResult = SyncResult.builder().syncType("Dataset").created(2).failed(1).build(); + datasetResult.setErrors(new java.util.ArrayList<>(List.of("some error"))); + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(datasetResult); + when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Field").build()); + when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) + .thenReturn(SyncResult.builder().syncType("User").build()); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").build()); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString())) + .thenReturn(0); + when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); + when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); + when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); + + SyncMetadata metadata = syncService.syncAll(GRAPH_ID); + + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL); + assertThat(metadata.getTotalFailed()).isEqualTo(1); + assertThat(metadata.getTotalCreated()).isEqualTo(2); + } + + @Test + void syncAll_exceptionThrown_recordsFailedMetadata() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("connection refused")); + + assertThatThrownBy(() -> syncService.syncAll(GRAPH_ID)) + .isInstanceOf(BusinessException.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(SyncMetadata.class); + verify(syncHistoryRepository).save(captor.capture()); + SyncMetadata saved = captor.getValue(); + assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED); + assertThat(saved.getErrorMessage()).isNotNull(); + assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID); + assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL); + } + + @Test + void syncDatasets_success_recordsMetadata() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Dataset").created(1).build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString())) + .thenReturn(0); + + syncService.syncDatasets(GRAPH_ID); + + ArgumentCaptor captor = ArgumentCaptor.forClass(SyncMetadata.class); + verify(syncHistoryRepository).save(captor.capture()); + SyncMetadata saved = captor.getValue(); + assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS); + assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS); + assertThat(saved.getTotalCreated()).isEqualTo(1); + } + + @Test + void syncDatasets_failed_recordsFailedMetadata() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("timeout")); + + assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID)) + .isInstanceOf(BusinessException.class); + + ArgumentCaptor captor = ArgumentCaptor.forClass(SyncMetadata.class); + verify(syncHistoryRepository).save(captor.capture()); + SyncMetadata saved = captor.getValue(); + assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED); + assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS); + } + + @Test + void saveSyncHistory_exceptionInSave_doesNotAffectMainFlow() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Dataset").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString())) + .thenReturn(0); + // saveSyncHistory 内部异常不应影响主流程 + when(syncHistoryRepository.save(any())).thenThrow(new RuntimeException("Neo4j down")); + + SyncResult result = syncService.syncDatasets(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("Dataset"); + } + } + + // ----------------------------------------------------------------------- + // 同步历史查询 + // ----------------------------------------------------------------------- + + @Nested + class SyncHistoryQueryTest { + + @Test + void getSyncHistory_invalidGraphId_throwsBusinessException() { + assertThatThrownBy(() -> syncService.getSyncHistory(INVALID_GRAPH_ID, null, 20)) + .isInstanceOf(BusinessException.class); + } + + @Test + void getSyncHistory_noStatusFilter_callsFindByGraphId() { + when(syncHistoryRepository.findByGraphId(GRAPH_ID, 20)).thenReturn(List.of()); + + List result = syncService.getSyncHistory(GRAPH_ID, null, 20); + + assertThat(result).isEmpty(); + verify(syncHistoryRepository).findByGraphId(GRAPH_ID, 20); + verify(syncHistoryRepository, never()).findByGraphIdAndStatus(anyString(), anyString(), anyInt()); + } + + @Test + void getSyncHistory_withStatusFilter_callsFindByGraphIdAndStatus() { + when(syncHistoryRepository.findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10)) + .thenReturn(List.of()); + + List result = syncService.getSyncHistory(GRAPH_ID, "SUCCESS", 10); + + assertThat(result).isEmpty(); + verify(syncHistoryRepository).findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10); + } + + @Test + void getSyncRecord_found_returnsRecord() { + SyncMetadata expected = SyncMetadata.builder() + .syncId("abc12345").graphId(GRAPH_ID).build(); + when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "abc12345")) + .thenReturn(Optional.of(expected)); + + Optional result = syncService.getSyncRecord(GRAPH_ID, "abc12345"); + + assertThat(result).isPresent(); + assertThat(result.get().getSyncId()).isEqualTo("abc12345"); + } + + @Test + void getSyncRecord_notFound_returnsEmpty() { + when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "notexist")) + .thenReturn(Optional.empty()); + + Optional result = syncService.getSyncRecord(GRAPH_ID, "notexist"); + + assertThat(result).isEmpty(); + } + + @Test + void getSyncHistoryByTimeRange_delegatesToRepository() { + LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0); + LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59); + when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20)) + .thenReturn(List.of()); + + List result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 0, 20); + + assertThat(result).isEmpty(); + verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20); + } + + @Test + void getSyncHistoryByTimeRange_pagination_computesSkipCorrectly() { + LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0); + LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59); + when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20)) + .thenReturn(List.of()); + + List result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 2, 20); + + assertThat(result).isEmpty(); + // page=2, size=20 → skip=40 + verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20); + } + + @Test + void getSyncHistoryByTimeRange_skipOverflow_throwsBusinessException() { + // 模拟绕过 Controller 校验直接调用 Service 的场景 + assertThatThrownBy(() -> syncService.getSyncHistoryByTimeRange( + GRAPH_ID, + LocalDateTime.of(2025, 1, 1, 0, 0), + LocalDateTime.of(2025, 12, 31, 23, 59), + 20000, 200)) + .isInstanceOf(BusinessException.class) + .hasMessageContaining("分页偏移量"); + } + } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/domain/model/SyncMetadataTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/domain/model/SyncMetadataTest.java new file mode 100644 index 0000000..d905c7c --- /dev/null +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/domain/model/SyncMetadataTest.java @@ -0,0 +1,96 @@ +package com.datamate.knowledgegraph.domain.model; + +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class SyncMetadataTest { + + @Test + void fromResults_aggregatesCountsCorrectly() { + LocalDateTime startedAt = LocalDateTime.of(2025, 6, 1, 10, 0, 0); + + SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(1).purged(3).build(); + SyncResult r2 = SyncResult.builder().syncType("Field").created(10).updated(0).skipped(2).build(); + SyncResult r3 = SyncResult.builder().syncType("HAS_FIELD").created(8).build(); + + SyncMetadata metadata = SyncMetadata.fromResults("abc123", "graph-id", "FULL", startedAt, List.of(r1, r2, r3)); + + assertThat(metadata.getSyncId()).isEqualTo("abc123"); + assertThat(metadata.getGraphId()).isEqualTo("graph-id"); + assertThat(metadata.getSyncType()).isEqualTo("FULL"); + assertThat(metadata.getTotalCreated()).isEqualTo(23); // 5 + 10 + 8 + assertThat(metadata.getTotalUpdated()).isEqualTo(2); // 2 + 0 + 0 + assertThat(metadata.getTotalSkipped()).isEqualTo(2); // 0 + 2 + 0 + assertThat(metadata.getTotalFailed()).isEqualTo(1); // 1 + 0 + 0 + assertThat(metadata.getTotalPurged()).isEqualTo(3); // 3 + 0 + 0 + assertThat(metadata.getStartedAt()).isEqualTo(startedAt); + assertThat(metadata.getCompletedAt()).isNotNull(); + assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0); + assertThat(metadata.getResults()).hasSize(3); + assertThat(metadata.getStepSummaries()).hasSize(3); + } + + @Test + void fromResults_noFailures_statusIsSuccess() { + LocalDateTime startedAt = LocalDateTime.now(); + SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).build(); + + SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1)); + + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS); + } + + @Test + void fromResults_withFailures_statusIsPartial() { + LocalDateTime startedAt = LocalDateTime.now(); + SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).failed(2).build(); + + SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1)); + + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL); + assertThat(metadata.getTotalFailed()).isEqualTo(2); + } + + @Test + void failed_createsFailedMetadata() { + LocalDateTime startedAt = LocalDateTime.of(2025, 1, 1, 0, 0, 0); + + SyncMetadata metadata = SyncMetadata.failed("abc", "g1", "FULL", startedAt, "connection refused"); + + assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED); + assertThat(metadata.getErrorMessage()).isEqualTo("connection refused"); + assertThat(metadata.getSyncId()).isEqualTo("abc"); + assertThat(metadata.getGraphId()).isEqualTo("g1"); + assertThat(metadata.getSyncType()).isEqualTo("FULL"); + assertThat(metadata.getStartedAt()).isEqualTo(startedAt); + assertThat(metadata.getCompletedAt()).isNotNull(); + assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0); + assertThat(metadata.getTotalCreated()).isEqualTo(0); + assertThat(metadata.getTotalUpdated()).isEqualTo(0); + } + + @Test + void totalEntities_returnsSum() { + SyncMetadata metadata = SyncMetadata.builder() + .totalCreated(10).totalUpdated(5).totalSkipped(3).totalFailed(2) + .build(); + + assertThat(metadata.totalEntities()).isEqualTo(20); + } + + @Test + void stepSummaries_formatIncludesPurgedWhenNonZero() { + LocalDateTime startedAt = LocalDateTime.now(); + SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(0).purged(3).build(); + SyncResult r2 = SyncResult.builder().syncType("Field").created(1).updated(0).failed(0).purged(0).build(); + + SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1, r2)); + + assertThat(metadata.getStepSummaries().get(0)).isEqualTo("Dataset(+5/~2/-0/purged:3)"); + assertThat(metadata.getStepSummaries().get(1)).isEqualTo("Field(+1/~0/-0)"); + } +}