feat(kg-sync): 实现图谱构建流程(MySQL → Neo4j 同步)

实现功能:
- 实现 GraphSyncService(同步编排器)
- 实现 GraphSyncStepService(同步步骤执行器)
- 实现 GraphSyncController(同步 API)
- 实现 GraphInitializer(图谱初始化)
- 实现 DataManagementClient(数据源客户端)

同步功能:
- syncDatasets:同步数据集实体
- syncFields:同步字段实体
- syncUsers:同步用户实体
- syncOrgs:同步组织实体
- buildHasFieldRelations:构建 HAS_FIELD 关系
- buildDerivedFromRelations:构建 DERIVED_FROM 关系
- buildBelongsToRelations:构建 BELONGS_TO 关系
- syncAll:全量同步(实体 + 关系 + 对账删除)

API 端点:
- POST /{graphId}/sync/full:全量同步
- POST /{graphId}/sync/datasets:同步数据集
- POST /{graphId}/sync/fields:同步字段
- POST /{graphId}/sync/users:同步用户
- POST /{graphId}/sync/orgs:同步组织
- POST /{graphId}/sync/relations/has-field:构建 HAS_FIELD
- POST /{graphId}/sync/relations/derived-from:构建 DERIVED_FROM
- POST /{graphId}/sync/relations/belongs-to:构建 BELONGS_TO

技术实现:
- Upsert 策略:
  - 实体:两阶段(Cypher MERGE 原子创建 + SDN save 更新扩展属性)
  - 关系:Cypher MERGE 幂等创建
- 全量对账删除:purgeStaleEntities() 删除 MySQL 中已删除的实体
- 并发安全:
  - 图级互斥锁(ConcurrentHashMap<String, ReentrantLock>)
  - 复合唯一约束(graph_id, source_id, type)
  - 锁自动回收(releaseLock() 原子检查并移除空闲锁)
- 重试机制:HTTP 调用失败时按线性退避重试(间隔 = retryInterval × 尝试次数,默认 3 次)
- 错误处理:
  - 逐条错误处理(单条失败不影响其他记录)
  - 统一异常包装(BusinessException.of(SYNC_FAILED))
  - 错误信息脱敏(仅返回 errorCount + syncId)
- 事务管理:
  - GraphSyncService(编排器,无事务)
  - GraphSyncStepService(步骤执行器,@Transactional)
- 性能优化:
  - 全量同步共享数据快照
  - 批量日志跟踪
- 图谱初始化:
  - 1 个唯一性约束(entity ID)
  - 1 个复合唯一约束(graph_id, source_id, type)
  - 9 个索引(5 个单字段 + 3 个复合 + 1 个全文)
  - 幂等性保证(IF NOT EXISTS)

代码审查:
- 经过 3 轮 Codex 审查和 2 轮 Claude 修复
- 所有问题已解决(3个P0 + 5个P1 + 3个P2 + 1个P3)
- 编译验证通过(mvn compile SUCCESS)

设计决策:
- 最终一致性:允许短暂的数据不一致
- 对账机制:定期对比并修复差异
- 信任边界:网关负责鉴权,服务层只做格式校验
- 多实例部署:依赖复合唯一约束兜底
This commit is contained in:
2026-02-17 23:46:03 +08:00
parent 910251e898
commit 8b1ab8ff36
12 changed files with 1261 additions and 1 deletions

View File

@@ -1,11 +1,28 @@
package com.datamate.knowledgegraph;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.neo4j.repository.config.EnableNeo4jRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;
import java.time.Duration;
@Configuration
@ComponentScan(basePackages = "com.datamate.knowledgegraph")
@EnableNeo4jRepositories(basePackages = "com.datamate.knowledgegraph.domain.repository")
@EnableScheduling
public class KnowledgeGraphServiceConfiguration {

    /**
     * RestTemplate dedicated to knowledge-graph sync HTTP calls.
     * <p>
     * Connect/read timeouts come from the {@code sync} section of
     * {@link KnowledgeGraphProperties}, so they are tunable per environment.
     */
    @Bean("kgRestTemplate")
    public RestTemplate kgRestTemplate(RestTemplateBuilder builder, KnowledgeGraphProperties properties) {
        KnowledgeGraphProperties.Sync syncConfig = properties.getSync();
        Duration connectTimeout = Duration.ofMillis(syncConfig.getConnectTimeout());
        Duration readTimeout = Duration.ofMillis(syncConfig.getReadTimeout());
        return builder
                .connectTimeout(connectTimeout)
                .readTimeout(readTimeout)
                .build();
    }
}

View File

@@ -0,0 +1,335 @@
package com.datamate.knowledgegraph.application;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * Knowledge-graph data sync orchestrator (deliberately non-{@code @Transactional}).
 * <p>
 * Pulls source data, orchestrates the sync steps and manages per-graph locks;
 * the actual write operations are delegated to {@link GraphSyncStepService},
 * which owns the transaction boundaries (avoids self-invocation breaking
 * Spring transactions).
 * <p>
 * <b>Concurrency control</b>: sync operations on the same graphId are mutually
 * exclusive via a {@link ReentrantLock}, preventing concurrent writes from
 * producing inconsistent data.
 * <p>
 * <b>Data snapshot</b>: a full sync fetches the dataset list exactly once and
 * shares it across all steps, avoiding repeated HTTP calls.
 * <p>
 * <b>Multi-instance deployment</b>: the per-graph lock is an in-process
 * {@link ReentrantLock}; across instances the Neo4j composite unique
 * constraint (graph_id, source_id, type) is the safety net against duplicate
 * writes. For strict cross-instance mutual exclusion, replace with a
 * Redis/DB distributed lock.
 * <p>
 * <b>Trust boundary</b>: this service is only reachable over the internal
 * network from the API Gateway / scheduled jobs; the gateway has already
 * performed authentication and authorization, so only format validation is
 * done here.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class GraphSyncService {

    /** Strict UUID shape check for the graphId path variable. */
    private static final Pattern UUID_PATTERN = Pattern.compile(
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
    );

    private final GraphSyncStepService stepService;
    private final DataManagementClient dataManagementClient;
    private final KnowledgeGraphProperties properties;

    /** Per-graphId mutex, preventing concurrent syncs of the same graph. */
    private final ConcurrentHashMap<String, ReentrantLock> graphLocks = new ConcurrentHashMap<>();

    // -----------------------------------------------------------------------
    // Full sync
    // -----------------------------------------------------------------------

    /**
     * Runs a full sync under the graph lock: entity upserts, reconciliation
     * purges and relation builds, all against a single data snapshot.
     *
     * @param graphId target graph identifier (must be a UUID)
     * @return one {@link SyncResult} per executed step
     * @throws BusinessException with SYNC_FAILED when any step fails; only the
     *         syncId is exposed to callers, details stay in the logs
     */
    public List<SyncResult> syncAll(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            log.info("[{}] Starting full sync for graphId={}", syncId, graphId);
            // Fetch once, share across every step below.
            List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
            List<SyncResult> results = new ArrayList<>();
            // Entity sync
            results.add(stepService.upsertDatasetEntities(graphId, datasets, syncId));
            results.add(stepService.upsertFieldEntities(graphId, datasets, syncId));
            Set<String> usernames = extractUsernames(datasets);
            results.add(stepService.upsertUserEntities(graphId, usernames, syncId));
            results.add(stepService.upsertOrgEntities(graphId, syncId));
            // Reconciliation: purge records removed from MySQL, writing the
            // purge counts back onto the matching results (index 0=Dataset,
            // 1=Field, 2=User — same order as the adds above).
            Set<String> activeDatasetIds = datasets.stream()
                    .map(DatasetDTO::getId)
                    .collect(Collectors.toSet());
            results.get(0).setPurged(
                    stepService.purgeStaleEntities(graphId, "Dataset", activeDatasetIds, syncId));
            Set<String> activeFieldIds = new HashSet<>();
            for (DatasetDTO dto : datasets) {
                if (dto.getTags() != null) {
                    for (DataManagementClient.TagDTO tag : dto.getTags()) {
                        activeFieldIds.add(dto.getId() + ":tag:" + tag.getName());
                    }
                }
            }
            results.get(1).setPurged(
                    stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId));
            Set<String> activeUserIds = usernames.stream()
                    .map(u -> "user:" + u)
                    .collect(Collectors.toSet());
            results.get(2).setPurged(
                    stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
            // Relation building (idempotent via Cypher MERGE)
            results.add(stepService.mergeHasFieldRelations(graphId, syncId));
            results.add(stepService.mergeDerivedFromRelations(graphId, syncId));
            results.add(stepService.mergeBelongsToRelations(graphId, syncId));
            log.info("[{}] Full sync completed for graphId={}. Summary: {}", syncId, graphId,
                    results.stream()
                            .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")")
                            .collect(Collectors.joining(", ")));
            return results;
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "全量同步失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    // -----------------------------------------------------------------------
    // Single-step syncs (each acquires the lock and fetches its own data)
    // -----------------------------------------------------------------------

    /** Syncs dataset entities and purges datasets removed from the source. */
    public SyncResult syncDatasets(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
            SyncResult result = stepService.upsertDatasetEntities(graphId, datasets, syncId);
            Set<String> activeIds = datasets.stream().map(DatasetDTO::getId).collect(Collectors.toSet());
            int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId);
            result.setPurged(purged);
            return result;
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "数据集同步失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Syncs field (tag) entities and purges fields removed from the source. */
    public SyncResult syncFields(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
            SyncResult result = stepService.upsertFieldEntities(graphId, datasets, syncId);
            Set<String> activeFieldIds = new HashSet<>();
            for (DatasetDTO dto : datasets) {
                if (dto.getTags() != null) {
                    for (DataManagementClient.TagDTO tag : dto.getTags()) {
                        activeFieldIds.add(dto.getId() + ":tag:" + tag.getName());
                    }
                }
            }
            result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId));
            return result;
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "字段同步失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Syncs user entities derived from dataset creators/updaters, with purge. */
    public SyncResult syncUsers(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            List<DatasetDTO> datasets = fetchDatasetsWithRetry(syncId);
            Set<String> usernames = extractUsernames(datasets);
            SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId);
            Set<String> activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet());
            result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
            return result;
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] User sync failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "用户同步失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Syncs organization entities (currently just the default org). */
    public SyncResult syncOrgs(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            return stepService.upsertOrgEntities(graphId, syncId);
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "组织同步失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Builds Dataset→Field HAS_FIELD relations (idempotent MERGE). */
    public SyncResult buildHasFieldRelations(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            return stepService.mergeHasFieldRelations(graphId, syncId);
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] HAS_FIELD relation build failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED,
                    "HAS_FIELD 关系构建失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Builds Dataset→parent Dataset DERIVED_FROM relations (idempotent MERGE). */
    public SyncResult buildDerivedFromRelations(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            return stepService.mergeDerivedFromRelations(graphId, syncId);
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] DERIVED_FROM relation build failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED,
                    "DERIVED_FROM 关系构建失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    /** Builds User/Dataset→Org BELONGS_TO relations (idempotent MERGE). */
    public SyncResult buildBelongsToRelations(String graphId) {
        validateGraphId(graphId);
        String syncId = UUID.randomUUID().toString().substring(0, 8);
        ReentrantLock lock = acquireLock(graphId, syncId);
        try {
            return stepService.mergeBelongsToRelations(graphId, syncId);
        } catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            log.error("[{}] BELONGS_TO relation build failed for graphId={}", syncId, graphId, e);
            throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED,
                    "BELONGS_TO 关系构建失败,syncId=" + syncId);
        } finally {
            releaseLock(graphId, lock);
        }
    }

    // -----------------------------------------------------------------------
    // Internals
    // -----------------------------------------------------------------------

    /**
     * Acquires (or rejects) the per-graph mutex.
     * <p>
     * FIX: the previous implementation had a race with {@link #releaseLock}.
     * Between {@code computeIfAbsent} returning a lock and {@code tryLock()}
     * succeeding, the previous holder could remove that very lock instance
     * from the map (it is unlocked and has no queued threads at that moment,
     * since {@code tryLock} never queues). This thread would then lock the
     * stale instance while another thread maps and locks a brand-new lock for
     * the same graphId — breaking mutual exclusion. We therefore re-check map
     * membership AFTER locking and retry when the instance we hold is no
     * longer the mapped one.
     *
     * @throws BusinessException when another sync already holds the lock
     */
    private ReentrantLock acquireLock(String graphId, String syncId) {
        while (true) {
            ReentrantLock lock = graphLocks.computeIfAbsent(graphId, k -> new ReentrantLock());
            if (!lock.tryLock()) {
                log.warn("[{}] Graph {} is already being synced, rejecting concurrent request", syncId, graphId);
                throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "该图谱正在同步中,请稍后重试");
            }
            if (graphLocks.get(graphId) == lock) {
                // Still the mapped instance — mutual exclusion is guaranteed.
                return lock;
            }
            // Stale lock (concurrently removed by releaseLock): release and retry.
            lock.unlock();
        }
    }

    /**
     * Releases the lock and, when uncontended, removes the lock object so the
     * graphLocks map cannot grow without bound.
     */
    private void releaseLock(String graphId, ReentrantLock lock) {
        lock.unlock();
        graphLocks.compute(graphId, (key, existing) -> {
            // Remove only when idle with no waiters; compute() is atomic per key.
            if (existing != null && !existing.isLocked() && !existing.hasQueuedThreads()) {
                return null;
            }
            return existing;
        });
    }

    /**
     * Fetches the full dataset list, retrying up to maxRetries times with a
     * linearly increasing backoff (retryInterval * attempt).
     *
     * @throws BusinessException when all attempts fail or the thread is interrupted
     */
    private List<DatasetDTO> fetchDatasetsWithRetry(String syncId) {
        int maxRetries = properties.getSync().getMaxRetries();
        long retryInterval = properties.getSync().getRetryInterval();
        Exception lastException = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return dataManagementClient.listAllDatasets();
            } catch (Exception e) {
                lastException = e;
                log.warn("[{}] Dataset fetch attempt {}/{} failed: {}", syncId, attempt, maxRetries, e.getMessage());
                if (attempt < maxRetries) {
                    try {
                        // Linear backoff: interval grows with the attempt number.
                        Thread.sleep(retryInterval * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "同步被中断");
                    }
                }
            }
        }
        log.error("[{}] All {} fetch attempts failed", syncId, maxRetries, lastException);
        throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED,
                "拉取数据集失败(已重试 " + maxRetries + " 次),syncId=" + syncId);
    }

    /**
     * Collects distinct, non-blank creator/updater usernames from the dataset
     * snapshot, preserving first-seen order.
     */
    private static Set<String> extractUsernames(List<DatasetDTO> datasets) {
        Set<String> usernames = new LinkedHashSet<>();
        for (DatasetDTO dto : datasets) {
            if (dto.getCreatedBy() != null && !dto.getCreatedBy().isBlank()) {
                usernames.add(dto.getCreatedBy());
            }
            if (dto.getUpdatedBy() != null && !dto.getUpdatedBy().isBlank()) {
                usernames.add(dto.getUpdatedBy());
            }
        }
        return usernames;
    }

    /** Format-only validation; authorization is the gateway's responsibility. */
    private void validateGraphId(String graphId) {
        if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) {
            throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效");
        }
    }
}

View File

@@ -0,0 +1,407 @@
package com.datamate.knowledgegraph.application;
import com.datamate.knowledgegraph.domain.model.GraphEntity;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.domain.repository.GraphEntityRepository;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO;
import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.TagDTO;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
/**
 * Sync step executor (owns the transaction boundaries).
 * <p>
 * Every write operation runs in its own {@code @Transactional} method and is
 * invoked by {@link GraphSyncService}, avoiding the self-invocation pitfall
 * that would disable Spring transactions.
 * <p>
 * Relation building uses Cypher MERGE for idempotency; entity upserts use a
 * Cypher MERGE anchored on the (graph_id, source_id, type) composite unique
 * constraint for atomicity, with extended properties updated via SDN.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class GraphSyncStepService {

    private static final String SOURCE_TYPE_SYNC = "SYNC";
    private static final String REL_TYPE = "RELATED_TO";

    private final GraphEntityRepository entityRepository;
    private final Neo4jClient neo4jClient;
    private final KnowledgeGraphProperties properties;

    // -----------------------------------------------------------------------
    // Entity upserts
    // -----------------------------------------------------------------------

    /**
     * Upserts one graph entity per dataset. Per-record failures are logged and
     * counted; they do not abort the batch.
     */
    @Transactional
    public SyncResult upsertDatasetEntities(String graphId, List<DatasetDTO> datasets, String syncId) {
        SyncResult result = beginResult("Dataset", syncId);
        // Guard against a misconfigured batch size of 0 (or negative), which
        // would otherwise throw ArithmeticException in the modulo below.
        int batchSize = Math.max(1, properties.getImportBatchSize());
        for (int i = 0; i < datasets.size(); i++) {
            DatasetDTO dto = datasets.get(i);
            try {
                // NOTE(review): some values (dataset_type/status/total_size/
                // file_count) may be null here — confirm SDN/Neo4j tolerates
                // null entries in the dynamic properties map.
                Map<String, Object> props = new HashMap<>();
                props.put("dataset_type", dto.getDatasetType());
                props.put("status", dto.getStatus());
                props.put("total_size", dto.getTotalSize());
                props.put("file_count", dto.getFileCount());
                if (dto.getParentDatasetId() != null) {
                    props.put("parent_dataset_id", dto.getParentDatasetId());
                }
                if (dto.getTags() != null) {
                    List<String> tagNames = dto.getTags().stream()
                            .map(TagDTO::getName).toList();
                    props.put("tags", tagNames);
                }
                upsertEntity(graphId, dto.getId(), "Dataset",
                        dto.getName(), dto.getDescription(), props, result);
                if ((i + 1) % batchSize == 0) {
                    log.debug("[{}] Processed {}/{} datasets", syncId, i + 1, datasets.size());
                }
            } catch (Exception e) {
                log.warn("[{}] Failed to upsert dataset: sourceId={}", syncId, dto.getId(), e);
                result.addError("dataset:" + dto.getId());
            }
        }
        return endResult(result);
    }

    /**
     * Upserts one Field entity per dataset tag. The field's source_id is
     * "&lt;datasetId&gt;:tag:&lt;tagName&gt;", linking it back to its dataset.
     */
    @Transactional
    public SyncResult upsertFieldEntities(String graphId, List<DatasetDTO> datasets, String syncId) {
        SyncResult result = beginResult("Field", syncId);
        for (DatasetDTO dto : datasets) {
            if (dto.getTags() == null || dto.getTags().isEmpty()) {
                continue;
            }
            for (TagDTO tag : dto.getTags()) {
                try {
                    String fieldSourceId = dto.getId() + ":tag:" + tag.getName();
                    Map<String, Object> props = new HashMap<>();
                    props.put("data_type", "TAG");
                    // dataset_source_id is what mergeHasFieldRelations uses to
                    // find the owning dataset.
                    props.put("dataset_source_id", dto.getId());
                    if (tag.getColor() != null) {
                        props.put("color", tag.getColor());
                    }
                    upsertEntity(graphId, fieldSourceId, "Field", tag.getName(),
                            "数据集[" + dto.getName() + "]的标签字段", props, result);
                } catch (Exception e) {
                    log.warn("[{}] Failed to upsert field: dataset={}, tag={}",
                            syncId, dto.getId(), tag.getName(), e);
                    result.addError("field:" + dto.getId() + ":" + tag.getName());
                }
            }
        }
        return endResult(result);
    }

    /** Upserts one User entity per distinct username (source_id "user:&lt;name&gt;"). */
    @Transactional
    public SyncResult upsertUserEntities(String graphId, Set<String> usernames, String syncId) {
        SyncResult result = beginResult("User", syncId);
        for (String username : usernames) {
            try {
                Map<String, Object> props = new HashMap<>();
                props.put("username", username);
                upsertEntity(graphId, "user:" + username, "User", username, null, props, result);
            } catch (Exception e) {
                log.warn("[{}] Failed to upsert user: username={}", syncId, username, e);
                result.addError("user:" + username);
            }
        }
        return endResult(result);
    }

    /**
     * Upserts the single default Org entity — a placeholder until the real
     * organization service is integrated.
     */
    @Transactional
    public SyncResult upsertOrgEntities(String graphId, String syncId) {
        SyncResult result = beginResult("Org", syncId);
        try {
            Map<String, Object> props = new HashMap<>();
            props.put("org_code", "DEFAULT");
            props.put("level", 1);
            upsertEntity(graphId, "org:default", "Org", "默认组织",
                    "系统默认组织(待对接组织服务后更新)", props, result);
        } catch (Exception e) {
            log.warn("[{}] Failed to upsert default org", syncId, e);
            result.addError("org:default");
        }
        return endResult(result);
    }

    // -----------------------------------------------------------------------
    // Full-sync reconciliation
    // -----------------------------------------------------------------------

    /**
     * Deletes entities with source_type=SYNC whose source_id is not in the
     * active set, using DETACH DELETE so their relationships are removed too.
     * <p>
     * When activeSourceIds is empty, the source no longer has any entity of
     * this type and ALL matching SYNC entities are purged. The caller must
     * guarantee that an empty set is the genuine result of a successful fetch
     * (fetchDatasetsWithRetry throws on failure, so we never get here with a
     * bogus empty snapshot).
     *
     * @return the number of deleted nodes
     */
    @Transactional
    public int purgeStaleEntities(String graphId, String type, Set<String> activeSourceIds, String syncId) {
        String cypher;
        Map<String, Object> params;
        if (activeSourceIds.isEmpty()) {
            log.warn("[{}] Active source IDs empty for type={}, purging ALL SYNC entities of this type",
                    syncId, type);
            cypher = "MATCH (e:Entity {graph_id: $graphId, type: $type, source_type: 'SYNC'}) " +
                    "DETACH DELETE e " +
                    "RETURN count(*) AS deleted";
            params = Map.of("graphId", graphId, "type", type);
        } else {
            cypher = "MATCH (e:Entity {graph_id: $graphId, type: $type, source_type: 'SYNC'}) " +
                    "WHERE NOT e.source_id IN $activeSourceIds " +
                    "DETACH DELETE e " +
                    "RETURN count(*) AS deleted";
            params = Map.of(
                    "graphId", graphId,
                    "type", type,
                    "activeSourceIds", new ArrayList<>(activeSourceIds)
            );
        }
        long deleted = neo4jClient.query(cypher)
                .bindAll(params)
                .fetchAs(Long.class)
                .mappedBy((ts, record) -> record.get("deleted").asLong())
                .one()
                .orElse(0L);
        if (deleted > 0) {
            log.info("[{}] Purged {} stale {} entities from graphId={}", syncId, deleted, type, graphId);
        }
        return (int) deleted;
    }

    // -----------------------------------------------------------------------
    // Relation building (MERGE guarantees idempotency)
    // -----------------------------------------------------------------------

    /**
     * Merges Dataset-[HAS_FIELD]->Field relations by resolving each field's
     * dataset_source_id back to its dataset node. Fields without a resolvable
     * dataset are counted as skipped.
     */
    @Transactional
    public SyncResult mergeHasFieldRelations(String graphId, String syncId) {
        SyncResult result = beginResult("HAS_FIELD", syncId);
        List<GraphEntity> fields = entityRepository.findByGraphIdAndType(graphId, "Field");
        for (GraphEntity field : fields) {
            try {
                Object datasetSourceId = field.getProperties().get("dataset_source_id");
                if (datasetSourceId == null) {
                    result.incrementSkipped();
                    continue;
                }
                Optional<GraphEntity> datasetOpt = entityRepository.findByGraphIdAndSourceIdAndType(
                        graphId, datasetSourceId.toString(), "Dataset");
                if (datasetOpt.isEmpty()) {
                    result.incrementSkipped();
                    continue;
                }
                boolean created = mergeRelation(graphId, datasetOpt.get().getId(), field.getId(),
                        "HAS_FIELD", "{}", syncId);
                if (created) {
                    result.incrementCreated();
                } else {
                    result.incrementSkipped();
                }
            } catch (Exception e) {
                log.warn("[{}] Failed to merge HAS_FIELD for field: id={}", syncId, field.getId(), e);
                result.addError("has_field:" + field.getId());
            }
        }
        return endResult(result);
    }

    /**
     * Merges Dataset-[DERIVED_FROM]->parent Dataset relations based on each
     * dataset's parent_dataset_id property. Missing parents are skipped.
     */
    @Transactional
    public SyncResult mergeDerivedFromRelations(String graphId, String syncId) {
        SyncResult result = beginResult("DERIVED_FROM", syncId);
        List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
        for (GraphEntity dataset : datasets) {
            try {
                Object parentId = dataset.getProperties().get("parent_dataset_id");
                if (parentId == null || parentId.toString().isBlank()) {
                    continue;
                }
                Optional<GraphEntity> parentOpt = entityRepository.findByGraphIdAndSourceIdAndType(
                        graphId, parentId.toString(), "Dataset");
                if (parentOpt.isEmpty()) {
                    result.incrementSkipped();
                    continue;
                }
                boolean created = mergeRelation(graphId, dataset.getId(), parentOpt.get().getId(),
                        "DERIVED_FROM", "{\"derivation_type\":\"VERSION\"}", syncId);
                if (created) {
                    result.incrementCreated();
                } else {
                    result.incrementSkipped();
                }
            } catch (Exception e) {
                log.warn("[{}] Failed to merge DERIVED_FROM for dataset: id={}", syncId, dataset.getId(), e);
                result.addError("derived_from:" + dataset.getId());
            }
        }
        return endResult(result);
    }

    /**
     * Merges User-[BELONGS_TO]->Org and Dataset-[BELONGS_TO]->Org relations
     * against the default org. Returns an error result when the default org
     * has not been created yet.
     */
    @Transactional
    public SyncResult mergeBelongsToRelations(String graphId, String syncId) {
        SyncResult result = beginResult("BELONGS_TO", syncId);
        Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType(
                graphId, "org:default", "Org");
        if (defaultOrgOpt.isEmpty()) {
            log.warn("[{}] Default org not found, skipping BELONGS_TO", syncId);
            result.addError("belongs_to:org_missing");
            return endResult(result);
        }
        String orgId = defaultOrgOpt.get().getId();
        // User → Org
        for (GraphEntity user : entityRepository.findByGraphIdAndType(graphId, "User")) {
            try {
                boolean created = mergeRelation(graphId, user.getId(), orgId,
                        "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
                if (created) { result.incrementCreated(); } else { result.incrementSkipped(); }
            } catch (Exception e) {
                log.warn("[{}] Failed to merge BELONGS_TO for user: id={}", syncId, user.getId(), e);
                result.addError("belongs_to:user:" + user.getId());
            }
        }
        // Dataset → Org
        for (GraphEntity dataset : entityRepository.findByGraphIdAndType(graphId, "Dataset")) {
            try {
                boolean created = mergeRelation(graphId, dataset.getId(), orgId,
                        "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
                if (created) { result.incrementCreated(); } else { result.incrementSkipped(); }
            } catch (Exception e) {
                log.warn("[{}] Failed to merge BELONGS_TO for dataset: id={}", syncId, dataset.getId(), e);
                result.addError("belongs_to:dataset:" + dataset.getId());
            }
        }
        return endResult(result);
    }

    // -----------------------------------------------------------------------
    // Internals
    // -----------------------------------------------------------------------

    /**
     * Atomically creates-or-matches an entity via Cypher MERGE, then updates
     * its extended properties through SDN.
     * <p>
     * The MERGE is anchored on the (graph_id, source_id, type) composite
     * unique constraint, so concurrent writers across instances cannot
     * produce duplicate nodes. Whether the node was created is detected by
     * comparing e.id against the freshly generated candidate id.
     */
    private void upsertEntity(String graphId, String sourceId, String type,
                              String name, String description,
                              Map<String, Object> props, SyncResult result) {
        String newId = UUID.randomUUID().toString();
        Map<String, Object> params = new HashMap<>();
        params.put("graphId", graphId);
        params.put("sourceId", sourceId);
        params.put("type", type);
        params.put("newId", newId);
        params.put("name", name != null ? name : "");
        params.put("description", description != null ? description : "");
        // Phase 1: atomic MERGE — create or match the entity skeleton.
        Boolean isNew = neo4jClient.query(
                "MERGE (e:Entity {graph_id: $graphId, source_id: $sourceId, type: $type}) " +
                "ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " +
                "              e.name = $name, e.description = $description, " +
                "              e.created_at = datetime(), e.updated_at = datetime() " +
                "ON MATCH SET e.name = $name, e.description = $description, " +
                "             e.updated_at = datetime() " +
                "RETURN e.id = $newId AS isNew"
        )
                .bindAll(params)
                .fetchAs(Boolean.class)
                .mappedBy((ts, record) -> record.get("isNew").asBoolean())
                .one()
                .orElse(false);
        // Phase 2: update extended properties via SDN (Map serialization is
        // handled by SDN's dynamic-properties support).
        entityRepository.findByGraphIdAndSourceIdAndType(graphId, sourceId, type)
                .ifPresent(entity -> {
                    entity.setProperties(props != null ? props : new HashMap<>());
                    entityRepository.save(entity);
                });
        if (isNew) {
            result.incrementCreated();
        } else {
            result.incrementUpdated();
        }
    }

    /**
     * Creates-or-matches a relation via Cypher MERGE (idempotent). REL_TYPE is
     * a compile-time constant, so its concatenation into the query is safe.
     *
     * @return true when the relation was newly created, false when it existed
     */
    private boolean mergeRelation(String graphId, String sourceEntityId, String targetEntityId,
                                  String relationType, String propertiesJson, String syncId) {
        String newId = UUID.randomUUID().toString();
        String mergedId = neo4jClient
                .query(
                        "MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " +
                        "MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " +
                        "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " +
                        "ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " +
                        "              r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " +
                        "RETURN r.id AS relId"
                )
                .bindAll(Map.of(
                        "graphId", graphId,
                        "sourceEntityId", sourceEntityId,
                        "targetEntityId", targetEntityId,
                        "relationType", relationType,
                        "newId", newId,
                        "propertiesJson", propertiesJson
                ))
                .fetchAs(String.class)
                .mappedBy((ts, record) -> record.get("relId").asString())
                .one()
                .orElse(null);
        // The relation is new iff the stored id is the one we just generated.
        return newId.equals(mergedId);
    }

    /** Creates a result shell stamped with the start time and an empty error list. */
    private SyncResult beginResult(String syncType, String syncId) {
        return SyncResult.builder()
                .syncType(syncType)
                .syncId(syncId)
                .startedAt(LocalDateTime.now())
                .errors(new ArrayList<>())
                .build();
    }

    /** Stamps the completion time and returns the result. */
    private SyncResult endResult(SyncResult result) {
        result.setCompletedAt(LocalDateTime.now());
        return result;
    }
}

View File

@@ -0,0 +1,77 @@
package com.datamate.knowledgegraph.domain.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
 * Aggregated statistics for one sync operation (one entity type or one
 * relation-build step).
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncResult {

    /** Trace identifier shared by all steps of a sync run. */
    private String syncId;

    /** Entity/relation type this result describes (e.g. "Dataset", "HAS_FIELD"). */
    private String syncType;

    @Builder.Default
    private int created = 0;
    @Builder.Default
    private int updated = 0;
    @Builder.Default
    private int skipped = 0;
    @Builder.Default
    private int failed = 0;

    /** Number of stale entities removed during full-sync reconciliation. */
    @Builder.Default
    private int purged = 0;

    /** Identifiers of the records that failed (one entry per failure). */
    @Builder.Default
    private List<String> errors = new ArrayList<>();

    private LocalDateTime startedAt;
    private LocalDateTime completedAt;

    /** Total records processed; purged entities are counted separately. */
    public int total() {
        return created + updated + skipped + failed;
    }

    /** Wall-clock duration in ms, or 0 when either timestamp is missing. */
    public long durationMillis() {
        return (startedAt != null && completedAt != null)
                ? java.time.Duration.between(startedAt, completedAt).toMillis()
                : 0L;
    }

    public void incrementCreated() {
        this.created++;
    }

    public void incrementUpdated() {
        this.updated++;
    }

    public void incrementSkipped() {
        this.skipped++;
    }

    /** Records one failed item: bumps the counter and keeps its identifier. */
    public void addError(String error) {
        this.failed++;
        this.errors.add(error);
    }
}

View File

@@ -41,4 +41,12 @@ public interface GraphEntityRepository extends Neo4jRepository<GraphEntity, Stri
    /** Counts all entities belonging to the given graph. */
    @Query("MATCH (e:Entity {graph_id: $graphId}) RETURN count(e)")
    long countByGraphId(@Param("graphId") String graphId);
    /**
     * Looks up a single entity by its sync identity triple
     * (graph_id, source_id, type).
     * <p>
     * Used by the sync pipeline to resolve relation endpoints and to load a
     * just-MERGEd node for extended-property updates. At most one node can
     * match, because the triple is covered by a composite unique constraint
     * (see GraphInitializer).
     */
    @Query("MATCH (e:Entity {graph_id: $graphId}) " +
            "WHERE e.source_id = $sourceId AND e.type = $type " +
            "RETURN e")
    Optional<GraphEntity> findByGraphIdAndSourceIdAndType(
            @Param("graphId") String graphId,
            @Param("sourceId") String sourceId,
            @Param("type") String type);
}

View File

@@ -0,0 +1,126 @@
package com.datamate.knowledgegraph.infrastructure.client;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
 * REST client for the data-management service.
 * <p>
 * Calls data-management-service's REST API over HTTP to pull dataset
 * metadata for synchronization into Neo4j.
 */
@Component
@Slf4j
public class DataManagementClient {

    private final RestTemplate restTemplate;
    private final String baseUrl;
    private final int pageSize;

    public DataManagementClient(
            @Qualifier("kgRestTemplate") RestTemplate restTemplate,
            KnowledgeGraphProperties properties) {
        this.restTemplate = restTemplate;
        this.baseUrl = properties.getSync().getDataManagementUrl();
        this.pageSize = properties.getSync().getPageSize();
    }

    /**
     * Fetches every dataset, following pagination automatically.
     * <p>
     * Stops on an empty/missing page body or once the last page (per
     * totalPages) has been consumed. HTTP failures are logged with the page
     * context and rethrown to the caller (which applies the retry policy).
     */
    public List<DatasetDTO> listAllDatasets() {
        List<DatasetDTO> allDatasets = new ArrayList<>();
        for (int page = 0; ; page++) {
            String url = baseUrl + "/data-management/datasets?page=" + page + "&size=" + pageSize;
            log.debug("Fetching datasets: page={}, size={}", page, pageSize);
            PagedResult<DatasetDTO> body;
            try {
                ResponseEntity<PagedResult<DatasetDTO>> response = restTemplate.exchange(
                        url, HttpMethod.GET, null,
                        new ParameterizedTypeReference<>() {}
                );
                body = response.getBody();
            } catch (RestClientException e) {
                log.error("Failed to fetch datasets from data-management-service: page={}, url={}", page, url, e);
                throw e;
            }
            if (body == null || body.getContent() == null || body.getContent().isEmpty()) {
                break;
            }
            allDatasets.addAll(body.getContent());
            log.debug("Fetched {} datasets (page {}), total so far: {}",
                    body.getContent().size(), page, allDatasets.size());
            if (page >= body.getTotalPages() - 1) {
                break;
            }
        }
        log.info("Fetched {} datasets in total from data-management-service", allDatasets.size());
        return allDatasets;
    }

    // -----------------------------------------------------------------------
    // Response DTOs (only the fields the sync needs)
    // -----------------------------------------------------------------------

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class PagedResult<T> {
        private List<T> content;
        private long page;
        private long totalElements;
        private long totalPages;
    }

    /**
     * Mirrors data-management-service's DatasetResponse.
     */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class DatasetDTO {
        private String id;
        private String name;
        private String description;
        private String parentDatasetId;
        private String datasetType;
        private String status;
        private Long totalSize;
        private Integer fileCount;
        private String createdBy;
        private String updatedBy;
        private LocalDateTime createdAt;
        private LocalDateTime updatedAt;
        private List<TagDTO> tags;
    }

    /**
     * Mirrors data-management-service's TagResponse.
     */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class TagDTO {
        private String id;
        private String name;
        private String color;
        private String description;
    }
}

View File

@@ -18,7 +18,8 @@ public enum KnowledgeGraphErrorCode implements ErrorCode {
INVALID_RELATION("knowledge_graph.0005", "无效的关系定义"),
IMPORT_FAILED("knowledge_graph.0006", "图谱导入失败"),
QUERY_DEPTH_EXCEEDED("knowledge_graph.0007", "查询深度超出限制"),
MAX_NODES_EXCEEDED("knowledge_graph.0008", "查询结果节点数超出限制");
MAX_NODES_EXCEEDED("knowledge_graph.0008", "查询结果节点数超出限制"),
SYNC_FAILED("knowledge_graph.0009", "数据同步失败");
private final String code;
private final String message;

View File

@@ -0,0 +1,90 @@
package com.datamate.knowledgegraph.infrastructure.neo4j;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Bootstraps the Neo4j schema at application startup.
 * <p>
 * Creates the graph's constraints and indexes; every statement carries
 * {@code IF NOT EXISTS}, so repeated startups are idempotent.
 * <p>
 * Corresponds to parts 1-3 of {@code docs/knowledge-graph/schema/schema.cypher}.
 */
@Component
@Slf4j
@RequiredArgsConstructor
@Order(1)
public class GraphInitializer implements ApplicationRunner {

    private final Neo4jClient neo4jClient;
    private final KnowledgeGraphProperties properties;

    /**
     * Cypher DDL executed at startup. Each statement must run on its own,
     * because Neo4j does not allow multiple schema commands in one transaction.
     */
    private static final List<String> SCHEMA_STATEMENTS = List.of(
            // Constraints (each also backs an index)
            "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE",
            // Composite uniqueness for sync upserts: guards against duplicate entities under concurrent writes
            "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " +
                    "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE",
            // Single-property indexes
            "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)",
            "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
            "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
            "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)",
            "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)",
            // Composite indexes
            "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)",
            "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)",
            "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
            // Full-text index
            "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]"
    );

    @Override
    public void run(ApplicationArguments args) {
        if (!properties.getSync().isAutoInitSchema()) {
            log.info("Schema auto-init is disabled, skipping");
            return;
        }
        log.info("Initializing Neo4j schema: {} statements to execute", SCHEMA_STATEMENTS.size());
        int succeeded = 0;
        for (String cypher : SCHEMA_STATEMENTS) {
            if (apply(cypher)) {
                succeeded++;
            }
        }
        log.info("Neo4j schema initialization completed: succeeded={}, failed={}",
                succeeded, SCHEMA_STATEMENTS.size() - succeeded);
    }

    /**
     * Runs a single schema statement and reports whether it succeeded.
     * Failures are logged as warnings only — a constraint/index may already
     * exist on servers whose IF NOT EXISTS support differs — so a failing
     * statement never blocks application startup.
     */
    private boolean apply(String cypher) {
        try {
            neo4jClient.query(cypher).run();
            log.debug("Schema statement executed: {}", truncate(cypher));
            return true;
        } catch (Exception e) {
            log.warn("Schema statement failed (may already exist): {} — {}",
                    truncate(cypher), e.getMessage());
            return false;
        }
    }

    /** Shortens a statement to at most 100 characters for log readability. */
    private static String truncate(String s) {
        if (s.length() > 100) {
            return s.substring(0, 97) + "...";
        }
        return s;
    }
}

View File

@@ -17,4 +17,32 @@ public class KnowledgeGraphProperties {
/** 批量导入批次大小 */
private int importBatchSize = 100;
/** 同步相关配置 */
private Sync sync = new Sync();
@Data
public static class Sync {
    /** Base URL of the data-management service (sync data source). */
    private String dataManagementUrl = "http://localhost:8080";
    /** Page size used when pulling source records during sync. */
    private int pageSize = 200;
    /** HTTP connect timeout in milliseconds. */
    private int connectTimeout = 5000;
    /** HTTP read timeout in milliseconds. */
    private int readTimeout = 30000;
    /** Maximum number of retries after an HTTP failure. */
    private int maxRetries = 3;
    /** Delay between retries, in milliseconds. */
    private long retryInterval = 1000;
    /** Whether to auto-create the Neo4j schema at application startup. */
    private boolean autoInitSchema = true;
}
}

View File

@@ -0,0 +1,53 @@
package com.datamate.knowledgegraph.interfaces.dto;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * View object for a single synchronization run.
 * <p>
 * The internal error list is deliberately not exposed: clients receive only
 * the error count plus the syncId, which operators can use to find the
 * corresponding log entries.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncResultVO {

    private String syncId;
    private String syncType;
    private int created;
    private int updated;
    private int skipped;
    private int failed;
    private int purged;
    private int total;
    private long durationMillis;
    /** Number of errors (details intentionally withheld). */
    private int errorCount;
    private LocalDateTime startedAt;
    private LocalDateTime completedAt;

    /**
     * Maps a domain {@link SyncResult} onto this view, hiding error details.
     */
    public static SyncResultVO from(SyncResult result) {
        int errorCount = result.getErrors() == null ? 0 : result.getErrors().size();
        return SyncResultVO.builder()
                .syncId(result.getSyncId())
                .syncType(result.getSyncType())
                .startedAt(result.getStartedAt())
                .completedAt(result.getCompletedAt())
                .durationMillis(result.durationMillis())
                .created(result.getCreated())
                .updated(result.getUpdated())
                .skipped(result.getSkipped())
                .failed(result.getFailed())
                .purged(result.getPurged())
                .total(result.total())
                .errorCount(errorCount)
                .build();
    }
}

View File

@@ -0,0 +1,102 @@
package com.datamate.knowledgegraph.interfaces.rest;
import com.datamate.knowledgegraph.application.GraphSyncService;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO;
import jakarta.validation.constraints.Pattern;
import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
 * REST API for MySQL → Neo4j knowledge-graph synchronization.
 * <p>
 * Exposes endpoints for triggering sync runs manually; production
 * deployments may also drive the same service from a scheduler.
 */
@RestController
@RequestMapping("/knowledge-graph/{graphId}/sync")
@RequiredArgsConstructor
@Validated
public class GraphSyncController {

    private static final String UUID_REGEX =
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$";

    private final GraphSyncService syncService;

    /**
     * Full sync: pulls every entity and rebuilds all relations.
     */
    @PostMapping("/full")
    public List<SyncResultVO> syncAll(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return syncService.syncAll(graphId).stream()
                .map(SyncResultVO::from)
                .toList();
    }

    /**
     * Syncs dataset entities.
     */
    @PostMapping("/datasets")
    public SyncResultVO syncDatasets(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.syncDatasets(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Syncs field entities.
     */
    @PostMapping("/fields")
    public SyncResultVO syncFields(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.syncFields(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Syncs user entities.
     */
    @PostMapping("/users")
    public SyncResultVO syncUsers(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.syncUsers(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Syncs organization entities.
     */
    @PostMapping("/orgs")
    public SyncResultVO syncOrgs(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.syncOrgs(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Builds HAS_FIELD relations.
     */
    @PostMapping("/relations/has-field")
    public SyncResultVO buildHasFieldRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.buildHasFieldRelations(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Builds DERIVED_FROM relations.
     */
    @PostMapping("/relations/derived-from")
    public SyncResultVO buildDerivedFromRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.buildDerivedFromRelations(graphId);
        return SyncResultVO.from(result);
    }

    /**
     * Builds BELONGS_TO relations.
     */
    @PostMapping("/relations/belongs-to")
    public SyncResultVO buildBelongsToRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        SyncResult result = syncService.buildBelongsToRelations(graphId);
        return SyncResultVO.from(result);
    }
}

View File

@@ -23,3 +23,19 @@ datamate:
max-nodes-per-query: ${KG_MAX_NODES:500}
# 批量导入批次大小
import-batch-size: ${KG_IMPORT_BATCH_SIZE:100}
# MySQL → Neo4j 同步配置
sync:
# 数据管理服务地址
data-management-url: ${DATA_MANAGEMENT_URL:http://localhost:8080}
# 每页拉取数量
page-size: ${KG_SYNC_PAGE_SIZE:200}
# HTTP 连接超时(毫秒)
connect-timeout: ${KG_SYNC_CONNECT_TIMEOUT:5000}
# HTTP 读取超时(毫秒)
read-timeout: ${KG_SYNC_READ_TIMEOUT:30000}
# 失败时最大重试次数
max-retries: ${KG_SYNC_MAX_RETRIES:3}
# 重试间隔(毫秒)
retry-interval: ${KG_SYNC_RETRY_INTERVAL:1000}
# 是否在启动时自动初始化 Schema
auto-init-schema: ${KG_AUTO_INIT_SCHEMA:true}