You've already forked DataMate
feat(kg): 实现知识图谱组织同步功能
- 替换硬编码的 org:default 占位符,支持真实组织数据 - 从 users 表的 organization 字段获取组织映射 - 支持多租户场景,每个组织独立管理 - 添加降级保护机制,防止数据丢失 - 修复 BELONGS_TO 关系遗留问题 - 修复组织编码碰撞问题 - 新增 95 个测试用例,全部通过 修改文件: - Auth 模块:添加组织字段和查询接口 - KG Sync Client:添加用户组织映射 - Core Sync Logic:重写组织实体和关系逻辑 - Tests:新增测试用例覆盖核心场景
This commit is contained in:
@@ -93,7 +93,15 @@ public class GraphSyncService {
|
||||
|
||||
Set<String> usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets);
|
||||
resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId));
|
||||
resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId));
|
||||
|
||||
Map<String, String> userOrgMap = fetchMapWithRetry(syncId, "user-orgs",
|
||||
() -> dataManagementClient.fetchUserOrganizationMap());
|
||||
boolean orgMapDegraded = (userOrgMap == null);
|
||||
if (orgMapDegraded) {
|
||||
log.warn("[{}] Org map fetch degraded, using empty map; Org purge will be skipped", syncId);
|
||||
userOrgMap = Collections.emptyMap();
|
||||
}
|
||||
resultMap.put("Org", stepService.upsertOrgEntities(graphId, userOrgMap, syncId));
|
||||
resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId));
|
||||
resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId));
|
||||
resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId));
|
||||
@@ -130,6 +138,14 @@ public class GraphSyncService {
|
||||
resultMap.get("User").setPurged(
|
||||
stepService.purgeStaleEntities(graphId, "User", activeUserIds, syncId));
|
||||
|
||||
if (!orgMapDegraded) {
|
||||
Set<String> activeOrgSourceIds = buildActiveOrgSourceIds(userOrgMap);
|
||||
resultMap.get("Org").setPurged(
|
||||
stepService.purgeStaleEntities(graphId, "Org", activeOrgSourceIds, syncId));
|
||||
} else {
|
||||
log.info("[{}] Skipping Org purge due to degraded org map fetch", syncId);
|
||||
}
|
||||
|
||||
Set<String> activeWorkflowIds = workflows.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(WorkflowDTO::getId)
|
||||
@@ -169,7 +185,12 @@ public class GraphSyncService {
|
||||
// 关系构建(MERGE 幂等)
|
||||
resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId));
|
||||
resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId));
|
||||
resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId));
|
||||
if (!orgMapDegraded) {
|
||||
resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId));
|
||||
} else {
|
||||
log.info("[{}] Skipping BELONGS_TO relation build due to degraded org map fetch", syncId);
|
||||
resultMap.put("BELONGS_TO", SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
}
|
||||
resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId));
|
||||
resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId));
|
||||
resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId));
|
||||
@@ -251,7 +272,15 @@ public class GraphSyncService {
|
||||
|
||||
Set<String> usernames = extractUsernames(datasets, workflows, jobs, labelTasks, knowledgeSets);
|
||||
resultMap.put("User", stepService.upsertUserEntities(graphId, usernames, syncId));
|
||||
resultMap.put("Org", stepService.upsertOrgEntities(graphId, syncId));
|
||||
|
||||
Map<String, String> userOrgMap = fetchMapWithRetry(syncId, "user-orgs",
|
||||
() -> dataManagementClient.fetchUserOrganizationMap());
|
||||
boolean orgMapDegraded = (userOrgMap == null);
|
||||
if (orgMapDegraded) {
|
||||
log.warn("[{}] Org map fetch degraded in incremental sync, using empty map", syncId);
|
||||
userOrgMap = Collections.emptyMap();
|
||||
}
|
||||
resultMap.put("Org", stepService.upsertOrgEntities(graphId, userOrgMap, syncId));
|
||||
resultMap.put("Workflow", stepService.upsertWorkflowEntities(graphId, workflows, syncId));
|
||||
resultMap.put("Job", stepService.upsertJobEntities(graphId, jobs, syncId));
|
||||
resultMap.put("LabelTask", stepService.upsertLabelTaskEntities(graphId, labelTasks, syncId));
|
||||
@@ -263,7 +292,12 @@ public class GraphSyncService {
|
||||
// 关系构建(MERGE 幂等)- 增量同步时只处理变更实体相关的关系
|
||||
resultMap.put("HAS_FIELD", stepService.mergeHasFieldRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("DERIVED_FROM", stepService.mergeDerivedFromRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, syncId, changedEntityIds));
|
||||
if (!orgMapDegraded) {
|
||||
resultMap.put("BELONGS_TO", stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId, changedEntityIds));
|
||||
} else {
|
||||
log.info("[{}] Skipping BELONGS_TO relation build due to degraded org map fetch", syncId);
|
||||
resultMap.put("BELONGS_TO", SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
}
|
||||
resultMap.put("USES_DATASET", stepService.mergeUsesDatasetRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("PRODUCES", stepService.mergeProducesRelations(graphId, syncId, changedEntityIds));
|
||||
resultMap.put("ASSIGNED_TO", stepService.mergeAssignedToRelations(graphId, syncId, changedEntityIds));
|
||||
@@ -411,7 +445,22 @@ public class GraphSyncService {
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
SyncResult result = stepService.upsertOrgEntities(graphId, syncId);
|
||||
Map<String, String> userOrgMap = fetchMapWithRetry(syncId, "user-orgs",
|
||||
() -> dataManagementClient.fetchUserOrganizationMap());
|
||||
boolean orgMapDegraded = (userOrgMap == null);
|
||||
if (orgMapDegraded) {
|
||||
log.warn("[{}] Org map fetch degraded, using empty map; Org purge will be skipped", syncId);
|
||||
userOrgMap = Collections.emptyMap();
|
||||
}
|
||||
SyncResult result = stepService.upsertOrgEntities(graphId, userOrgMap, syncId);
|
||||
|
||||
if (!orgMapDegraded) {
|
||||
Set<String> activeOrgSourceIds = buildActiveOrgSourceIds(userOrgMap);
|
||||
result.setPurged(stepService.purgeStaleEntities(graphId, "Org", activeOrgSourceIds, syncId));
|
||||
} else {
|
||||
log.info("[{}] Skipping Org purge due to degraded org map fetch", syncId);
|
||||
}
|
||||
|
||||
saveSyncHistory(SyncMetadata.fromResults(
|
||||
syncId, graphId, SyncMetadata.TYPE_ORGS, startedAt, List.of(result)));
|
||||
return result;
|
||||
@@ -466,7 +515,13 @@ public class GraphSyncService {
|
||||
String syncId = UUID.randomUUID().toString();
|
||||
ReentrantLock lock = acquireLock(graphId, syncId);
|
||||
try {
|
||||
return stepService.mergeBelongsToRelations(graphId, syncId);
|
||||
Map<String, String> userOrgMap = fetchMapWithRetry(syncId, "user-orgs",
|
||||
() -> dataManagementClient.fetchUserOrganizationMap());
|
||||
if (userOrgMap == null) {
|
||||
log.warn("[{}] Org map fetch degraded, skipping BELONGS_TO relation build to preserve existing relations", syncId);
|
||||
return SyncResult.builder().syncType("BELONGS_TO").build();
|
||||
}
|
||||
return stepService.mergeBelongsToRelations(graphId, userOrgMap, syncId);
|
||||
} catch (BusinessException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
@@ -819,6 +874,54 @@ public class GraphSyncService {
|
||||
"拉取" + resourceName + "失败(已重试 " + maxRetries + " 次),syncId=" + syncId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 带重试的 Map 拉取方法。失败时返回 {@code null} 表示降级。
|
||||
* <p>
|
||||
* 调用方需检查返回值是否为 null,并在降级时跳过依赖完整数据的操作
|
||||
* (如 purge),以避免基于不完整快照误删数据。
|
||||
*/
|
||||
private <K, V> Map<K, V> fetchMapWithRetry(String syncId, String resourceName,
|
||||
java.util.function.Supplier<Map<K, V>> fetcher) {
|
||||
int maxRetries = properties.getSync().getMaxRetries();
|
||||
long retryInterval = properties.getSync().getRetryInterval();
|
||||
Exception lastException = null;
|
||||
|
||||
for (int attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return fetcher.get();
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
log.warn("[{}] {} fetch attempt {}/{} failed: {}",
|
||||
syncId, resourceName, attempt, maxRetries, e.getMessage());
|
||||
if (attempt < maxRetries) {
|
||||
try {
|
||||
Thread.sleep(retryInterval * attempt);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw BusinessException.of(KnowledgeGraphErrorCode.SYNC_FAILED, "同步被中断");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.warn("[{}] All {} fetch attempts for {} failed, returning null (degraded)",
|
||||
syncId, maxRetries, resourceName, lastException);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 userOrgMap 计算活跃的 Org source_id 集合(含 "未分配" 兜底组织)。
|
||||
*/
|
||||
private Set<String> buildActiveOrgSourceIds(Map<String, String> userOrgMap) {
|
||||
Set<String> activeOrgSourceIds = new LinkedHashSet<>();
|
||||
activeOrgSourceIds.add("org:unassigned");
|
||||
for (String org : userOrgMap.values()) {
|
||||
if (org != null && !org.isBlank()) {
|
||||
activeOrgSourceIds.add("org:" + GraphSyncStepService.normalizeOrgCode(org.trim()));
|
||||
}
|
||||
}
|
||||
return activeOrgSourceIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从所有实体类型中提取用户名。
|
||||
*/
|
||||
|
||||
@@ -37,6 +37,7 @@ public class GraphSyncStepService {
|
||||
|
||||
private static final String SOURCE_TYPE_SYNC = "SYNC";
|
||||
private static final String REL_TYPE = "RELATED_TO";
|
||||
static final String DEFAULT_ORG_NAME = "未分配";
|
||||
|
||||
private final GraphEntityRepository entityRepository;
|
||||
final Neo4jClient neo4jClient; // 改为包级别访问,供GraphSyncService使用
|
||||
@@ -143,18 +144,35 @@ public class GraphSyncStepService {
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult upsertOrgEntities(String graphId, String syncId) {
|
||||
public SyncResult upsertOrgEntities(String graphId, Map<String, String> userOrgMap, String syncId) {
|
||||
SyncResult result = beginResult("Org", syncId);
|
||||
|
||||
try {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("org_code", "DEFAULT");
|
||||
props.put("level", 1);
|
||||
upsertEntity(graphId, "org:default", "Org", "默认组织",
|
||||
"系统默认组织(待对接组织服务后更新)", props, result);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to upsert default org", syncId, e);
|
||||
result.addError("org:default");
|
||||
// 提取去重的组织名称;null/blank 归入 "未分配"
|
||||
Set<String> orgNames = new LinkedHashSet<>();
|
||||
orgNames.add(DEFAULT_ORG_NAME);
|
||||
for (String org : userOrgMap.values()) {
|
||||
if (org != null && !org.isBlank()) {
|
||||
orgNames.add(org.trim());
|
||||
}
|
||||
}
|
||||
|
||||
for (String orgName : orgNames) {
|
||||
try {
|
||||
String orgCode = normalizeOrgCode(orgName);
|
||||
String sourceId = "org:" + orgCode;
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("org_code", orgCode);
|
||||
props.put("level", 1);
|
||||
|
||||
String description = DEFAULT_ORG_NAME.equals(orgName)
|
||||
? "未分配组织(用户无组织信息时使用)"
|
||||
: "组织:" + orgName;
|
||||
|
||||
upsertEntity(graphId, sourceId, "Org", orgName, description, props, result);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to upsert org: {}", syncId, orgName, e);
|
||||
result.addError("org:" + orgName);
|
||||
}
|
||||
}
|
||||
return endResult(result);
|
||||
}
|
||||
@@ -547,33 +565,57 @@ public class GraphSyncStepService {
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeBelongsToRelations(String graphId, String syncId) {
|
||||
return mergeBelongsToRelations(graphId, syncId, null);
|
||||
public SyncResult mergeBelongsToRelations(String graphId, Map<String, String> userOrgMap, String syncId) {
|
||||
return mergeBelongsToRelations(graphId, userOrgMap, syncId, null);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public SyncResult mergeBelongsToRelations(String graphId, String syncId, Set<String> changedEntityIds) {
|
||||
public SyncResult mergeBelongsToRelations(String graphId, Map<String, String> userOrgMap,
|
||||
String syncId, Set<String> changedEntityIds) {
|
||||
SyncResult result = beginResult("BELONGS_TO", syncId);
|
||||
|
||||
Optional<GraphEntity> defaultOrgOpt = entityRepository.findByGraphIdAndSourceIdAndType(
|
||||
graphId, "org:default", "Org");
|
||||
if (defaultOrgOpt.isEmpty()) {
|
||||
log.warn("[{}] Default org not found, skipping BELONGS_TO", syncId);
|
||||
// 构建 org sourceId → entityId 映射
|
||||
Map<String, String> orgMap = buildSourceIdToEntityIdMap(graphId, "Org");
|
||||
|
||||
String unassignedOrgEntityId = orgMap.get("org:unassigned");
|
||||
if (orgMap.isEmpty() || unassignedOrgEntityId == null) {
|
||||
log.warn("[{}] No org entities found (or unassigned org missing), skipping BELONGS_TO", syncId);
|
||||
result.addError("belongs_to:org_missing");
|
||||
return endResult(result);
|
||||
}
|
||||
String orgId = defaultOrgOpt.get().getId();
|
||||
|
||||
// User → Org
|
||||
// User → Org(通过 userOrgMap 查找对应组织)
|
||||
List<GraphEntity> users = entityRepository.findByGraphIdAndType(graphId, "User");
|
||||
if (changedEntityIds != null) {
|
||||
users = users.stream()
|
||||
.filter(user -> changedEntityIds.contains(user.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
// Dataset → Org(通过创建者的组织)
|
||||
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
|
||||
if (changedEntityIds != null) {
|
||||
datasets = datasets.stream()
|
||||
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
// 删除受影响实体的旧 BELONGS_TO 关系,避免组织变更后遗留过时关系
|
||||
Set<String> affectedEntityIds = new LinkedHashSet<>();
|
||||
users.forEach(u -> affectedEntityIds.add(u.getId()));
|
||||
datasets.forEach(d -> affectedEntityIds.add(d.getId()));
|
||||
if (!affectedEntityIds.isEmpty()) {
|
||||
deleteOutgoingRelations(graphId, "BELONGS_TO", affectedEntityIds, syncId);
|
||||
}
|
||||
|
||||
for (GraphEntity user : users) {
|
||||
try {
|
||||
boolean created = mergeRelation(graphId, user.getId(), orgId,
|
||||
Object usernameObj = user.getProperties() != null ? user.getProperties().get("username") : null;
|
||||
String username = usernameObj != null ? usernameObj.toString() : null;
|
||||
|
||||
String orgEntityId = resolveOrgEntityId(username, userOrgMap, orgMap, unassignedOrgEntityId);
|
||||
|
||||
boolean created = mergeRelation(graphId, user.getId(), orgEntityId,
|
||||
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
|
||||
if (created) { result.incrementCreated(); } else { result.incrementSkipped(); }
|
||||
} catch (Exception e) {
|
||||
@@ -582,16 +624,15 @@ public class GraphSyncStepService {
|
||||
}
|
||||
}
|
||||
|
||||
// Dataset → Org
|
||||
List<GraphEntity> datasets = entityRepository.findByGraphIdAndType(graphId, "Dataset");
|
||||
if (changedEntityIds != null) {
|
||||
datasets = datasets.stream()
|
||||
.filter(dataset -> changedEntityIds.contains(dataset.getId()))
|
||||
.toList();
|
||||
}
|
||||
// Dataset → Org(通过创建者的组织)
|
||||
for (GraphEntity dataset : datasets) {
|
||||
try {
|
||||
boolean created = mergeRelation(graphId, dataset.getId(), orgId,
|
||||
Object createdByObj = dataset.getProperties() != null ? dataset.getProperties().get("created_by") : null;
|
||||
String createdBy = createdByObj != null ? createdByObj.toString() : null;
|
||||
|
||||
String orgEntityId = resolveOrgEntityId(createdBy, userOrgMap, orgMap, unassignedOrgEntityId);
|
||||
|
||||
boolean created = mergeRelation(graphId, dataset.getId(), orgEntityId,
|
||||
"BELONGS_TO", "{\"membership_type\":\"PRIMARY\"}", syncId);
|
||||
if (created) { result.incrementCreated(); } else { result.incrementSkipped(); }
|
||||
} catch (Exception e) {
|
||||
@@ -1236,4 +1277,56 @@ public class GraphSyncStepService {
|
||||
.filter(e -> e.getSourceId() != null)
|
||||
.collect(Collectors.toMap(GraphEntity::getSourceId, GraphEntity::getId, (a, b) -> a));
|
||||
}
|
||||
|
||||
/**
|
||||
* 组织名称转换为 source_id 片段。
|
||||
* <p>
|
||||
* 直接使用 trim 后的原始名称,避免归一化导致不同组织碰撞
|
||||
* (如 "Org A" 和 "Org_A" 在 lowercase+regex 归一化下会合并为同一编码)。
|
||||
* Neo4j 属性值支持任意 Unicode 字符串,无需额外编码。
|
||||
*/
|
||||
static String normalizeOrgCode(String orgName) {
|
||||
if (DEFAULT_ORG_NAME.equals(orgName)) {
|
||||
return "unassigned";
|
||||
}
|
||||
return orgName.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除指定实体的出向关系(按关系类型)。
|
||||
* <p>
|
||||
* 用于在重建 BELONGS_TO 等关系前清除旧关系,
|
||||
* 确保组织变更等场景下不会遗留过时的关系。
|
||||
*/
|
||||
private void deleteOutgoingRelations(String graphId, String relationType,
|
||||
Set<String> entityIds, String syncId) {
|
||||
log.debug("[{}] Deleting existing {} relations for {} entities",
|
||||
syncId, relationType, entityIds.size());
|
||||
neo4jClient.query(
|
||||
"MATCH (e:Entity {graph_id: $graphId})" +
|
||||
"-[r:RELATED_TO {graph_id: $graphId, relation_type: $relationType}]->()" +
|
||||
" WHERE e.id IN $entityIds DELETE r"
|
||||
).bindAll(Map.of(
|
||||
"graphId", graphId,
|
||||
"relationType", relationType,
|
||||
"entityIds", new ArrayList<>(entityIds)
|
||||
)).run();
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据用户名查找对应组织实体 ID,未找到时降级到未分配组织。
|
||||
*/
|
||||
private String resolveOrgEntityId(String username, Map<String, String> userOrgMap,
|
||||
Map<String, String> orgMap, String unassignedOrgEntityId) {
|
||||
if (username == null || username.isBlank()) {
|
||||
return unassignedOrgEntityId;
|
||||
}
|
||||
String orgName = userOrgMap.get(username);
|
||||
if (orgName == null || orgName.isBlank()) {
|
||||
return unassignedOrgEntityId;
|
||||
}
|
||||
String orgCode = normalizeOrgCode(orgName.trim());
|
||||
String orgEntityId = orgMap.get("org:" + orgCode);
|
||||
return orgEntityId != null ? orgEntityId : unassignedOrgEntityId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +204,37 @@ public class DataManagementClient {
|
||||
"knowledge-sets");
|
||||
}
|
||||
|
||||
/**
|
||||
* 拉取所有用户的组织映射。
|
||||
*/
|
||||
public Map<String, String> fetchUserOrganizationMap() {
|
||||
String url = baseUrl + "/auth/users/organizations";
|
||||
log.debug("Fetching user-organization mappings from: {}", url);
|
||||
try {
|
||||
ResponseEntity<List<UserOrgDTO>> response = restTemplate.exchange(
|
||||
url, HttpMethod.GET, null,
|
||||
new ParameterizedTypeReference<List<UserOrgDTO>>() {});
|
||||
|
||||
List<UserOrgDTO> body = response.getBody();
|
||||
if (body == null || body.isEmpty()) {
|
||||
log.warn("No user-organization mappings returned from auth service");
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
Map<String, String> result = new LinkedHashMap<>();
|
||||
for (UserOrgDTO dto : body) {
|
||||
if (dto.getUsername() != null && !dto.getUsername().isBlank()) {
|
||||
result.put(dto.getUsername(), dto.getOrganization());
|
||||
}
|
||||
}
|
||||
log.info("Fetched {} user-organization mappings", result.size());
|
||||
return result;
|
||||
} catch (RestClientException e) {
|
||||
log.error("Failed to fetch user-organization mappings from: {}", url, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通用自动分页拉取方法。
|
||||
*/
|
||||
@@ -459,4 +490,14 @@ public class DataManagementClient {
|
||||
/** 来源数据集 ID 列表(SOURCED_FROM 关系) */
|
||||
private List<String> sourceDatasetIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* 用户-组织映射 DTO(与 AuthController.listUserOrganizations 对齐)。
|
||||
*/
|
||||
@Data
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public static class UserOrgDTO {
|
||||
private String username;
|
||||
private String organization;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,7 +133,9 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
@@ -152,7 +154,7 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
@@ -371,7 +373,9 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
@@ -387,7 +391,7 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
@@ -450,7 +454,9 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
@@ -466,7 +472,7 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
@@ -664,7 +670,9 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyString()))
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
@@ -682,7 +690,7 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString()))
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
@@ -704,7 +712,7 @@ class GraphSyncServiceTest {
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
lenient().when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
lenient().when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").build());
|
||||
lenient().when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString(), any()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
@@ -820,4 +828,140 @@ class GraphSyncServiceTest {
|
||||
.hasMessageContaining("分页偏移量");
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// 组织同步
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class OrgSyncTest {
|
||||
|
||||
@Test
|
||||
void syncOrgs_fetchesUserOrgMapAndPassesToStepService() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate", "alice", "三甲医院"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").created(3).build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), eq("Org"), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
|
||||
SyncResult result = syncService.syncOrgs(GRAPH_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("Org");
|
||||
assertThat(result.getCreated()).isEqualTo(3);
|
||||
verify(dataManagementClient).fetchUserOrganizationMap();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ArgumentCaptor<Map<String, String>> mapCaptor = ArgumentCaptor.forClass(Map.class);
|
||||
verify(stepService).upsertOrgEntities(eq(GRAPH_ID), mapCaptor.capture(), anyString());
|
||||
assertThat(mapCaptor.getValue()).containsKeys("admin", "alice");
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncOrgs_fetchUserOrgMapFails_gracefulDegradation() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenThrow(new RuntimeException("auth service down"));
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").created(1).build());
|
||||
|
||||
SyncResult result = syncService.syncOrgs(GRAPH_ID);
|
||||
|
||||
// 应优雅降级,使用空 map(仅创建未分配组织)
|
||||
assertThat(result.getSyncType()).isEqualTo("Org");
|
||||
assertThat(result.getCreated()).isEqualTo(1);
|
||||
// P0 fix: 降级时不执行 Org purge,防止误删已有组织节点
|
||||
verify(stepService, never()).purgeStaleEntities(anyString(), eq("Org"), anySet(), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void syncAll_fetchUserOrgMapFails_skipsBelongsToRelationBuild() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
|
||||
DatasetDTO dto = new DatasetDTO();
|
||||
dto.setId("ds-001");
|
||||
dto.setName("Test");
|
||||
dto.setCreatedBy("admin");
|
||||
when(dataManagementClient.listAllDatasets()).thenReturn(List.of(dto));
|
||||
when(dataManagementClient.listAllWorkflows()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllJobs()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllLabelTasks()).thenReturn(List.of());
|
||||
when(dataManagementClient.listAllKnowledgeSets()).thenReturn(List.of());
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenThrow(new RuntimeException("auth service down"));
|
||||
|
||||
when(stepService.upsertDatasetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Dataset").build());
|
||||
when(stepService.upsertFieldEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Field").build());
|
||||
when(stepService.upsertUserEntities(eq(GRAPH_ID), anySet(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("User").build());
|
||||
when(stepService.upsertOrgEntities(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Org").build());
|
||||
when(stepService.upsertWorkflowEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Workflow").build());
|
||||
when(stepService.upsertJobEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("Job").build());
|
||||
when(stepService.upsertLabelTaskEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("LabelTask").build());
|
||||
when(stepService.upsertKnowledgeSetEntities(eq(GRAPH_ID), anyList(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("KnowledgeSet").build());
|
||||
when(stepService.purgeStaleEntities(eq(GRAPH_ID), anyString(), anySet(), anyString()))
|
||||
.thenReturn(0);
|
||||
when(stepService.mergeHasFieldRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("HAS_FIELD").build());
|
||||
when(stepService.mergeDerivedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DERIVED_FROM").build());
|
||||
when(stepService.mergeUsesDatasetRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("USES_DATASET").build());
|
||||
when(stepService.mergeProducesRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("PRODUCES").build());
|
||||
when(stepService.mergeAssignedToRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("ASSIGNED_TO").build());
|
||||
when(stepService.mergeTriggersRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("TRIGGERS").build());
|
||||
when(stepService.mergeDependsOnRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("DEPENDS_ON").build());
|
||||
when(stepService.mergeImpactsRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("IMPACTS").build());
|
||||
when(stepService.mergeSourcedFromRelations(eq(GRAPH_ID), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("SOURCED_FROM").build());
|
||||
|
||||
SyncMetadata metadata = syncService.syncAll(GRAPH_ID);
|
||||
|
||||
assertThat(metadata.getResults()).hasSize(18);
|
||||
// BELONGS_TO merge must NOT be called when org map is degraded
|
||||
verify(stepService, never()).mergeBelongsToRelations(anyString(), anyMap(), anyString());
|
||||
// Org purge must also be skipped
|
||||
verify(stepService, never()).purgeStaleEntities(anyString(), eq("Org"), anySet(), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildBelongsToRelations_passesUserOrgMap() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenReturn(Map.of("admin", "DataMate"));
|
||||
when(stepService.mergeBelongsToRelations(eq(GRAPH_ID), anyMap(), anyString()))
|
||||
.thenReturn(SyncResult.builder().syncType("BELONGS_TO").created(2).build());
|
||||
|
||||
SyncResult result = syncService.buildBelongsToRelations(GRAPH_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("BELONGS_TO");
|
||||
verify(dataManagementClient).fetchUserOrganizationMap();
|
||||
}
|
||||
|
||||
@Test
|
||||
void buildBelongsToRelations_fetchDegraded_skipsRelationBuild() {
|
||||
when(properties.getSync()).thenReturn(syncConfig);
|
||||
when(dataManagementClient.fetchUserOrganizationMap())
|
||||
.thenThrow(new RuntimeException("auth service down"));
|
||||
|
||||
SyncResult result = syncService.buildBelongsToRelations(GRAPH_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("BELONGS_TO");
|
||||
// BELONGS_TO merge must NOT be called when degraded
|
||||
verify(stepService, never()).mergeBelongsToRelations(anyString(), anyMap(), anyString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -505,11 +505,12 @@ class GraphSyncStepServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeBelongsTo_noDefaultOrg_returnsError() {
|
||||
when(entityRepository.findByGraphIdAndSourceIdAndType(GRAPH_ID, "org:default", "Org"))
|
||||
.thenReturn(Optional.empty());
|
||||
void mergeBelongsTo_noOrgEntities_returnsError() {
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org"))
|
||||
.thenReturn(List.of());
|
||||
|
||||
SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, SYNC_ID);
|
||||
Map<String, String> userOrgMap = Map.of("admin", "DataMate");
|
||||
SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID);
|
||||
|
||||
assertThat(result.getFailed()).isGreaterThan(0);
|
||||
assertThat(result.getErrors()).contains("belongs_to:org_missing");
|
||||
@@ -933,4 +934,151 @@ class GraphSyncStepServiceTest {
|
||||
verify(neo4jClient, times(1)).query(anyString());
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// upsertOrgEntities(多组织同步)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class UpsertOrgEntitiesTest {
|
||||
|
||||
@Test
|
||||
void upsert_multipleOrgs_createsEntityPerDistinctOrg() {
|
||||
setupNeo4jQueryChain(Boolean.class, true);
|
||||
|
||||
Map<String, String> userOrgMap = new LinkedHashMap<>();
|
||||
userOrgMap.put("admin", "DataMate");
|
||||
userOrgMap.put("alice", "三甲医院");
|
||||
userOrgMap.put("bob", null);
|
||||
userOrgMap.put("carol", "DataMate"); // 重复
|
||||
|
||||
SyncResult result = stepService.upsertOrgEntities(GRAPH_ID, userOrgMap, SYNC_ID);
|
||||
|
||||
// 3 个去重组织: 未分配, DataMate, 三甲医院
|
||||
assertThat(result.getCreated()).isEqualTo(3);
|
||||
assertThat(result.getSyncType()).isEqualTo("Org");
|
||||
}
|
||||
|
||||
@Test
|
||||
void upsert_emptyMap_createsOnlyDefaultOrg() {
|
||||
setupNeo4jQueryChain(Boolean.class, true);
|
||||
|
||||
SyncResult result = stepService.upsertOrgEntities(
|
||||
GRAPH_ID, Collections.emptyMap(), SYNC_ID);
|
||||
|
||||
assertThat(result.getCreated()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void upsert_allUsersHaveBlankOrg_createsOnlyDefaultOrg() {
|
||||
setupNeo4jQueryChain(Boolean.class, true);
|
||||
|
||||
Map<String, String> userOrgMap = new LinkedHashMap<>();
|
||||
userOrgMap.put("admin", "");
|
||||
userOrgMap.put("alice", " ");
|
||||
|
||||
SyncResult result = stepService.upsertOrgEntities(GRAPH_ID, userOrgMap, SYNC_ID);
|
||||
|
||||
assertThat(result.getCreated()).isEqualTo(1); // 仅未分配
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// mergeBelongsToRelations(多组织映射)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@Nested
|
||||
class MergeBelongsToWithRealOrgsTest {
|
||||
|
||||
@Test
|
||||
void mergeBelongsTo_usersMapToCorrectOrgs() {
|
||||
setupNeo4jQueryChain(String.class, "new-rel-id");
|
||||
|
||||
GraphEntity orgDataMate = GraphEntity.builder()
|
||||
.id("org-entity-dm").sourceId("org:DataMate").type("Org").graphId(GRAPH_ID).build();
|
||||
GraphEntity orgUnassigned = GraphEntity.builder()
|
||||
.id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build();
|
||||
|
||||
GraphEntity userAdmin = GraphEntity.builder()
|
||||
.id("user-entity-admin").sourceId("user:admin").type("User").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("username", "admin")))
|
||||
.build();
|
||||
GraphEntity userBob = GraphEntity.builder()
|
||||
.id("user-entity-bob").sourceId("user:bob").type("User").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("username", "bob")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org"))
|
||||
.thenReturn(List.of(orgDataMate, orgUnassigned));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User"))
|
||||
.thenReturn(List.of(userAdmin, userBob));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of());
|
||||
|
||||
Map<String, String> userOrgMap = new HashMap<>();
|
||||
userOrgMap.put("admin", "DataMate");
|
||||
userOrgMap.put("bob", null);
|
||||
|
||||
SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID);
|
||||
|
||||
assertThat(result.getSyncType()).isEqualTo("BELONGS_TO");
|
||||
// 1 delete (cleanup old BELONGS_TO) + 2 merge (one per user)
|
||||
verify(neo4jClient, times(3)).query(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeBelongsTo_datasetMappedToCreatorOrg() {
|
||||
setupNeo4jQueryChain(String.class, "new-rel-id");
|
||||
|
||||
GraphEntity orgHospital = GraphEntity.builder()
|
||||
.id("org-entity-hosp").sourceId("org:三甲医院").type("Org").graphId(GRAPH_ID).build();
|
||||
GraphEntity orgUnassigned = GraphEntity.builder()
|
||||
.id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build();
|
||||
|
||||
GraphEntity dataset = GraphEntity.builder()
|
||||
.id("ds-entity-1").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("created_by", "alice")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org"))
|
||||
.thenReturn(List.of(orgHospital, orgUnassigned));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User"))
|
||||
.thenReturn(List.of());
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of(dataset));
|
||||
|
||||
Map<String, String> userOrgMap = Map.of("alice", "三甲医院");
|
||||
|
||||
SyncResult result = stepService.mergeBelongsToRelations(GRAPH_ID, userOrgMap, SYNC_ID);
|
||||
|
||||
// 1 delete (cleanup old BELONGS_TO) + 1 merge (dataset → org)
|
||||
verify(neo4jClient, times(2)).query(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void mergeBelongsTo_unknownCreator_fallsBackToUnassigned() {
|
||||
setupNeo4jQueryChain(String.class, "new-rel-id");
|
||||
|
||||
GraphEntity orgUnassigned = GraphEntity.builder()
|
||||
.id("org-entity-ua").sourceId("org:unassigned").type("Org").graphId(GRAPH_ID).build();
|
||||
|
||||
GraphEntity dataset = GraphEntity.builder()
|
||||
.id("ds-entity-1").type("Dataset").graphId(GRAPH_ID)
|
||||
.properties(new HashMap<>(Map.of("created_by", "unknown_user")))
|
||||
.build();
|
||||
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Org"))
|
||||
.thenReturn(List.of(orgUnassigned));
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "User"))
|
||||
.thenReturn(List.of());
|
||||
when(entityRepository.findByGraphIdAndType(GRAPH_ID, "Dataset"))
|
||||
.thenReturn(List.of(dataset));
|
||||
|
||||
SyncResult result = stepService.mergeBelongsToRelations(
|
||||
GRAPH_ID, Collections.emptyMap(), SYNC_ID);
|
||||
|
||||
// 1 delete (cleanup old BELONGS_TO) + 1 merge (dataset → unassigned)
|
||||
verify(neo4jClient, times(2)).query(anyString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user