From 20446bf57dab261070ea9068381ac06f474d0ea8 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Thu, 19 Feb 2026 15:01:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(kg):=20=E5=AE=9E=E7=8E=B0=E7=9F=A5?= =?UTF-8?q?=E8=AF=86=E5=9B=BE=E8=B0=B1=E7=BB=84=E7=BB=87=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 替换硬编码的 org:default 占位符,支持真实组织数据 - 从 users 表的 organization 字段获取组织映射 - 支持多租户场景,每个组织独立管理 - 添加降级保护机制,防止数据丢失 - 修复 BELONGS_TO 关系遗留问题 - 修复组织编码碰撞问题 - 新增 95 个测试用例,全部通过 修改文件: - Auth 模块:添加组织字段和查询接口 - KG Sync Client:添加用户组织映射 - Core Sync Logic:重写组织实体和关系逻辑 - Tests:新增测试用例覆盖核心场景 --- .../application/GraphSyncService.java | 115 ++++++++++++- .../application/GraphSyncStepService.java | 149 +++++++++++++--- .../client/DataManagementClient.java | 41 +++++ .../application/GraphSyncServiceTest.java | 162 +++++++++++++++++- .../application/GraphSyncStepServiceTest.java | 156 ++++++++++++++++- .../application/AuthApplicationService.java | 11 ++ .../auth/domain/model/AuthUserSummary.java | 1 + .../auth/interfaces/rest/AuthController.java | 8 + .../src/main/resources/mappers/AuthMapper.xml | 3 +- 9 files changed, 598 insertions(+), 48 deletions(-) diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java index db6a33e..a86fc89 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncService.java @@ -93,7 +93,15 @@ public class GraphSyncService { Set usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets); resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId)); - resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId)); + + Map userOrgMap = fetchMapWithRetry(syncId, "user-orgs", + () -> dataManagementClient.fetchUserOrganizationMap()); + boolean orgMapDegraded = (userOrgMap == null); + if (orgMapDegraded) { + log.warn("[{}] Org map fetch degraded, using empty map; Org purge will be skipped", syncId); + userOrgMap = Collections.emptyMap(); + } + resultMap.put("Org", stepService.upsertOrgEntities(graphId, userOrgMap, syncId)); resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId)); resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId)); resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId)); @@ -130,6 +138,14 @@ public class GraphSyncService { resultMap.get("User").setPurged( stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId)); + if (!orgMapDegraded) { + Set activeOrgSourceIds = buildActiveOrgSourceIds(userOrgMap); + resultMap.get("Org").setPurged( + stepService.purgeStaleEntities(graphId, "Org", activeOrgSourceIds, syncId)); + } else { + log.info("[{}] Skipping Org purge due to degraded org map fetch", syncId); + } + Set activeWorkflowIds = workflows.stream() .filter(Objects::nonNull) .map(WorkflowDTO::getId) @@ -169,7 +185,12 @@ public class GraphSyncService { // 关系构建(MERGE 幂等) resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId)); resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId)); - resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId)); + if (!orgMapDegraded) { + resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId)); + } else { + log.info("[{}] Skipping BELONGS_TO relation build due to degraded org map fetch", syncId); + resultMap.put("BELONGS_TO", SyncResult.builder().syncType("BELONGS_TO").build()); + } resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId)); resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId)); resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId)); @@ -251,7 +272,15 @@ public class GraphSyncService { Set usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets); resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId)); - resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId)); + + Map userOrgMap = fetchMapWithRetry(syncId, "user-orgs", + () -> dataManagementClient.fetchUserOrganizationMap()); + boolean orgMapDegraded = (userOrgMap == null); + if (orgMapDegraded) { + log.warn("[{}] Org map fetch degraded in incremental sync, using empty map", syncId); + userOrgMap = Collections.emptyMap(); + } + resultMap.put("Org", stepService.upsertOrgEntities(graphId, userOrgMap, syncId)); resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId)); resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId)); resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId)); @@ -263,7 +292,12 @@ public class GraphSyncService { // 关系构建(MERGE 幂等)- 增量同步时只处理变更实体相关的关系 resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds)); resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds)); - resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds)); + if (!orgMapDegraded) { + resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId, changedEntityIds)); + } else { + log.info("[{}] Skipping BELONGS_TO relation build due to degraded org map fetch", syncId); + resultMap.put("BELONGS_TO", SyncResult.builder().syncType("BELONGS_TO").build()); + } resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds)); resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds)); resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds)); @@ -411,7 +445,22 @@ public class GraphSyncService { LocalDateTime startedAt = LocalDateTime.now(); ReentrantLock lock = acquireLock(graphId, syncId); try { - SyncResult result = stepService.upsertOrgEntities(graphId, syncId); + Map userOrgMap = fetchMapWithRetry(syncId, "user-orgs", + () -> dataManagementClient.fetchUserOrganizationMap()); + boolean orgMapDegraded = (userOrgMap == null); + if (orgMapDegraded) { + log.warn("[{}] Org map fetch degraded, using empty map; Org purge will be skipped", syncId); + userOrgMap = Collections.emptyMap(); + } + SyncResult result = stepService.upsertOrgEntities(graphId, userOrgMap, syncId); + + if (!orgMapDegraded) { + Set activeOrgSourceIds = buildActiveOrgSourceIds(userOrgMap); + result.setPurged(stepService.purgeStaleEntities(graphId, "Org", activeOrgSourceIds, syncId)); + } else { + log.info("[{}] Skipping Org purge due to degraded org map fetch", syncId); + } + saveSyncHistory(SyncMetadata.fromResults( syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result))); return result; @@ -466,7 +515,13 @@ public class GraphSyncService { String syncId = UUID.randomUUID().toString(); ReentrantLock lock = acquireLock(graphId, syncId); try { - return stepService.mergeBelongsToRelations(graphId, syncId); + Map userOrgMap = fetchMapWithRetry(syncId, "user-orgs", + () -> dataManagementClient.fetchUserOrganizationMap()); + if (userOrgMap == null) { + log.warn("[{}] Org map fetch degraded, skipping BELONGS_TO relation build to preserve existing relations", syncId); + return SyncResult.builder().syncType("BELONGS_TO").build(); + } + return stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId); } catch (BusinessException e) { throw e; } catch (Exception e) { @@ -819,6 +874,54 @@ public class GraphSyncService { "拉取" + resourceName + "失败(已重试 " + maxRetries + " 次),syncId=" + syncId); } + /** + * 带重试的 Map 拉取方法。失败时返回 {@code null} 表示降级。 + *

+ * 调用方需检查返回值是否为 null,并在降级时跳过依赖完整数据的操作 + * (如 purge),以避免基于不完整快照误删数据。 + */ + private Map fetchMapWithRetry(String syncId, String resourceName, + java.util.function.Supplier> fetcher) { + int maxRetries = properties.getSync().getMaxRetries(); + long retryInterval = properties.getSync().getRetryInterval(); + Exception lastException = null; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + return fetcher.get(); + } catch (Exception e) { + lastException = e; + log.warn("[{}] {} fetch attempt {}/{} failed: {}", + syncId, resourceName, attempt, maxRetries, e.getMessage()); + if (attempt < maxRetries) { + try { + Thread.sleep(retryInterval * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "同步被中断"); + } + } + } + } + log.warn("[{}] All {} fetch attempts for {} failed, returning null (degraded)", + syncId, maxRetries, resourceName, lastException); + return null; + } + + /** + * 根据 userOrgMap 计算活跃的 Org source_id 集合(含 "未分配" 兜底组织)。 + */ + private Set buildActiveOrgSourceIds(Map userOrgMap) { + Set activeOrgSourceIds = new LinkedHashSet<>(); + activeOrgSourceIds.add("org:unassigned"); + for (String org : userOrgMap.values()) { + if (org != null && !org.isBlank()) { + activeOrgSourceIds.add("org:" + GraphSyncStepService.normalizeOrgCode(org.trim())); + } + } + return activeOrgSourceIds; + } + /** * 从所有实体类型中提取用户名。 */ diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java index 3b84075..7395d5f 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/application/GraphSyncStepService.java @@ -37,6 +37,7 @@ public class GraphSyncStepService { private static final String SOURCE_TYPE_SYNC = "SYNC"; private static final String REL_TYPE = "RELATED_TO"; + static final String DEFAULT_ORG_NAME = "未分配"; private final GraphEntityRepository entityRepository; final Neo4jClient neo4jClient; // 改为包级别访问,供GraphSyncService使用 @@ -143,18 +144,35 @@ public class GraphSyncStepService { } @Transactional - public SyncResult upsertOrgEntities(String graphId, String syncId) { + public SyncResult upsertOrgEntities(String graphId, Map userOrgMap, String syncId) { SyncResult result = beginResult("Org", syncId); - try { - Map props = new HashMap<>(); - props.put("org_code", "DEFAULT"); - props.put("level", 1); - upsertEntity(graphId, "org:default", "Org", "默认组织", - "系统默认组织(待对接组织服务后更新)", props, result); - } catch (Exception e) { - log.warn("[{}] Failed to upsert default org", syncId, e); - result.addError("org:default"); + // 提取去重的组织名称;null/blank 归入 "未分配" + Set orgNames = new LinkedHashSet<>(); + orgNames.add(DEFAULT_ORG_NAME); + for (String org : userOrgMap.values()) { + if (org != null && !org.isBlank()) { + orgNames.add(org.trim()); + } + } + + for (String orgName : orgNames) { + try { + String orgCode = normalizeOrgCode(orgName); + String sourceId = "org:" + orgCode; + Map props = new HashMap<>(); + props.put("org_code", orgCode); + props.put("level", 1); + + String description = DEFAULT_ORG_NAME.equals(orgName) + ? "未分配组织(用户无组织信息时使用)" + : "组织:" + orgName; + + upsertEntity(graphId, sourceId, "Org", orgName, description, props, result); + } catch (Exception e) { + log.warn("[{}] Failed to upsert org: {}", syncId, orgName, e); + result.addError("org:" + orgName); + } } return endResult(result); } @@ -547,33 +565,57 @@ public class GraphSyncStepService { } @Transactional - public SyncResult mergeBelongsToRelations(String graphId, String syncId) { - return mergeBelongsToRelations(graphId, syncId, null); + public SyncResult mergeBelongsToRelations(String graphId, Map userOrgMap, String syncId) { + return mergeBelongsToRelations(graphId, userOrgMap, syncId, null); } @Transactional - public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set changedEntityIds) { + public SyncResult mergeBelongsToRelations(String graphId, Map userOrgMap, + String syncId, Set changedEntityIds) { SyncResult result = beginResult("BELONGS_TO", syncId); - Optional defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType( - graphId, "org:default", "Org"); - if (defaultOrgOpt.isEmpty()) { - log.warn("[{}] Default org not found, skipping BELONGS_TO", syncId); + // 构建 org sourceId → entityId 映射 + Map orgMap = buildSourceIdToEntityIdMap(graphId, "Org"); + + String unassignedOrgEntityId = orgMap.get("org:unassigned"); + if (orgMap.isEmpty() || unassignedOrgEntityId == null) { + log.warn("[{}] No org entities found (or unassigned org missing), skipping BELONGS_TO", syncId); result.addError("belongs_to:org_missing"); return endResult(result); } - String orgId = defaultOrgOpt.get().getId(); - // User → Org + // User → Org(通过 userOrgMap 查找对应组织) List users = entityRepository.findByGraphIdAndType(graphId, "User"); if (changedEntityIds != null) { users = users.stream() .filter(user -> changedEntityIds.contains(user.getId())) .toList(); } + + // Dataset → Org(通过创建者的组织) + List datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); + if (changedEntityIds != null) { + datasets = datasets.stream() + .filter(dataset -> changedEntityIds.contains(dataset.getId())) + .toList(); + } + + // 删除受影响实体的旧 BELONGS_TO 关系,避免组织变更后遗留过时关系 + Set affectedEntityIds = new LinkedHashSet<>(); + users.forEach(u -> affectedEntityIds.add(u.getId())); + datasets.forEach(d -> affectedEntityIds.add(d.getId())); + if (!affectedEntityIds.isEmpty()) { + deleteOutgoingRelations(graphId, "BELONGS_TO", affectedEntityIds, syncId); + } + for (GraphEntity user : users) { try { - boolean created = mergeRelation(graphId, user.getId(), orgId, + Object usernameObj = user.getProperties() != null ? user.getProperties().get("username") : null; + String username = usernameObj != null ? usernameObj.toString() : null; + + String orgEntityId = resolveOrgEntityId(username, userOrgMap, orgMap, unassignedOrgEntityId); + + boolean created = mergeRelation(graphId, user.getId(), orgEntityId, "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } } catch (Exception e) { @@ -582,16 +624,15 @@ public class GraphSyncStepService { } } - // Dataset → Org - List datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset"); - if (changedEntityIds != null) { - datasets = datasets.stream() - .filter(dataset -> changedEntityIds.contains(dataset.getId())) - .toList(); - } + // Dataset → Org(通过创建者的组织) for (GraphEntity dataset : datasets) { try { - boolean created = mergeRelation(graphId, dataset.getId(), orgId, + Object createdByObj = dataset.getProperties() != null ? dataset.getProperties().get("created_by") : null; + String createdBy = createdByObj != null ? createdByObj.toString() : null; + + String orgEntityId = resolveOrgEntityId(createdBy, userOrgMap, orgMap, unassignedOrgEntityId); + + boolean created = mergeRelation(graphId, dataset.getId(), orgEntityId, "BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId); if (created) { result.incrementCreated(); } else { result.incrementSkipped(); } } catch (Exception e) { @@ -1236,4 +1277,56 @@ public class GraphSyncStepService { .filter(e -> e.getSourceId() != null) .collect(Collectors.toMap(GraphEntity::getSourceId, GraphEntity::getId, (a, b) -> a)); } + + /** + * 组织名称转换为 source_id 片段。 + *

+ * 直接使用 trim 后的原始名称,避免归一化导致不同组织碰撞 + * (如 "Org A" 和 "Org_A" 在 lowercase+regex 归一化下会合并为同一编码)。 + * Neo4j 属性值支持任意 Unicode 字符串,无需额外编码。 + */ + static String normalizeOrgCode(String orgName) { + if (DEFAULT_ORG_NAME.equals(orgName)) { + return "unassigned"; + } + return orgName.trim(); + } + + /** + * 删除指定实体的出向关系(按关系类型)。 + *

+ * 用于在重建 BELONGS_TO 等关系前清除旧关系, + * 确保组织变更等场景下不会遗留过时的关系。 + */ + private void deleteOutgoingRelations(String graphId, String relationType, + Set entityIds, String syncId) { + log.debug("[{}] Deleting existing {} relations for {} entities", + syncId, relationType, entityIds.size()); + neo4jClient.query( + "MATCH (e:Entity {graph_id: $graphId})" + + "-[r:RELATED_TO {graph_id: $graphId, relation_type: $relationType}]->()" + + " WHERE e.id IN $entityIds DELETE r" + ).bindAll(Map.of( + "graphId", graphId, + "relationType", relationType, + "entityIds", new ArrayList<>(entityIds) + )).run(); + } + + /** + * 根据用户名查找对应组织实体 ID,未找到时降级到未分配组织。 + */ + private String resolveOrgEntityId(String username, Map userOrgMap, + Map orgMap, String unassignedOrgEntityId) { + if (username == null || username.isBlank()) { + return unassignedOrgEntityId; + } + String orgName = userOrgMap.get(username); + if (orgName == null || orgName.isBlank()) { + return unassignedOrgEntityId; + } + String orgCode = normalizeOrgCode(orgName.trim()); + String orgEntityId = orgMap.get("org:" + orgCode); + return orgEntityId != null ? orgEntityId : unassignedOrgEntityId; + } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java index 0bd884e..b703e16 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/client/DataManagementClient.java @@ -204,6 +204,37 @@ public class DataManagementClient { "knowledge-sets"); } + /** + * 拉取所有用户的组织映射。 + */ + public Map fetchUserOrganizationMap() { + String url = baseUrl + "/auth/users/organizations"; + log.debug("Fetching user-organization mappings from: {}", url); + try { + ResponseEntity> response = restTemplate.exchange( + url, HttpMethod.GET, null, + new ParameterizedTypeReference>() {}); + + List body = response.getBody(); + if (body == null || body.isEmpty()) { + log.warn("No user-organization mappings returned from auth service"); + return Collections.emptyMap(); + } + + Map result = new LinkedHashMap<>(); + for (UserOrgDTO dto : body) { + if (dto.getUsername() != null && !dto.getUsername().isBlank()) { + result.put(dto.getUsername(), dto.getOrganization()); + } + } + log.info("Fetched {} user-organization mappings", result.size()); + return result; + } catch (RestClientException e) { + log.error("Failed to fetch user-organization mappings from: {}", url, e); + throw e; + } + } + /** * 通用自动分页拉取方法。 */ @@ -459,4 +490,14 @@ public class DataManagementClient { /** 来源数据集 ID 列表(SOURCED_FROM 关系) */ private List sourceDatasetIds; } + + /** + * 用户-组织映射 DTO(与 AuthController.listUserOrganizations 对齐)。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class UserOrgDTO { + private String username; + private String organization; + } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java index 2ae4941..151170a 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncServiceTest.java @@ -133,7 +133,9 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("Field").build()); when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) .thenReturn(SyncResult.builder().syncType("User").build()); - when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("Org").build()); when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) .thenReturn(SyncResult.builder().syncType("Workflow").build()); @@ -152,7 +154,7 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); - when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); @@ -371,7 +373,9 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("Field").build()); when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) .thenReturn(SyncResult.builder().syncType("User").build()); - when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("Org").build()); when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) .thenReturn(SyncResult.builder().syncType("Workflow").build()); @@ -387,7 +391,7 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); - when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); @@ -450,7 +454,9 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("Field").build()); when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) .thenReturn(SyncResult.builder().syncType("User").build()); - when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("Org").build()); when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) .thenReturn(SyncResult.builder().syncType("Workflow").build()); @@ -466,7 +472,7 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); - when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); @@ -664,7 +670,9 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("Field").build()); when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) .thenReturn(SyncResult.builder().syncType("User").build()); - when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString())) + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("Org").build()); when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) .thenReturn(SyncResult.builder().syncType("Workflow").build()); @@ -682,7 +690,7 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); - lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString())) + lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString())) .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); @@ -704,7 +712,7 @@ class GraphSyncServiceTest { .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any())) .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); - lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any())) + lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString(), any())) .thenReturn(SyncResult.builder().syncType("BELONGS_TO").build()); lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any())) .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); @@ -820,4 +828,140 @@ class GraphSyncServiceTest { .hasMessageContaining("分页偏移量"); } } + + // ----------------------------------------------------------------------- + // 组织同步 + // ----------------------------------------------------------------------- + + @Nested + class OrgSyncTest { + + @Test + void syncOrgs_fetchesUserOrgMapAndPassesToStepService() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate", "alice", "三甲医院")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").created(3).build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Org"), anySet(), anyString())) + .thenReturn(0); + + SyncResult result = syncService.syncOrgs(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("Org"); + assertThat(result.getCreated()).isEqualTo(3); + verify(dataManagementClient).fetchUserOrganizationMap(); + + @SuppressWarnings("unchecked") + ArgumentCaptor> mapCaptor = ArgumentCaptor.forClass(Map.class); + verify(stepService).upsertOrgEntities(eq(GRAPH_ID), mapCaptor.capture(), anyString()); + assertThat(mapCaptor.getValue()).containsKeys("admin", "alice"); + } + + @Test + void syncOrgs_fetchUserOrgMapFails_gracefulDegradation() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.fetchUserOrganizationMap()) + .thenThrow(new RuntimeException("auth service down")); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").created(1).build()); + + SyncResult result = syncService.syncOrgs(GRAPH_ID); + + // 应优雅降级,使用空 map(仅创建未分配组织) + assertThat(result.getSyncType()).isEqualTo("Org"); + assertThat(result.getCreated()).isEqualTo(1); + // P0 fix: 降级时不执行 Org purge,防止误删已有组织节点 + verify(stepService, never()).purgeStaleEntities(anyString(), eq("Org"), anySet(), anyString()); + } + + @Test + void syncAll_fetchUserOrgMapFails_skipsBelongsToRelationBuild() { + when(properties.getSync()).thenReturn(syncConfig); + + DatasetDTO dto = new DatasetDTO(); + dto.setId("ds-001"); + dto.setName("Test"); + dto.setCreatedBy("admin"); + when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto)); + when(dataManagementClient.listAllWorkflows()).thenReturn(List.of()); + when(dataManagementClient.listAllJobs()).thenReturn(List.of()); + when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of()); + when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of()); + when(dataManagementClient.fetchUserOrganizationMap()) + .thenThrow(new RuntimeException("auth service down")); + + when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Dataset").build()); + when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Field").build()); + when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString())) + .thenReturn(SyncResult.builder().syncType("User").build()); + when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString())) + .thenReturn(SyncResult.builder().syncType("Org").build()); + when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Workflow").build()); + when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("Job").build()); + when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("LabelTask").build()); + when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString())) + .thenReturn(SyncResult.builder().syncType("KnowledgeSet").build()); + when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString())) + .thenReturn(0); + when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("HAS_FIELD").build()); + when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build()); + when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("USES_DATASET").build()); + when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("PRODUCES").build()); + when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build()); + when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("TRIGGERS").build()); + when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build()); + when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("IMPACTS").build()); + when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString())) + .thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build()); + + SyncMetadata metadata = syncService.syncAll(GRAPH_ID); + + assertThat(metadata.getResults()).hasSize(18); + // BELONGS_TO merge must NOT be called when org map is degraded + verify(stepService, never()).mergeBelongsToRelations(anyString(), anyMap(), anyString()); + // Org purge must also be skipped + verify(stepService, never()).purgeStaleEntities(anyString(), eq("Org"), anySet(), anyString()); + } + + @Test + void buildBelongsToRelations_passesUserOrgMap() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.fetchUserOrganizationMap()) + .thenReturn(Map.of("admin", "DataMate")); + when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString())) + .thenReturn(SyncResult.builder().syncType("BELONGS_TO").created(2).build()); + + SyncResult result = syncService.buildBelongsToRelations(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("BELONGS_TO"); + verify(dataManagementClient).fetchUserOrganizationMap(); + } + + @Test + void buildBelongsToRelations_fetchDegraded_skipsRelationBuild() { + when(properties.getSync()).thenReturn(syncConfig); + when(dataManagementClient.fetchUserOrganizationMap()) + .thenThrow(new RuntimeException("auth service down")); + + SyncResult result = syncService.buildBelongsToRelations(GRAPH_ID); + + assertThat(result.getSyncType()).isEqualTo("BELONGS_TO"); + // BELONGS_TO merge must NOT be called when degraded + verify(stepService, never()).mergeBelongsToRelations(anyString(), anyMap(), anyString()); + } + } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java index b540b0f..05b55c4 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/application/GraphSyncStepServiceTest.java @@ -505,11 +505,12 @@ class GraphSyncStepServiceTest { } @Test - void mergeBelongsTo_noDefaultOrg_returnsError() { - when(entityRepository.findByGraphIdAndSourceIdAndType(GRAPH_ID, "org:default", "Org")) - .thenReturn(Optional.empty()); + void mergeBelongsTo_noOrgEntities_returnsError() { + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org")) + .thenReturn(List.of()); - SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, SYNC_ID); + Map userOrgMap = Map.of("admin", "DataMate"); + SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID); assertThat(result.getFailed()).isGreaterThan(0); assertThat(result.getErrors()).contains("belongs_to:org_missing"); @@ -933,4 +934,151 @@ class GraphSyncStepServiceTest { verify(neo4jClient, times(1)).query(anyString()); } } + + // ----------------------------------------------------------------------- + // upsertOrgEntities(多组织同步) + // ----------------------------------------------------------------------- + + @Nested + class UpsertOrgEntitiesTest { + + @Test + void upsert_multipleOrgs_createsEntityPerDistinctOrg() { + setupNeo4jQueryChain(Boolean.class, true); + + Map userOrgMap = new LinkedHashMap<>(); + userOrgMap.put("admin", "DataMate"); + userOrgMap.put("alice", "三甲医院"); + userOrgMap.put("bob", null); + userOrgMap.put("carol", "DataMate"); // 重复 + + SyncResult result = stepService.upsertOrgEntities(GRAPH_ID, userOrgMap, SYNC_ID); + + // 3 个去重组织: 未分配, DataMate, 三甲医院 + assertThat(result.getCreated()).isEqualTo(3); + assertThat(result.getSyncType()).isEqualTo("Org"); + } + + @Test + void upsert_emptyMap_createsOnlyDefaultOrg() { + setupNeo4jQueryChain(Boolean.class, true); + + SyncResult result = stepService.upsertOrgEntities( + GRAPH_ID, Collections.emptyMap(), SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); + } + + @Test + void upsert_allUsersHaveBlankOrg_createsOnlyDefaultOrg() { + setupNeo4jQueryChain(Boolean.class, true); + + Map userOrgMap = new LinkedHashMap<>(); + userOrgMap.put("admin", ""); + userOrgMap.put("alice", " "); + + SyncResult result = stepService.upsertOrgEntities(GRAPH_ID, userOrgMap, SYNC_ID); + + assertThat(result.getCreated()).isEqualTo(1); // 仅未分配 + } + } + + // ----------------------------------------------------------------------- + // mergeBelongsToRelations(多组织映射) + // ----------------------------------------------------------------------- + + @Nested + class MergeBelongsToWithRealOrgsTest { + + @Test + void mergeBelongsTo_usersMapToCorrectOrgs() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity orgDataMate = GraphEntity.builder() + .id("org-entity-dm").sourceId("org:DataMate").type("Org").graphId(GRAPH_ID).build(); + GraphEntity orgUnassigned = GraphEntity.builder() + .id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build(); + + GraphEntity userAdmin = GraphEntity.builder() + .id("user-entity-admin").sourceId("user:admin").type("User").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("username", "admin"))) + .build(); + GraphEntity userBob = GraphEntity.builder() + .id("user-entity-bob").sourceId("user:bob").type("User").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("username", "bob"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org")) + .thenReturn(List.of(orgDataMate, orgUnassigned)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")) + .thenReturn(List.of(userAdmin, userBob)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of()); + + Map userOrgMap = new HashMap<>(); + userOrgMap.put("admin", "DataMate"); + userOrgMap.put("bob", null); + + SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID); + + assertThat(result.getSyncType()).isEqualTo("BELONGS_TO"); + // 1 delete (cleanup old BELONGS_TO) + 2 merge (one per user) + verify(neo4jClient, times(3)).query(anyString()); + } + + @Test + void mergeBelongsTo_datasetMappedToCreatorOrg() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity orgHospital = GraphEntity.builder() + .id("org-entity-hosp").sourceId("org:三甲医院").type("Org").graphId(GRAPH_ID).build(); + GraphEntity orgUnassigned = GraphEntity.builder() + .id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build(); + + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-1").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("created_by", "alice"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org")) + .thenReturn(List.of(orgHospital, orgUnassigned)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")) + .thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of(dataset)); + + Map userOrgMap = Map.of("alice", "三甲医院"); + + SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID); + + // 1 delete (cleanup old BELONGS_TO) + 1 merge (dataset → org) + verify(neo4jClient, times(2)).query(anyString()); + } + + @Test + void mergeBelongsTo_unknownCreator_fallsBackToUnassigned() { + setupNeo4jQueryChain(String.class, "new-rel-id"); + + GraphEntity orgUnassigned = GraphEntity.builder() + .id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build(); + + GraphEntity dataset = GraphEntity.builder() + .id("ds-entity-1").type("Dataset").graphId(GRAPH_ID) + .properties(new HashMap<>(Map.of("created_by", "unknown_user"))) + .build(); + + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org")) + .thenReturn(List.of(orgUnassigned)); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User")) + .thenReturn(List.of()); + when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset")) + .thenReturn(List.of(dataset)); + + SyncResult result = stepService.mergeBelongsToRelations( + GRAPH_ID, Collections.emptyMap(), SYNC_ID); + + // 1 delete (cleanup old BELONGS_TO) + 1 merge (dataset → unassigned) + verify(neo4jClient, times(2)).query(anyString()); + } + } } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/application/AuthApplicationService.java b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/application/AuthApplicationService.java index 1cc6d63..c05f4a9 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/application/AuthApplicationService.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/application/AuthApplicationService.java @@ -110,6 +110,17 @@ public class AuthApplicationService { return responses; } + /** + * 返回所有用户的用户名与组织映射,供内部同步服务使用。 + */ + public List listUserOrganizations() { + return authMapper.listUsers().stream() + .map(u -> new UserOrgMapping(u.getUsername(), u.getOrganization())) + .toList(); + } + + public record UserOrgMapping(String username, String organization) {} + public List listRoles() { return authMapper.listRoles(); } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/domain/model/AuthUserSummary.java b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/domain/model/AuthUserSummary.java index 3707c3f..b1e9108 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/domain/model/AuthUserSummary.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/domain/model/AuthUserSummary.java @@ -14,5 +14,6 @@ public class AuthUserSummary { private String email; private String fullName; private Boolean enabled; + private String organization; } diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/interfaces/rest/AuthController.java b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/interfaces/rest/AuthController.java index d96c3cd..a5bb49c 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/auth/interfaces/rest/AuthController.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/auth/interfaces/rest/AuthController.java @@ -58,6 +58,14 @@ public class AuthController { return authApplicationService.listUsersWithRoles(); } + /** + * 内部接口:返回所有用户的用户名与组织映射,供知识图谱同步服务调用。 + */ + @GetMapping("/users/organizations") + public List listUserOrganizations() { + return authApplicationService.listUserOrganizations(); + } + @PutMapping("/users/{userId}/roles") public void assignRoles(@PathVariable("userId") Long userId, @RequestBody @Valid AssignUserRolesRequest request) { diff --git a/backend/shared/domain-common/src/main/resources/mappers/AuthMapper.xml b/backend/shared/domain-common/src/main/resources/mappers/AuthMapper.xml index 7799b90..441de74 100644 --- a/backend/shared/domain-common/src/main/resources/mappers/AuthMapper.xml +++ b/backend/shared/domain-common/src/main/resources/mappers/AuthMapper.xml @@ -66,7 +66,8 @@ username, email, full_name AS fullName, - enabled + enabled, + organization FROM users ORDER BY id ASC