diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/KnowledgeGraphServiceConfiguration.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/KnowledgeGraphServiceConfiguration.java index 107fb26..5b5adb0 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/KnowledgeGraphServiceConfiguration.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/KnowledgeGraphServiceConfiguration.java @@ -1,11 +1,28 @@ package com.datamate.knowledgegraph; +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.data.neo4j.repository.config.EnableNeo4jRepositories; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.client.RestTemplate; + +import java.time.Duration; @Configuration @ComponentScan(basePackages = "com.datamate.knowledgegraph") @EnableNeo4jRepositories(basePackages = "com.datamate.knowledgegraph.domain.repository") +@EnableScheduling public class KnowledgeGraphServiceConfiguration { + + @Bean("kgRestTemplate") + public RestTemplate kgRestTemplate(RestTemplateBuilder builder, KnowledgeGraphProperties properties) { + KnowledgeGraphProperties.Sync syncConfig = properties.getSync(); + return builder + .connectTimeout(Duration.ofMillis(syncConfig.getConnectTimeout())) + .readTimeout(Duration.ofMillis(syncConfig.getReadTimeout())) + .build(); + } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java 
b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java new file mode 100644 index 0000000..c7c0ee7 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java @@ -0,0 +1,335 @@ +package com.datamate.knowledgegraph.application; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.knowledgegraph.domain.model.SyncResult; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; +import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode; +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * 知识图谱数据同步编排器(无 {@code @Transactional})。 + *

+ * 负责拉取数据、编排同步步骤、管理并发锁,具体写操作委托给 + * {@link GraphSyncStepService}(事务边界)。 + *

+ * 并发控制:同一 graphId 的同步操作通过 {@link ReentrantLock} 互斥, + * 防止并发写入导致数据不一致。 + *

+ * 数据快照:全量同步时只拉取一次数据集列表, + * 在各步骤间共享,避免重复 HTTP 调用。 + *

+ * 多实例部署:当前图级锁为进程内 {@link ReentrantLock}, + * 多实例部署时依赖 Neo4j 复合唯一约束 (graph_id, source_id, type) + * 兜底防止重复写入。如需严格互斥,可替换为 Redis/DB 分布式锁。 + *

+ * 信任边界:本服务仅通过内网被 API Gateway / 定时任务调用, + * 网关层已完成用户身份认证与权限校验。 + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class GraphSyncService { + + private static final Pattern UUID_PATTERN = Pattern.compile( + "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$" + ); + + private final GraphSyncStepService stepService; + private final DataManagementClient dataManagementClient; + private final KnowledgeGraphProperties properties; + + /** 同 graphId 互斥锁,防止并发同步。 */ + private final ConcurrentHashMap graphLocks = new ConcurrentHashMap<>(); + + // ----------------------------------------------------------------------- + // 全量同步 + // ----------------------------------------------------------------------- + + public List syncAll(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + + ReentrantLock lock = acquireLock(graphId, syncId); + try { + log.info("[{}] Starting full sync for graphId={}", syncId, graphId); + + // 一次拉取,全程共享 + List datasets = fetchDatasetsWithRetry(syncId); + + List results = new ArrayList<>(); + + // 实体同步 + results.add(stepService.upsertDatasetEntities(graphId, datasets, syncId)); + results.add(stepService.upsertFieldEntities(graphId, datasets, syncId)); + + Set usernames = extractUsernames(datasets); + results.add(stepService.upsertUserEntities(graphId, usernames, syncId)); + results.add(stepService.upsertOrgEntities(graphId, syncId)); + + // 全量对账:删除 MySQL 已移除的记录,并回填 purged 到对应 SyncResult + Set activeDatasetIds = datasets.stream() + .map(DatasetDTO::getId) + .collect(Collectors.toSet()); + results.get(0).setPurged( + stepService.purgeStaleEntities(graphId, "Dataset", activeDatasetIds, syncId)); + + Set activeFieldIds = new HashSet<>(); + for (DatasetDTO dto : datasets) { + if (dto.getTags() != null) { + for (DataManagementClient.TagDTO tag : dto.getTags()) { + activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); + } + } + } + results.get(1).setPurged( + 
stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId)); + + Set activeUserIds = usernames.stream() + .map(u -> "user:" + u) + .collect(Collectors.toSet()); + results.get(2).setPurged( + stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); + + // 关系构建(MERGE 幂等) + results.add(stepService.mergeHasFieldRelations(graphId, syncId)); + results.add(stepService.mergeDerivedFromRelations(graphId, syncId)); + results.add(stepService.mergeBelongsToRelations(graphId, syncId)); + + log.info("[{}] Full sync completed for graphId={}. Summary: {}", syncId, graphId, + results.stream() + .map(r -> r.getSyncType() + "(+" + r.getCreated() + "/~" + r.getUpdated() + "/-" + r.getFailed() + ")") + .collect(Collectors.joining(", "))); + return results; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Full sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "全量同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + // ----------------------------------------------------------------------- + // 单步同步(各自获取锁和数据) + // ----------------------------------------------------------------------- + + public SyncResult syncDatasets(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List datasets = fetchDatasetsWithRetry(syncId); + SyncResult result = stepService.upsertDatasetEntities(graphId, datasets, syncId); + Set activeIds = datasets.stream().map(DatasetDTO::getId).collect(Collectors.toSet()); + int purged = stepService.purgeStaleEntities(graphId, "Dataset", activeIds, syncId); + result.setPurged(purged); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Dataset sync failed for graphId={}", syncId, graphId, e); + throw 
BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "数据集同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult syncFields(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List datasets = fetchDatasetsWithRetry(syncId); + SyncResult result = stepService.upsertFieldEntities(graphId, datasets, syncId); + Set activeFieldIds = new HashSet<>(); + for (DatasetDTO dto : datasets) { + if (dto.getTags() != null) { + for (DataManagementClient.TagDTO tag : dto.getTags()) { + activeFieldIds.add(dto.getId() + ":tag:" + tag.getName()); + } + } + } + result.setPurged(stepService.purgeStaleEntities(graphId, "Field", activeFieldIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Field sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "字段同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult syncUsers(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + List datasets = fetchDatasetsWithRetry(syncId); + Set usernames = extractUsernames(datasets); + SyncResult result = stepService.upsertUserEntities(graphId, usernames, syncId); + Set activeUserIds = usernames.stream().map(u -> "user:" + u).collect(Collectors.toSet()); + result.setPurged(stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); + return result; + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] User sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "用户同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public 
SyncResult syncOrgs(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.upsertOrgEntities(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] Org sync failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "组织同步失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildHasFieldRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeHasFieldRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] HAS_FIELD relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "HAS_FIELD 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildDerivedFromRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return stepService.mergeDerivedFromRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] DERIVED_FROM relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "DERIVED_FROM 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + public SyncResult buildBelongsToRelations(String graphId) { + validateGraphId(graphId); + String syncId = UUID.randomUUID().toString().substring(0, 8); + ReentrantLock lock = acquireLock(graphId, syncId); + try { + return 
stepService.mergeBelongsToRelations(graphId, syncId); + } catch (BusinessException e) { + throw e; + } catch (Exception e) { + log.error("[{}] BELONGS_TO relation build failed for graphId={}", syncId, graphId, e); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "BELONGS_TO 关系构建失败,syncId=" + syncId); + } finally { + releaseLock(graphId, lock); + } + } + + // ----------------------------------------------------------------------- + // 内部方法 + // ----------------------------------------------------------------------- + + private ReentrantLock acquireLock(String graphId, String syncId) { + ReentrantLock lock = graphLocks.computeIfAbsent(graphId, k -> new ReentrantLock()); + if (!lock.tryLock()) { + log.warn("[{}] Graph {} is already being synced, rejecting concurrent request", syncId, graphId); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "该图谱正在同步中,请稍后重试"); + } + return lock; + } + + /** + * 释放锁并在无竞争时清理锁对象,防止 graphLocks 无限增长。 + */ + private void releaseLock(String graphId, ReentrantLock lock) { + lock.unlock(); + graphLocks.compute(graphId, (key, existing) -> { + // 仅当锁空闲且无等待线程时移除,compute 保证此 key 的原子性 + if (existing != null && !existing.isLocked() && !existing.hasQueuedThreads()) { + return null; + } + return existing; + }); + } + + private List fetchDatasetsWithRetry(String syncId) { + int maxRetries = properties.getSync().getMaxRetries(); + long retryInterval = properties.getSync().getRetryInterval(); + Exception lastException = null; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + return dataManagementClient.listAllDatasets(); + } catch (Exception e) { + lastException = e; + log.warn("[{}] Dataset fetch attempt {}/{} failed: {}", syncId, attempt, maxRetries, e.getMessage()); + if (attempt < maxRetries) { + try { + Thread.sleep(retryInterval * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "同步被中断"); + 
} + } + } + } + log.error("[{}] All {} fetch attempts failed", syncId, maxRetries, lastException); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, + "拉取数据集失败(已重试 " + maxRetries + " 次),syncId=" + syncId); + } + + private static Set extractUsernames(List datasets) { + Set usernames = new LinkedHashSet<>(); + for (DatasetDTO dto : datasets) { + if (dto.getCreatedBy() != null && !dto.getCreatedBy().isBlank()) { + usernames.add(dto.getCreatedBy()); + } + if (dto.getUpdatedBy() != null && !dto.getUpdatedBy().isBlank()) { + usernames.add(dto.getUpdatedBy()); + } + } + return usernames; + } + + private void validateGraphId(String graphId) { + if (graphId == null || !UUID_PATTERN.matcher(graphId).matches()) { + throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER, "graphId 格式无效"); + } + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java new file mode 100644 index 0000000..1619981 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java @@ -0,0 +1,407 @@ +package com.datamate.knowledgegraph.application; + +import com.datamate.knowledgegraph.domain.model.GraphEntity; +import com.datamate.knowledgegraph.domain.model.SyncResult; +import com.datamate.knowledgegraph.domain.repository.GraphEntityRepository; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.DatasetDTO; +import com.datamate.knowledgegraph.infrastructure.client.DataManagementClient.TagDTO; +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.neo4j.core.Neo4jClient; +import org.springframework.stereotype.Service; +import 
org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; + +/** + * 同步步骤执行器(事务边界)。 + *

+ * 所有写操作在独立 {@code @Transactional} 方法中执行, + * 由 {@link GraphSyncService} 编排调用,避免自调用导致事务失效。 + *

+ * 关系构建使用 Cypher MERGE 保证幂等性, + * 实体 upsert 使用 Cypher MERGE 基于 (graph_id, source_id, type) 复合约束原子操作, + * 扩展属性通过 SDN 更新。 + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class GraphSyncStepService { + + private static final String SOURCE_TYPE_SYNC = "SYNC"; + private static final String REL_TYPE = "RELATED_TO"; + + private final GraphEntityRepository entityRepository; + private final Neo4jClient neo4jClient; + private final KnowledgeGraphProperties properties; + + // ----------------------------------------------------------------------- + // 实体 upsert + // ----------------------------------------------------------------------- + + @Transactional + public SyncResult upsertDatasetEntities(String graphId, List datasets, String syncId) { + SyncResult result = beginResult("Dataset", syncId); + int batchSize = properties.getImportBatchSize(); + + for (int i = 0; i < datasets.size(); i++) { + DatasetDTO dto = datasets.get(i); + try { + Map props = new HashMap<>(); + props.put("dataset_type", dto.getDatasetType()); + props.put("status", dto.getStatus()); + props.put("total_size", dto.getTotalSize()); + props.put("file_count", dto.getFileCount()); + if (dto.getParentDatasetId() != null) { + props.put("parent_dataset_id", dto.getParentDatasetId()); + } + if (dto.getTags() != null) { + List tagNames = dto.getTags().stream() + .map(TagDTO::getName).toList(); + props.put("tags", tagNames); + } + + upsertEntity(graphId, dto.getId(), "Dataset", + dto.getName(), dto.getDescription(), props, result); + + if ((i + 1) % batchSize == 0) { + log.debug("[{}] Processed {}/{} datasets", syncId, i + 1, datasets.size()); + } + } catch (Exception e) { + log.warn("[{}] Failed to upsert dataset: sourceId={}", syncId, dto.getId(), e); + result.addError("dataset:" + dto.getId()); + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertFieldEntities(String graphId, List datasets, String syncId) { + SyncResult result = beginResult("Field", syncId); + + for 
(DatasetDTO dto : datasets) { + if (dto.getTags() == null || dto.getTags().isEmpty()) { + continue; + } + for (TagDTO tag : dto.getTags()) { + try { + String fieldSourceId = dto.getId() + ":tag:" + tag.getName(); + Map props = new HashMap<>(); + props.put("data_type", "TAG"); + props.put("dataset_source_id", dto.getId()); + if (tag.getColor() != null) { + props.put("color", tag.getColor()); + } + + upsertEntity(graphId, fieldSourceId, "Field", tag.getName(), + "数据集[" + dto.getName() + "]的标签字段", props, result); + } catch (Exception e) { + log.warn("[{}] Failed to upsert field: dataset={}, tag={}", + syncId, dto.getId(), tag.getName(), e); + result.addError("field:" + dto.getId() + ":" + tag.getName()); + } + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertUserEntities(String graphId, Set usernames, String syncId) { + SyncResult result = beginResult("User", syncId); + + for (String username : usernames) { + try { + Map props = new HashMap<>(); + props.put("username", username); + upsertEntity(graphId, "user:" + username, "User", username, null, props, result); + } catch (Exception e) { + log.warn("[{}] Failed to upsert user: username={}", syncId, username, e); + result.addError("user:" + username); + } + } + return endResult(result); + } + + @Transactional + public SyncResult upsertOrgEntities(String graphId, String syncId) { + SyncResult result = beginResult("Org", syncId); + + try { + Map props = new HashMap<>(); + props.put("org_code", "DEFAULT"); + props.put("level", 1); + upsertEntity(graphId, "org:default", "Org", "默认组织", + "系统默认组织(待对接组织服务后更新)", props, result); + } catch (Exception e) { + log.warn("[{}] Failed to upsert default org", syncId, e); + result.addError("org:default"); + } + return endResult(result); + } + + // ----------------------------------------------------------------------- + // 全量对账删除 + // ----------------------------------------------------------------------- + + /** + * 删除 Neo4j 中 source_type=SYNC 但 
source_id 不在活跃集合中的实体。 + * 使用 DETACH DELETE 同时清理关联关系。 + *

+ * 当 activeSourceIds 为空时,说明数据源中该类型实体已全部移除, + * 将清除图中所有对应的 SYNC 实体。调用方需确保空集是数据拉取成功后的 + * 真实结果(fetchDatasetsWithRetry 失败时会抛异常,不会到达此处)。 + */ + @Transactional + public int purgeStaleEntities(String graphId, String type, Set activeSourceIds, String syncId) { + String cypher; + Map params; + + if (activeSourceIds.isEmpty()) { + log.warn("[{}] Active source IDs empty for type={}, purging ALL SYNC entities of this type", + syncId, type); + cypher = "MATCH (e:Entity {graph_id: $graphId, type: $type, source_type: 'SYNC'}) " + + "DETACH DELETE e " + + "RETURN count(*) AS deleted"; + params = Map.of("graphId", graphId, "type", type); + } else { + cypher = "MATCH (e:Entity {graph_id: $graphId, type: $type, source_type: 'SYNC'}) " + + "WHERE NOT e.source_id IN $activeSourceIds " + + "DETACH DELETE e " + + "RETURN count(*) AS deleted"; + params = Map.of( + "graphId", graphId, + "type", type, + "activeSourceIds", new ArrayList<>(activeSourceIds) + ); + } + + long deleted = neo4jClient.query(cypher) + .bindAll(params) + .fetchAs(Long.class) + .mappedBy((ts, record) -> record.get("deleted").asLong()) + .one() + .orElse(0L); + + if (deleted > 0) { + log.info("[{}] Purged {} stale {} entities from graphId={}", syncId, deleted, type, graphId); + } + return (int) deleted; + } + + // ----------------------------------------------------------------------- + // 关系构建(MERGE 保证幂等) + // ----------------------------------------------------------------------- + + @Transactional + public SyncResult mergeHasFieldRelations(String graphId, String syncId) { + SyncResult result = beginResult("HAS_FIELD", syncId); + + List fields = entityRepository.findByGraphIdAndType(graphId, "Field"); + + for (GraphEntity field : fields) { + try { + Object datasetSourceId = field.getProperties().get("dataset_source_id"); + if (datasetSourceId == null) { + result.incrementSkipped(); + continue; + } + + Optional datasetOpt = entityRepository.findByGraphIdAndSourceIdAndType( + graphId, datasetSourceId.toString(), 
"Dataset"); + if (datasetOpt.isEmpty()) { + result.incrementSkipped(); + continue; + } + + boolean created = mergeRelation(graphId, datasetOpt.get().getId(), field.getId(), + "HAS_FIELD", "{}", syncId); + if (created) { + result.incrementCreated(); + } else { + result.incrementSkipped(); + } + } catch (Exception e) { + log.warn("[{}] Failed to merge HAS_FIELD for field: id={}", syncId, field.getId(), e); + result.addError("has_field:" + field.getId()); + } + } + return endResult(result); + } + + @Transactional + public SyncResult mergeDerivedFromRelations(String graphId, String syncId) { + SyncResult result = beginResult("DERIVED_FROM", syncId); + + List datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); + + for (GraphEntity dataset : datasets) { + try { + Object parentId = dataset.getProperties().get("parent_dataset_id"); + if (parentId == null || parentId.toString().isBlank()) { + continue; + } + + Optional parentOpt = entityRepository.findByGraphIdAndSourceIdAndType( + graphId, parentId.toString(), "Dataset"); + if (parentOpt.isEmpty()) { + result.incrementSkipped(); + continue; + } + + boolean created = mergeRelation(graphId, dataset.getId(), parentOpt.get().getId(), + "DERIVED_FROM", "{\"derivation_type\":\"VERSION\"}", syncId); + if (created) { + result.incrementCreated(); + } else { + result.incrementSkipped(); + } + } catch (Exception e) { + log.warn("[{}] Failed to merge DERIVED_FROM for dataset: id={}", syncId, dataset.getId(), e); + result.addError("derived_from:" + dataset.getId()); + } + } + return endResult(result); + } + + @Transactional + public SyncResult mergeBelongsToRelations(String graphId, String syncId) { + SyncResult result = beginResult("BELONGS_TO", syncId); + + Optional defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType( + graphId, "org:default", "Org"); + if (defaultOrgOpt.isEmpty()) { + log.warn("[{}] Default org not found, skipping BELONGS_TO", syncId); + result.addError("belongs_to:org_missing"); + 
return endResult(result); + } + String orgId = defaultOrgOpt.get().getId(); + + // User → Org + for (GraphEntity user : entityRepository.findByGraphIdAndType(graphId, "User")) { + try { + boolean created = mergeRelation(graphId, user.getId(), orgId, + "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge BELONGS_TO for user: id={}", syncId, user.getId(), e); + result.addError("belongs_to:user:" + user.getId()); + } + } + + // Dataset → Org + for (GraphEntity dataset : entityRepository.findByGraphIdAndType(graphId, "Dataset")) { + try { + boolean created = mergeRelation(graphId, dataset.getId(), orgId, + "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); + if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } + } catch (Exception e) { + log.warn("[{}] Failed to merge BELONGS_TO for dataset: id={}", syncId, dataset.getId(), e); + result.addError("belongs_to:dataset:" + dataset.getId()); + } + } + return endResult(result); + } + + // ----------------------------------------------------------------------- + // 内部方法 + // ----------------------------------------------------------------------- + + /** + * 使用 Cypher MERGE 原子创建或匹配实体,再通过 SDN 更新扩展属性。 + *

+ * MERGE 基于 (graph_id, source_id, type) 复合唯一约束, + * 保证多实例并发写入时不会产生重复节点。 + */ + private void upsertEntity(String graphId, String sourceId, String type, + String name, String description, + Map props, SyncResult result) { + String newId = UUID.randomUUID().toString(); + + Map params = new HashMap<>(); + params.put("graphId", graphId); + params.put("sourceId", sourceId); + params.put("type", type); + params.put("newId", newId); + params.put("name", name != null ? name : ""); + params.put("description", description != null ? description : ""); + + // Phase 1: Atomic MERGE — 创建或匹配实体骨架 + Boolean isNew = neo4jClient.query( + "MERGE (e:Entity {graph_id: $graphId, source_id: $sourceId, type: $type}) " + + "ON CREATE SET e.id = $newId, e.source_type = 'SYNC', e.confidence = 1.0, " + + " e.name = $name, e.description = $description, " + + " e.created_at = datetime(), e.updated_at = datetime() " + + "ON MATCH SET e.name = $name, e.description = $description, " + + " e.updated_at = datetime() " + + "RETURN e.id = $newId AS isNew" + ) + .bindAll(params) + .fetchAs(Boolean.class) + .mappedBy((ts, record) -> record.get("isNew").asBoolean()) + .one() + .orElse(false); + + // Phase 2: 通过 SDN 更新扩展属性(Map 序列化由 SDN 处理) + entityRepository.findByGraphIdAndSourceIdAndType(graphId, sourceId, type) + .ifPresent(entity -> { + entity.setProperties(props != null ? 
props : new HashMap<>()); + entityRepository.save(entity); + }); + + if (isNew) { + result.incrementCreated(); + } else { + result.incrementUpdated(); + } + } + + /** + * 使用 Cypher MERGE 创建或匹配关系,保证幂等性。 + * + * @return true 如果是新创建的关系,false 如果已存在 + */ + private boolean mergeRelation(String graphId, String sourceEntityId, String targetEntityId, + String relationType, String propertiesJson, String syncId) { + String newId = UUID.randomUUID().toString(); + + String mergedId = neo4jClient + .query( + "MATCH (s:Entity {graph_id: $graphId, id: $sourceEntityId}) " + + "MATCH (t:Entity {graph_id: $graphId, id: $targetEntityId}) " + + "MERGE (s)-[r:" + REL_TYPE + " {graph_id: $graphId, relation_type: $relationType}]->(t) " + + "ON CREATE SET r.id = $newId, r.weight = 1.0, r.confidence = 1.0, " + + " r.source_id = '', r.properties_json = $propertiesJson, r.created_at = datetime() " + + "RETURN r.id AS relId" + ) + .bindAll(Map.of( + "graphId", graphId, + "sourceEntityId", sourceEntityId, + "targetEntityId", targetEntityId, + "relationType", relationType, + "newId", newId, + "propertiesJson", propertiesJson + )) + .fetchAs(String.class) + .mappedBy((ts, record) -> record.get("relId").asString()) + .one() + .orElse(null); + + return newId.equals(mergedId); + } + + private SyncResult beginResult(String syncType, String syncId) { + return SyncResult.builder() + .syncType(syncType) + .syncId(syncId) + .startedAt(LocalDateTime.now()) + .errors(new ArrayList<>()) + .build(); + } + + private SyncResult endResult(SyncResult result) { + result.setCompletedAt(LocalDateTime.now()); + return result; + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java new file mode 100644 index 0000000..9d8dcd2 --- /dev/null +++ 
b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/model/SyncResult.java @@ -0,0 +1,77 @@ +package com.datamate.knowledgegraph.domain.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * 同步操作结果统计。 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SyncResult { + + /** 本次同步的追踪标识 */ + private String syncId; + + /** 同步的实体/关系类型 */ + private String syncType; + + @Builder.Default + private int created = 0; + + @Builder.Default + private int updated = 0; + + @Builder.Default + private int skipped = 0; + + @Builder.Default + private int failed = 0; + + /** 全量对账删除的过期实体数 */ + @Builder.Default + private int purged = 0; + + @Builder.Default + private List errors = new ArrayList<>(); + + private LocalDateTime startedAt; + + private LocalDateTime completedAt; + + public int total() { + return created + updated + skipped + failed; + } + + public long durationMillis() { + if (startedAt == null || completedAt == null) { + return 0; + } + return java.time.Duration.between(startedAt, completedAt).toMillis(); + } + + public void incrementCreated() { + created++; + } + + public void incrementUpdated() { + updated++; + } + + public void incrementSkipped() { + skipped++; + } + + public void addError(String error) { + failed++; + errors.add(error); + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphEntityRepository.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphEntityRepository.java index ae6a677..6e326d3 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphEntityRepository.java +++ 
b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/domain/repository/GraphEntityRepository.java @@ -41,4 +41,12 @@ public interface GraphEntityRepository extends Neo4jRepository findByGraphIdAndSourceIdAndType( + @Param("graphId") String graphId, + @Param("sourceId") String sourceId, + @Param("type") String type); } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java new file mode 100644 index 0000000..c80a122 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java @@ -0,0 +1,126 @@ +package com.datamate.knowledgegraph.infrastructure.client; + +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** + * 数据管理服务 REST 客户端。 + *

+ * 通过 HTTP 调用 data-management-service 的 REST API, + * 拉取数据集、文件等元数据用于同步到 Neo4j。 + */ +@Component +@Slf4j +public class DataManagementClient { + + private final RestTemplate restTemplate; + private final String baseUrl; + private final int pageSize; + + public DataManagementClient( + @Qualifier("kgRestTemplate") RestTemplate restTemplate, + KnowledgeGraphProperties properties) { + this.restTemplate = restTemplate; + this.baseUrl = properties.getSync().getDataManagementUrl(); + this.pageSize = properties.getSync().getPageSize(); + } + + /** + * 拉取所有数据集(自动分页)。 + */ + public List listAllDatasets() { + List allDatasets = new ArrayList<>(); + int page = 0; + + while (true) { + String url = baseUrl + "/data-management/datasets?page=" + page + "&size=" + pageSize; + log.debug("Fetching datasets: page={}, size={}", page, pageSize); + + try { + ResponseEntity> response = restTemplate.exchange( + url, HttpMethod.GET, null, + new ParameterizedTypeReference<>() {} + ); + + PagedResult body = response.getBody(); + if (body == null || body.getContent() == null || body.getContent().isEmpty()) { + break; + } + + allDatasets.addAll(body.getContent()); + log.debug("Fetched {} datasets (page {}), total so far: {}", + body.getContent().size(), page, allDatasets.size()); + + if (page >= body.getTotalPages() - 1) { + break; + } + page++; + } catch (RestClientException e) { + log.error("Failed to fetch datasets from data-management-service: page={}, url={}", page, url, e); + throw e; + } + } + + log.info("Fetched {} datasets in total from data-management-service", allDatasets.size()); + return allDatasets; + } + + // ----------------------------------------------------------------------- + // 响应 DTO(仅包含同步所需字段) + // ----------------------------------------------------------------------- + + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class PagedResult { + private List content; + private long page; + private long totalElements; + private long totalPages; + } + + /** 
+ * 与 data-management-service 的 DatasetResponse 对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class DatasetDTO { + private String id; + private String name; + private String description; + private String parentDatasetId; + private String datasetType; + private String status; + private Long totalSize; + private Integer fileCount; + private String createdBy; + private String updatedBy; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + private List tags; + } + + /** + * 与 data-management-service 的 TagResponse 对齐。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class TagDTO { + private String id; + private String name; + private String color; + private String description; + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java index 5983536..ec76994 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java @@ -18,7 +18,8 @@ public enum KnowledgeGraphErrorCode implements ErrorCode { INVALID_RELATION("knowledge_graph.0005", "无效的关系定义"), IMPORT_FAILED("knowledge_graph.0006", "图谱导入失败"), QUERY_DEPTH_EXCEEDED("knowledge_graph.0007", "查询深度超出限制"), - MAX_NODES_EXCEEDED("knowledge_graph.0008", "查询结果节点数超出限制"); + MAX_NODES_EXCEEDED("knowledge_graph.0008", "查询结果节点数超出限制"), + SYNC_FAILED("knowledge_graph.0009", "数据同步失败"); private final String code; private final String message; diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java 
package com.datamate.knowledgegraph.infrastructure.neo4j;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Graph schema initializer.
 * <p>
 * Creates Neo4j indexes and constraints automatically at application startup.
 * Every statement uses {@code IF NOT EXISTS}, so the run is idempotent.
 * <p>
 * Corresponds to parts 1-3 of {@code docs/knowledge-graph/schema/schema.cypher}.
 */
@Component
@Slf4j
@RequiredArgsConstructor
@Order(1)
public class GraphInitializer implements ApplicationRunner {

    private final Neo4jClient neo4jClient;
    private final KnowledgeGraphProperties properties;

    /**
     * Cypher statements to execute at startup.
     * Each statement must run independently (Neo4j does not allow multiple
     * DDL statements in a single transaction).
     */
    private static final List<String> SCHEMA_STATEMENTS = List.of(
            // Constraints (each also creates a backing index)
            "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE",

            // Composite uniqueness for sync upserts: prevents duplicate entities under concurrent writes
            "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS "
                    + "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE",

            // Single-property indexes
            "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)",
            "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
            "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
            "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)",
            "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)",

            // Composite indexes
            "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)",
            "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)",
            "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",

            // Full-text index
            "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]"
    );

    @Override
    public void run(ApplicationArguments args) {
        if (!properties.getSync().isAutoInitSchema()) {
            log.info("Schema auto-init is disabled, skipping");
            return;
        }

        log.info("Initializing Neo4j schema: {} statements to execute", SCHEMA_STATEMENTS.size());

        int succeeded = 0;
        int failed = 0;

        for (String statement : SCHEMA_STATEMENTS) {
            try {
                neo4jClient.query(statement).run();
                succeeded++;
                log.debug("Schema statement executed: {}", truncate(statement));
            } catch (Exception e) {
                failed++;
                // The constraint/index may already exist (Neo4j versions differ in
                // how IF NOT EXISTS is implemented); warn but do not abort startup.
                log.warn("Schema statement failed (may already exist): {} — {}",
                        truncate(statement), e.getMessage());
            }
        }

        log.info("Neo4j schema initialization completed: succeeded={}, failed={}", succeeded, failed);
    }

    /** Caps a statement at 100 chars for compact log output. */
    private static String truncate(String s) {
        return s.length() <= 100 ? s : s.substring(0, 97) + "...";
    }
}
package com.datamate.knowledgegraph.interfaces.dto;

import com.datamate.knowledgegraph.domain.model.SyncResult;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * View object for a synchronization result.
 * <p>
 * Deliberately omits the internal error details (the {@code errors} list);
 * only the error count and the {@code syncId} are exposed, so the frontend
 * can hand the syncId to operations staff to look up the concrete logs.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SyncResultVO {

    private String syncId;
    private String syncType;
    private int created;
    private int updated;
    private int skipped;
    private int failed;
    private int purged;
    private int total;
    private long durationMillis;
    /** Number of errors (concrete error messages are not exposed). */
    private int errorCount;
    private LocalDateTime startedAt;
    private LocalDateTime completedAt;

    /**
     * Maps a domain {@link SyncResult} onto this view object.
     *
     * @param result the domain sync result to convert
     * @return a populated view object with the error list reduced to a count
     */
    public static SyncResultVO from(SyncResult result) {
        var errors = result.getErrors();
        int errorCount = (errors == null) ? 0 : errors.size();
        return SyncResultVO.builder()
                .syncId(result.getSyncId())
                .syncType(result.getSyncType())
                .created(result.getCreated())
                .updated(result.getUpdated())
                .skipped(result.getSkipped())
                .failed(result.getFailed())
                .purged(result.getPurged())
                .total(result.total())
                .durationMillis(result.durationMillis())
                .errorCount(errorCount)
                .startedAt(result.getStartedAt())
                .completedAt(result.getCompletedAt())
                .build();
    }
}
package com.datamate.knowledgegraph.interfaces.rest;

import com.datamate.knowledgegraph.application.GraphSyncService;
import com.datamate.knowledgegraph.domain.model.SyncResult;
import com.datamate.knowledgegraph.interfaces.dto.SyncResultVO;
import jakarta.validation.constraints.Pattern;
import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * REST API for knowledge-graph data synchronization.
 * <p>
 * Exposes endpoints to manually trigger MySQL → Neo4j synchronization.
 * In production the sync may also be driven by a scheduled task.
 */
@RestController
@RequestMapping("/knowledge-graph/{graphId}/sync")
@RequiredArgsConstructor
@Validated
public class GraphSyncController {

    private static final String UUID_REGEX =
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$";

    private final GraphSyncService syncService;

    /**
     * Full sync: pulls every entity type and builds all relations.
     */
    @PostMapping("/full")
    public List<SyncResultVO> syncAll(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        List<SyncResult> results = syncService.syncAll(graphId);
        return results.stream().map(SyncResultVO::from).toList();
    }

    /**
     * Sync dataset entities.
     */
    @PostMapping("/datasets")
    public SyncResultVO syncDatasets(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.syncDatasets(graphId));
    }

    /**
     * Sync field entities.
     */
    @PostMapping("/fields")
    public SyncResultVO syncFields(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.syncFields(graphId));
    }

    /**
     * Sync user entities.
     */
    @PostMapping("/users")
    public SyncResultVO syncUsers(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.syncUsers(graphId));
    }

    /**
     * Sync organization entities.
     */
    @PostMapping("/orgs")
    public SyncResultVO syncOrgs(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.syncOrgs(graphId));
    }

    /**
     * Build HAS_FIELD relations.
     */
    @PostMapping("/relations/has-field")
    public SyncResultVO buildHasFieldRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.buildHasFieldRelations(graphId));
    }

    /**
     * Build DERIVED_FROM relations.
     */
    @PostMapping("/relations/derived-from")
    public SyncResultVO buildDerivedFromRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.buildDerivedFromRelations(graphId));
    }

    /**
     * Build BELONGS_TO relations.
     */
    @PostMapping("/relations/belongs-to")
    public SyncResultVO buildBelongsToRelations(
            @PathVariable @Pattern(regexp = UUID_REGEX, message = "graphId 格式无效") String graphId) {
        return SyncResultVO.from(syncService.buildBelongsToRelations(graphId));
    }
}