feat(kg): P0-04 同步结果元数据增强

实现同步历史记录和元数据功能:

新增功能:
- 添加 SyncHistory 节点记录同步历史
- 添加 /history 和 /history/range API 查询同步历史
- 添加 /full API 返回完整同步结果(含元数据)

问题修复:
- [P1] syncId 改为完整 UUID (36位),添加 (graph_id, sync_id) 唯一约束
- [P2-1] /history limit 添加 @Min(1) @Max(200) 边界校验
- [P2-2] /history/range 添加分页 (page, size),skip 越界保护 (>2M)
- [P2-3] 添加 SyncHistory 索引:(graph_id, started_at), (graph_id, status, started_at)

测试:
- 182 tests 通过 (新增 2 个测试)
- GraphSyncServiceTest, GraphInitializerTest, SyncMetadataTest 全部通过

代码变更:+521 行,-27 行
新增文件:4 个 (SyncMetadata, SyncHistoryRepository, SyncMetadataVO, SyncMetadataTest)
修改文件:5 个
This commit is contained in:
2026-02-18 16:55:03 +08:00
parent 74daed1c25
commit 42069f82b3
9 changed files with 931 additions and 28 deletions

2
.gitignore vendored
View File

@@ -190,3 +190,5 @@ Thumbs.db
# Milvus
deployment/docker/milvus/volumes/
# Local documentation
docs/knowledge-graph/

View File

@@ -2,7 +2,9 @@ package com.datamate.knowledgegraph.application;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
@@ -15,6 +17,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -52,6 +55,7 @@ public class GraphSyncService {
private final GraphSyncStepService stepService;
private final DataManagementClient dataManagementClient;
private final KnowledgeGraphProperties properties;
private final SyncHistoryRepository syncHistoryRepository;
/** 同 graphId 互斥锁,防止并发同步。 */
private final ConcurrentHashMap<String, ReentrantLock> graphLocks = new ConcurrentHashMap<>();
@@ -60,9 +64,10 @@ public class GraphSyncService {
// 全量同步
// -----------------------------------------------------------------------
public List<SyncResult> syncAll(String graphId) {
public SyncMetadata syncAll(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
@@ -178,10 +183,16 @@ public class GraphSyncService {
results.stream()
.map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
.collect(Collectors.joining(", ")));
return results;
SyncMetadata metadata = SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, results);
saveSyncHistory(metadata);
return metadata;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FULL, startedAt, e.getMessage()));
log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "全量同步失败,syncId=" + syncId);
} finally {
@@ -195,7 +206,8 @@ public class GraphSyncService {
public SyncResult syncDatasets(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -206,10 +218,14 @@ public class GraphSyncService {
.collect(Collectors.toSet());
int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId);
result.setPurged(purged);
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_DATASETS, startedAt, e.getMessage()));
log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "数据集同步失败,syncId=" + syncId);
} finally {
@@ -219,7 +235,8 @@ public class GraphSyncService {
public SyncResult syncFields(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -237,10 +254,14 @@ public class GraphSyncService {
}
}
result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_FIELDS, startedAt, e.getMessage()));
log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "字段同步失败,syncId=" + syncId);
} finally {
@@ -250,7 +271,8 @@ public class GraphSyncService {
public SyncResult syncUsers(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
@@ -266,10 +288,14 @@ public class GraphSyncService {
SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId);
Set<String> activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet());
result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_USERS, startedAt, e.getMessage()));
log.error("[{}] User sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "用户同步失败,syncId=" + syncId);
} finally {
@@ -279,13 +305,19 @@ public class GraphSyncService {
public SyncResult syncOrgs(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.upsertOrgEntities(graphId, syncId);
SyncResult result = stepService.upsertOrgEntities(graphId, syncId);
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, e.getMessage()));
log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "组织同步失败,syncId=" + syncId);
} finally {
@@ -295,7 +327,7 @@ public class GraphSyncService {
public SyncResult buildHasFieldRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeHasFieldRelations(graphId, syncId);
@@ -312,7 +344,7 @@ public class GraphSyncService {
public SyncResult buildDerivedFromRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeDerivedFromRelations(graphId, syncId);
@@ -329,7 +361,7 @@ public class GraphSyncService {
public SyncResult buildBelongsToRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeBelongsToRelations(graphId, syncId);
@@ -350,7 +382,8 @@ public class GraphSyncService {
public SyncResult syncWorkflows(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<WorkflowDTO> workflows = fetchWithRetry(syncId, "workflows",
@@ -361,10 +394,14 @@ public class GraphSyncService {
.filter(Objects::nonNull).filter(id -> !id.isBlank())
.collect(Collectors.toSet());
result.setPurged(stepService.purgeStaleEntities(graphId, "Workflow", activeIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_WORKFLOWS, startedAt, e.getMessage()));
log.error("[{}] Workflow sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "工作流同步失败,syncId=" + syncId);
} finally {
@@ -374,7 +411,8 @@ public class GraphSyncService {
public SyncResult syncJobs(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<JobDTO> jobs = fetchWithRetry(syncId, "jobs",
@@ -385,10 +423,14 @@ public class GraphSyncService {
.filter(Objects::nonNull).filter(id -> !id.isBlank())
.collect(Collectors.toSet());
result.setPurged(stepService.purgeStaleEntities(graphId, "Job", activeIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_JOBS, startedAt, e.getMessage()));
log.error("[{}] Job sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "作业同步失败,syncId=" + syncId);
} finally {
@@ -398,7 +440,8 @@ public class GraphSyncService {
public SyncResult syncLabelTasks(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<LabelTaskDTO> tasks = fetchWithRetry(syncId, "label-tasks",
@@ -409,10 +452,14 @@ public class GraphSyncService {
.filter(Objects::nonNull).filter(id -> !id.isBlank())
.collect(Collectors.toSet());
result.setPurged(stepService.purgeStaleEntities(graphId, "LabelTask", activeIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_LABEL_TASKS, startedAt, e.getMessage()));
log.error("[{}] LabelTask sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "标注任务同步失败,syncId=" + syncId);
} finally {
@@ -422,7 +469,8 @@ public class GraphSyncService {
public SyncResult syncKnowledgeSets(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
LocalDateTime startedAt = LocalDateTime.now();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
List<KnowledgeSetDTO> knowledgeSets = fetchWithRetry(syncId, "knowledge-sets",
@@ -433,10 +481,14 @@ public class GraphSyncService {
.filter(Objects::nonNull).filter(id -> !id.isBlank())
.collect(Collectors.toSet());
result.setPurged(stepService.purgeStaleEntities(graphId, "KnowledgeSet", activeIds, syncId));
saveSyncHistory(SyncMetadata.fromResults(
syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, List.of(result)));
return result;
} catch (BusinessException e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
throw e;
} catch (Exception e) {
saveSyncHistory(SyncMetadata.failed(syncId, graphId, SyncMetadata.TYPE_KNOWLEDGE_SETS, startedAt, e.getMessage()));
log.error("[{}] KnowledgeSet sync failed for graphId={}", syncId, graphId, e);
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "知识集同步失败,syncId=" + syncId);
} finally {
@@ -450,7 +502,7 @@ public class GraphSyncService {
public SyncResult buildUsesDatasetRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeUsesDatasetRelations(graphId, syncId);
@@ -467,7 +519,7 @@ public class GraphSyncService {
public SyncResult buildProducesRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeProducesRelations(graphId, syncId);
@@ -484,7 +536,7 @@ public class GraphSyncService {
public SyncResult buildAssignedToRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeAssignedToRelations(graphId, syncId);
@@ -501,7 +553,7 @@ public class GraphSyncService {
public SyncResult buildTriggersRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeTriggersRelations(graphId, syncId);
@@ -518,7 +570,7 @@ public class GraphSyncService {
public SyncResult buildDependsOnRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeDependsOnRelations(graphId, syncId);
@@ -535,7 +587,7 @@ public class GraphSyncService {
public SyncResult buildImpactsRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeImpactsRelations(graphId, syncId);
@@ -552,7 +604,7 @@ public class GraphSyncService {
public SyncResult buildSourcedFromRelations(String graphId) {
validateGraphId(graphId);
String syncId = UUID.randomUUID().toString().substring(0, 8);
String syncId = UUID.randomUUID().toString();
ReentrantLock lock = acquireLock(graphId, syncId);
try {
return stepService.mergeSourcedFromRelations(graphId, syncId);
@@ -567,6 +619,43 @@ public class GraphSyncService {
}
}
// -----------------------------------------------------------------------
// 同步历史查询
// -----------------------------------------------------------------------
/**
* 查询同步历史记录。
*/
public List<SyncMetadata> getSyncHistory(String graphId, String status, int limit) {
validateGraphId(graphId);
if (status != null && !status.isBlank()) {
return syncHistoryRepository.findByGraphIdAndStatus(graphId, status, limit);
}
return syncHistoryRepository.findByGraphId(graphId, limit);
}
/**
* 按时间范围查询同步历史(分页)。
*/
public List<SyncMetadata> getSyncHistoryByTimeRange(String graphId,
LocalDateTime from, LocalDateTime to,
int page, int size) {
validateGraphId(graphId);
long skip = (long) page * size;
if (skip > 2_000_000L) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "分页偏移量超出允许范围");
}
return syncHistoryRepository.findByGraphIdAndTimeRange(graphId, from, to, skip, size);
}
/**
* 根据 syncId 查询单条同步记录。
*/
public Optional<SyncMetadata> getSyncRecord(String graphId, String syncId) {
validateGraphId(graphId);
return syncHistoryRepository.findByGraphIdAndSyncId(graphId, syncId);
}
// -----------------------------------------------------------------------
// 内部方法
// -----------------------------------------------------------------------
@@ -671,6 +760,17 @@ public class GraphSyncService {
}
}
/**
* 持久化同步元数据,失败时仅记录日志,不影响主流程。
*/
private void saveSyncHistory(SyncMetadata metadata) {
try {
syncHistoryRepository.save(metadata);
} catch (Exception e) {
log.warn("[{}] Failed to save sync history: {}", metadata.getSyncId(), e.getMessage());
}
}
private void validateGraphId(String graphId) {
if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效");

View File

@@ -0,0 +1,193 @@
package com.datamate.knowledgegraph.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Transient;
import org.springframework.data.neo4j.core.schema.GeneratedValue;
import org.springframework.data.neo4j.core.schema.Id;
import org.springframework.data.neo4j.core.schema.Node;
import org.springframework.data.neo4j.core.schema.Property;
import org.springframework.data.neo4j.core.support.UUIDStringGenerator;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 同步操作元数据,用于记录每次同步的整体状态和统计信息。
* <p>
* 同时作为 Neo4j 节点持久化到图数据库,支持历史查询和问题排查。
*/
@Node("SyncHistory")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncMetadata {
public static final String STATUS_SUCCESS = "SUCCESS";
public static final String STATUS_FAILED = "FAILED";
public static final String STATUS_PARTIAL = "PARTIAL";
public static final String TYPE_FULL = "FULL";
public static final String TYPE_DATASETS = "DATASETS";
public static final String TYPE_FIELDS = "FIELDS";
public static final String TYPE_USERS = "USERS";
public static final String TYPE_ORGS = "ORGS";
public static final String TYPE_WORKFLOWS = "WORKFLOWS";
public static final String TYPE_JOBS = "JOBS";
public static final String TYPE_LABEL_TASKS = "LABEL_TASKS";
public static final String TYPE_KNOWLEDGE_SETS = "KNOWLEDGE_SETS";
@Id
@GeneratedValue(UUIDStringGenerator.class)
private String id;
@Property("sync_id")
private String syncId;
@Property("graph_id")
private String graphId;
/** 同步类型:FULL / DATASETS / WORKFLOWS 等 */
@Property("sync_type")
private String syncType;
/** 同步状态:SUCCESS / FAILED / PARTIAL */
@Property("status")
private String status;
@Property("started_at")
private LocalDateTime startedAt;
@Property("completed_at")
private LocalDateTime completedAt;
@Property("duration_millis")
private long durationMillis;
@Property("total_created")
@Builder.Default
private int totalCreated = 0;
@Property("total_updated")
@Builder.Default
private int totalUpdated = 0;
@Property("total_skipped")
@Builder.Default
private int totalSkipped = 0;
@Property("total_failed")
@Builder.Default
private int totalFailed = 0;
@Property("total_purged")
@Builder.Default
private int totalPurged = 0;
/** 增量同步的时间窗口起始 */
@Property("updated_from")
private LocalDateTime updatedFrom;
/** 增量同步的时间窗口结束 */
@Property("updated_to")
private LocalDateTime updatedTo;
/** 同步失败时的错误信息 */
@Property("error_message")
private String errorMessage;
/** 各步骤的摘要,如 "Dataset(+5/~2/-0/purged:1)" */
@Property("step_summaries")
@Builder.Default
private List<String> stepSummaries = new ArrayList<>();
/** 详细的各步骤结果(不持久化到 Neo4j,仅在返回时携带) */
@Transient
private List<SyncResult> results;
public int totalEntities() {
return totalCreated + totalUpdated + totalSkipped + totalFailed;
}
/**
* 从 SyncResult 列表构建元数据。
*/
public static SyncMetadata fromResults(String syncId, String graphId, String syncType,
LocalDateTime startedAt, List<SyncResult> results) {
LocalDateTime completedAt = LocalDateTime.now();
long duration = Duration.between(startedAt, completedAt).toMillis();
int created = 0, updated = 0, skipped = 0, failed = 0, purged = 0;
List<String> summaries = new ArrayList<>();
boolean hasFailures = false;
for (SyncResult r : results) {
created += r.getCreated();
updated += r.getUpdated();
skipped += r.getSkipped();
failed += r.getFailed();
purged += r.getPurged();
if (r.getFailed() > 0) {
hasFailures = true;
}
summaries.add(formatStepSummary(r));
}
String status = hasFailures ? STATUS_PARTIAL : STATUS_SUCCESS;
return SyncMetadata.builder()
.syncId(syncId)
.graphId(graphId)
.syncType(syncType)
.status(status)
.startedAt(startedAt)
.completedAt(completedAt)
.durationMillis(duration)
.totalCreated(created)
.totalUpdated(updated)
.totalSkipped(skipped)
.totalFailed(failed)
.totalPurged(purged)
.stepSummaries(summaries)
.results(results)
.build();
}
/**
* 构建失败的元数据。
*/
public static SyncMetadata failed(String syncId, String graphId, String syncType,
LocalDateTime startedAt, String errorMessage) {
LocalDateTime completedAt = LocalDateTime.now();
long duration = Duration.between(startedAt, completedAt).toMillis();
return SyncMetadata.builder()
.syncId(syncId)
.graphId(graphId)
.syncType(syncType)
.status(STATUS_FAILED)
.startedAt(startedAt)
.completedAt(completedAt)
.durationMillis(duration)
.errorMessage(errorMessage)
.build();
}
private static String formatStepSummary(SyncResult r) {
StringBuilder sb = new StringBuilder();
sb.append(r.getSyncType())
.append("(+").append(r.getCreated())
.append("/~").append(r.getUpdated())
.append("/-").append(r.getFailed());
if (r.getPurged() > 0) {
sb.append("/purged:").append(r.getPurged());
}
sb.append(")");
return sb.toString();
}
}

View File

@@ -0,0 +1,43 @@
package com.datamate.knowledgegraph.domain.repository;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.data.neo4j.repository.query.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
@Repository
public interface SyncHistoryRepository extends Neo4jRepository<SyncMetadata, String> {
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
List<SyncMetadata> findByGraphId(
@Param("graphId") String graphId,
@Param("limit") int limit);
@Query("MATCH (h:SyncHistory {graph_id: $graphId, status: $status}) " +
"RETURN h ORDER BY h.started_at DESC LIMIT $limit")
List<SyncMetadata> findByGraphIdAndStatus(
@Param("graphId") String graphId,
@Param("status") String status,
@Param("limit") int limit);
@Query("MATCH (h:SyncHistory {graph_id: $graphId, sync_id: $syncId}) RETURN h")
Optional<SyncMetadata> findByGraphIdAndSyncId(
@Param("graphId") String graphId,
@Param("syncId") String syncId);
@Query("MATCH (h:SyncHistory {graph_id: $graphId}) " +
"WHERE h.started_at >= $from AND h.started_at <= $to " +
"RETURN h ORDER BY h.started_at DESC SKIP $skip LIMIT $limit")
List<SyncMetadata> findByGraphIdAndTimeRange(
@Param("graphId") String graphId,
@Param("from") LocalDateTime from,
@Param("to") LocalDateTime to,
@Param("skip") long skip,
@Param("limit") int limit);
}

View File

@@ -72,7 +72,20 @@ public class GraphInitializer implements ApplicationRunner {
"CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
// 全文索引
"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]"
"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",
// ── SyncHistory 约束和索引 ──
// P1: syncId 唯一约束,防止 ID 碰撞
"CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
"FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
// P2-3: 查询优化索引
"CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
"FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
"CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
"FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
);
@Override

View File

@@ -0,0 +1,75 @@
package com.datamate.knowledgegraph.interfaces.dto;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
/**
* 同步元数据视图对象。
* <p>
* 包含本次同步的整体统计信息和各步骤的详细结果。
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncMetadataVO {
private String syncId;
private String graphId;
private String syncType;
private String status;
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private long durationMillis;
private int totalCreated;
private int totalUpdated;
private int totalSkipped;
private int totalFailed;
private int totalPurged;
private int totalEntities;
private LocalDateTime updatedFrom;
private LocalDateTime updatedTo;
private String errorMessage;
private List<String> stepSummaries;
/** 各步骤的详细结果(仅当前同步返回时携带,历史查询时为 null) */
private List<SyncResultVO> results;
/**
* 从 SyncMetadata 转换(包含详细步骤结果)。
*/
public static SyncMetadataVO from(SyncMetadata metadata) {
List<SyncResultVO> resultVOs = null;
if (metadata.getResults() != null) {
resultVOs = metadata.getResults().stream()
.map(SyncResultVO::from)
.toList();
}
return SyncMetadataVO.builder()
.syncId(metadata.getSyncId())
.graphId(metadata.getGraphId())
.syncType(metadata.getSyncType())
.status(metadata.getStatus())
.startedAt(metadata.getStartedAt())
.completedAt(metadata.getCompletedAt())
.durationMillis(metadata.getDurationMillis())
.totalCreated(metadata.getTotalCreated())
.totalUpdated(metadata.getTotalUpdated())
.totalSkipped(metadata.getTotalSkipped())
.totalFailed(metadata.getTotalFailed())
.totalPurged(metadata.getTotalPurged())
.totalEntities(metadata.totalEntities())
.updatedFrom(metadata.getUpdatedFrom())
.updatedTo(metadata.getUpdatedTo())
.errorMessage(metadata.getErrorMessage())
.stepSummaries(metadata.getStepSummaries())
.results(resultVOs)
.build();
}
}

View File

@@ -1,13 +1,20 @@
package com.datamate.knowledgegraph.interfaces.rest;
import com.datamate.knowledgegraph.application.GraphSyncService;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.interfaces.dto.SyncMetadataVO;
import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Pattern;
import lombok.RequiredArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.List;
/**
@@ -36,10 +43,10 @@ public class GraphSyncController {
* 全量同步:拉取所有实体并构建关系。
*/
@PostMapping("/full")
public List<SyncResultVO> syncAll(
public SyncMetadataVO syncAll(
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
List<SyncResult> results = syncService.syncAll(graphId);
return results.stream().map(SyncResultVO::from).toList();
SyncMetadata metadata = syncService.syncAll(graphId);
return SyncMetadataVO.from(metadata);
}
/**
@@ -211,4 +218,50 @@ public class GraphSyncController {
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
return SyncResultVO.from(syncService.buildSourcedFromRelations(graphId));
}
// -----------------------------------------------------------------------
// 同步历史查询端点
// -----------------------------------------------------------------------
/**
* 查询同步历史记录。
*
* @param status 可选,按状态过滤(SUCCESS / FAILED / PARTIAL)
* @param limit 返回条数上限,默认 20
*/
@GetMapping("/history")
public List<SyncMetadataVO> getSyncHistory(
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
@RequestParam(required = false) String status,
@RequestParam(defaultValue = "20") @Min(1) @Max(200) int limit) {
List<SyncMetadata> history = syncService.getSyncHistory(graphId, status, limit);
return history.stream().map(SyncMetadataVO::from).toList();
}
/**
* 按时间范围查询同步历史。
*/
@GetMapping("/history/range")
public List<SyncMetadataVO> getSyncHistoryByTimeRange(
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime from,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime to,
@RequestParam(defaultValue = "0") @Min(0) @Max(10000) int page,
@RequestParam(defaultValue = "20") @Min(1) @Max(200) int size) {
List<SyncMetadata> history = syncService.getSyncHistoryByTimeRange(graphId, from, to, page, size);
return history.stream().map(SyncMetadataVO::from).toList();
}
/**
* 根据 syncId 查询单条同步记录。
*/
@GetMapping("/history/{syncId}")
public ResponseEntity<SyncMetadataVO> getSyncRecord(
@PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId,
@PathVariable String syncId) {
return syncService.getSyncRecord(graphId, syncId)
.map(SyncMetadataVO::from)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
}

View File

@@ -1,7 +1,9 @@
package com.datamate.knowledgegraph.application;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.knowledgegraph.domain.model.SyncMetadata;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.domain.repository.SyncHistoryRepository;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.WorkflowDTO;
@@ -13,12 +15,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -42,6 +47,9 @@ class GraphSyncServiceTest {
@Mock
private KnowledgeGraphProperties properties;
@Mock
private SyncHistoryRepository syncHistoryRepository;
@InjectMocks
private GraphSyncService syncService;
@@ -161,7 +169,7 @@ class GraphSyncServiceTest {
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
List<SyncResult> results = syncService.syncAll(GRAPH_ID);
List<SyncResult> results = syncService.syncAll(GRAPH_ID).getResults();
// 8 entities + 10 relations = 18
assertThat(results).hasSize(18);
@@ -335,4 +343,324 @@ class GraphSyncServiceTest {
.isInstanceOf(BusinessException.class);
}
}
// -----------------------------------------------------------------------
// 同步元数据记录
// -----------------------------------------------------------------------
@Nested
class SyncMetadataRecordingTest {
@Test
void syncAll_success_recordsMetadataWithCorrectFields() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").created(3).updated(1).build());
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
.thenReturn(0);
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(metadata.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
assertThat(metadata.getGraphId()).isEqualTo(GRAPH_ID);
assertThat(metadata.getSyncId()).isNotNull();
assertThat(metadata.getStartedAt()).isNotNull();
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getTotalCreated()).isEqualTo(3);
assertThat(metadata.getTotalUpdated()).isEqualTo(1);
assertThat(metadata.getResults()).hasSize(18);
assertThat(metadata.getStepSummaries()).hasSize(18);
assertThat(metadata.getErrorMessage()).isNull();
// 验证持久化被调用
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
}
@Test
void syncAll_withFailedSteps_recordsPartialStatus() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
dto.setCreatedBy("admin");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
// Dataset step has failures
SyncResult datasetResult = SyncResult.builder().syncType("Dataset").created(2).failed(1).build();
datasetResult.setErrors(new java.util.ArrayList<>(List.of("some error")));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(datasetResult);
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Field").build());
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
.thenReturn(SyncResult.builder().syncType("User").build());
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("Org").build());
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Workflow").build());
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Job").build());
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
.thenReturn(0);
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
assertThat(metadata.getTotalFailed()).isEqualTo(1);
assertThat(metadata.getTotalCreated()).isEqualTo(2);
}
@Test
void syncAll_exceptionThrown_recordsFailedMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("connection refused"));
assertThatThrownBy(() -> syncService.syncAll(GRAPH_ID))
.isInstanceOf(BusinessException.class);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(saved.getErrorMessage()).isNotNull();
assertThat(saved.getGraphId()).isEqualTo(GRAPH_ID);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_FULL);
}
@Test
void syncDatasets_success_recordsMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").created(1).build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
.thenReturn(0);
syncService.syncDatasets(GRAPH_ID);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
assertThat(saved.getTotalCreated()).isEqualTo(1);
}
@Test
void syncDatasets_failed_recordsFailedMetadata() {
when(properties.getSync()).thenReturn(syncConfig);
when(dataManagementClient.listAllDatasets()).thenThrow(new RuntimeException("timeout"));
assertThatThrownBy(() -> syncService.syncDatasets(GRAPH_ID))
.isInstanceOf(BusinessException.class);
ArgumentCaptor<SyncMetadata> captor = ArgumentCaptor.forClass(SyncMetadata.class);
verify(syncHistoryRepository).save(captor.capture());
SyncMetadata saved = captor.getValue();
assertThat(saved.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(saved.getSyncType()).isEqualTo(SyncMetadata.TYPE_DATASETS);
}
@Test
void saveSyncHistory_exceptionInSave_doesNotAffectMainFlow() {
when(properties.getSync()).thenReturn(syncConfig);
DatasetDTO dto = new DatasetDTO();
dto.setId("ds-001");
dto.setName("Test");
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
.thenReturn(SyncResult.builder().syncType("Dataset").build());
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Dataset"), anySet(), anyString()))
.thenReturn(0);
// saveSyncHistory 内部异常不应影响主流程
when(syncHistoryRepository.save(any())).thenThrow(new RuntimeException("Neo4j down"));
SyncResult result = syncService.syncDatasets(GRAPH_ID);
assertThat(result.getSyncType()).isEqualTo("Dataset");
}
}
// -----------------------------------------------------------------------
// 同步历史查询
// -----------------------------------------------------------------------
@Nested
class SyncHistoryQueryTest {
@Test
void getSyncHistory_invalidGraphId_throwsBusinessException() {
assertThatThrownBy(() -> syncService.getSyncHistory(INVALID_GRAPH_ID, null, 20))
.isInstanceOf(BusinessException.class);
}
@Test
void getSyncHistory_noStatusFilter_callsFindByGraphId() {
when(syncHistoryRepository.findByGraphId(GRAPH_ID, 20)).thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, null, 20);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphId(GRAPH_ID, 20);
verify(syncHistoryRepository, never()).findByGraphIdAndStatus(anyString(), anyString(), anyInt());
}
@Test
void getSyncHistory_withStatusFilter_callsFindByGraphIdAndStatus() {
when(syncHistoryRepository.findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistory(GRAPH_ID, "SUCCESS", 10);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphIdAndStatus(GRAPH_ID, "SUCCESS", 10);
}
@Test
void getSyncRecord_found_returnsRecord() {
SyncMetadata expected = SyncMetadata.builder()
.syncId("abc12345").graphId(GRAPH_ID).build();
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "abc12345"))
.thenReturn(Optional.of(expected));
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "abc12345");
assertThat(result).isPresent();
assertThat(result.get().getSyncId()).isEqualTo("abc12345");
}
@Test
void getSyncRecord_notFound_returnsEmpty() {
when(syncHistoryRepository.findByGraphIdAndSyncId(GRAPH_ID, "notexist"))
.thenReturn(Optional.empty());
Optional<SyncMetadata> result = syncService.getSyncRecord(GRAPH_ID, "notexist");
assertThat(result).isEmpty();
}
@Test
void getSyncHistoryByTimeRange_delegatesToRepository() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 0, 20);
assertThat(result).isEmpty();
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 0L, 20);
}
@Test
void getSyncHistoryByTimeRange_pagination_computesSkipCorrectly() {
LocalDateTime from = LocalDateTime.of(2025, 1, 1, 0, 0);
LocalDateTime to = LocalDateTime.of(2025, 12, 31, 23, 59);
when(syncHistoryRepository.findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20))
.thenReturn(List.of());
List<SyncMetadata> result = syncService.getSyncHistoryByTimeRange(GRAPH_ID, from, to, 2, 20);
assertThat(result).isEmpty();
// page=2, size=20 → skip=40
verify(syncHistoryRepository).findByGraphIdAndTimeRange(GRAPH_ID, from, to, 40L, 20);
}
@Test
void getSyncHistoryByTimeRange_skipOverflow_throwsBusinessException() {
// 模拟绕过 Controller 校验直接调用 Service 的场景
assertThatThrownBy(() -> syncService.getSyncHistoryByTimeRange(
GRAPH_ID,
LocalDateTime.of(2025, 1, 1, 0, 0),
LocalDateTime.of(2025, 12, 31, 23, 59),
20000, 200))
.isInstanceOf(BusinessException.class)
.hasMessageContaining("分页偏移量");
}
}
}

View File

@@ -0,0 +1,96 @@
package com.datamate.knowledgegraph.domain.model;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
class SyncMetadataTest {
@Test
void fromResults_aggregatesCountsCorrectly() {
LocalDateTime startedAt = LocalDateTime.of(2025, 6, 1, 10, 0, 0);
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(1).purged(3).build();
SyncResult r2 = SyncResult.builder().syncType("Field").created(10).updated(0).skipped(2).build();
SyncResult r3 = SyncResult.builder().syncType("HAS_FIELD").created(8).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc123", "graph-id", "FULL", startedAt, List.of(r1, r2, r3));
assertThat(metadata.getSyncId()).isEqualTo("abc123");
assertThat(metadata.getGraphId()).isEqualTo("graph-id");
assertThat(metadata.getSyncType()).isEqualTo("FULL");
assertThat(metadata.getTotalCreated()).isEqualTo(23); // 5 + 10 + 8
assertThat(metadata.getTotalUpdated()).isEqualTo(2); // 2 + 0 + 0
assertThat(metadata.getTotalSkipped()).isEqualTo(2); // 0 + 2 + 0
assertThat(metadata.getTotalFailed()).isEqualTo(1); // 1 + 0 + 0
assertThat(metadata.getTotalPurged()).isEqualTo(3); // 3 + 0 + 0
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getResults()).hasSize(3);
assertThat(metadata.getStepSummaries()).hasSize(3);
}
@Test
void fromResults_noFailures_statusIsSuccess() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_SUCCESS);
}
@Test
void fromResults_withFailures_statusIsPartial() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).failed(2).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1));
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_PARTIAL);
assertThat(metadata.getTotalFailed()).isEqualTo(2);
}
@Test
void failed_createsFailedMetadata() {
LocalDateTime startedAt = LocalDateTime.of(2025, 1, 1, 0, 0, 0);
SyncMetadata metadata = SyncMetadata.failed("abc", "g1", "FULL", startedAt, "connection refused");
assertThat(metadata.getStatus()).isEqualTo(SyncMetadata.STATUS_FAILED);
assertThat(metadata.getErrorMessage()).isEqualTo("connection refused");
assertThat(metadata.getSyncId()).isEqualTo("abc");
assertThat(metadata.getGraphId()).isEqualTo("g1");
assertThat(metadata.getSyncType()).isEqualTo("FULL");
assertThat(metadata.getStartedAt()).isEqualTo(startedAt);
assertThat(metadata.getCompletedAt()).isNotNull();
assertThat(metadata.getDurationMillis()).isGreaterThanOrEqualTo(0);
assertThat(metadata.getTotalCreated()).isEqualTo(0);
assertThat(metadata.getTotalUpdated()).isEqualTo(0);
}
@Test
void totalEntities_returnsSum() {
SyncMetadata metadata = SyncMetadata.builder()
.totalCreated(10).totalUpdated(5).totalSkipped(3).totalFailed(2)
.build();
assertThat(metadata.totalEntities()).isEqualTo(20);
}
@Test
void stepSummaries_formatIncludesPurgedWhenNonZero() {
LocalDateTime startedAt = LocalDateTime.now();
SyncResult r1 = SyncResult.builder().syncType("Dataset").created(5).updated(2).failed(0).purged(3).build();
SyncResult r2 = SyncResult.builder().syncType("Field").created(1).updated(0).failed(0).purged(0).build();
SyncMetadata metadata = SyncMetadata.fromResults("abc", "g1", "FULL", startedAt, List.of(r1, r2));
assertThat(metadata.getStepSummaries().get(0)).isEqualTo("Dataset(+5/~2/-0/purged:3)");
assertThat(metadata.getStepSummaries().get(1)).isEqualTo("Field(+1/~0/-0)");
}
}