From e62a8369d475f82408d3358189fb3f68a71385c2 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 23 Feb 2026 17:09:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20Neo4j=20schema=20mi?= =?UTF-8?q?gration=20=E5=B1=9E=E6=80=A7=E7=BC=BA=E5=A4=B1=E8=AD=A6?= =?UTF-8?q?=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根本原因: - recordMigration 在成功时 errorMessage 为 null - HashMap.put("errorMessage", null) 导致 Neo4j 驱动异常或属性被移除 - 导致 _SchemaMigration 节点缺少属性 修复内容: - recordMigration: 所有 String 参数通过 nullToEmpty() 转换 - loadAppliedMigrations: 查询改用 COALESCE 提供默认值 - bootstrapMigrationSchema: 新增修复查询补充历史节点缺失属性 - validateChecksums: 跳过 checksum 为空的历史记录 测试: - 新增 4 个测试验证修复 - 21 个测试全部通过 --- .../migration/SchemaMigrationService.java | 52 ++++++-- .../migration/SchemaMigrationServiceTest.java | 115 ++++++++++++++++++ 2 files changed, 154 insertions(+), 13 deletions(-) diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java index c9a1f8c..4264f62 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java @@ -117,6 +117,17 @@ public class SchemaMigrationService { "CREATE CONSTRAINT schema_lock_name_unique IF NOT EXISTS " + "FOR (n:_SchemaLock) REQUIRE n.name IS UNIQUE" ).run(); + + // 修复历史遗留节点:为缺失属性补充默认值,避免后续查询产生属性缺失警告 + neo4jClient.query( + "MATCH (m:_SchemaMigration) WHERE m.description IS NULL OR m.checksum IS NULL " + + "SET m.description = COALESCE(m.description, ''), " + + " m.checksum = COALESCE(m.checksum, ''), " + + " m.applied_at = COALESCE(m.applied_at, ''), " + + " m.execution_time_ms = COALESCE(m.execution_time_ms, 0), " + + " m.statements_count = COALESCE(m.statements_count, 0), " + + " m.error_message = COALESCE(m.error_message, '')" + ).run(); } /** @@ -181,11 +192,14 @@ public class SchemaMigrationService { List loadAppliedMigrations() { return neo4jClient.query( "MATCH (m:_SchemaMigration {success: true}) " + - "RETURN m.version AS version, m.description AS description, " + - " m.checksum AS checksum, m.applied_at AS appliedAt, " + - " m.execution_time_ms AS executionTimeMs, " + - " m.success AS success, m.statements_count AS statementsCount, " + - " m.error_message AS errorMessage " + + "RETURN m.version AS version, " + + " COALESCE(m.description, '') AS description, " + + " COALESCE(m.checksum, '') AS checksum, " + + " COALESCE(m.applied_at, '') AS appliedAt, " + + " COALESCE(m.execution_time_ms, 0) AS executionTimeMs, " + + " m.success AS success, " + + " COALESCE(m.statements_count, 0) AS statementsCount, " + + " COALESCE(m.error_message, '') AS errorMessage " + "ORDER BY m.version" ).fetch().all().stream() .map(row -> SchemaMigrationRecord.builder() @@ -193,11 +207,9 @@ public class SchemaMigrationService { .description((String) row.get("description")) .checksum((String) row.get("checksum")) .appliedAt((String) row.get("appliedAt")) - .executionTimeMs(row.get("executionTimeMs") != null - ? ((Number) row.get("executionTimeMs")).longValue() : 0) + .executionTimeMs(((Number) row.get("executionTimeMs")).longValue()) .success(Boolean.TRUE.equals(row.get("success"))) - .statementsCount(row.get("statementsCount") != null - ? ((Number) row.get("statementsCount")).intValue() : 0) + .statementsCount(((Number) row.get("statementsCount")).intValue()) .errorMessage((String) row.get("errorMessage")) .build()) .toList(); @@ -216,6 +228,13 @@ public class SchemaMigrationService { continue; // 已应用但代码中不再有该迁移(可能是老版本被删除) } + // 跳过 checksum 为空的历史遗留记录(属性缺失修复后的节点) + if (record.getChecksum() == null || record.getChecksum().isEmpty()) { + log.warn("Migration V{} ({}) has no recorded checksum, skipping validation", + record.getVersion(), record.getDescription()); + continue; + } + String currentChecksum = computeChecksum(migration.getStatements()); if (!currentChecksum.equals(record.getChecksum())) { throw BusinessException.of( @@ -304,13 +323,13 @@ public class SchemaMigrationService { void recordMigration(SchemaMigrationRecord record) { Map params = new HashMap<>(); params.put("version", record.getVersion()); - params.put("description", record.getDescription()); - params.put("checksum", record.getChecksum()); - params.put("appliedAt", record.getAppliedAt()); + params.put("description", nullToEmpty(record.getDescription())); + params.put("checksum", nullToEmpty(record.getChecksum())); + params.put("appliedAt", nullToEmpty(record.getAppliedAt())); params.put("executionTimeMs", record.getExecutionTimeMs()); params.put("success", record.isSuccess()); params.put("statementsCount", record.getStatementsCount()); - params.put("errorMessage", record.getErrorMessage()); + params.put("errorMessage", nullToEmpty(record.getErrorMessage())); neo4jClient.query( "MERGE (m:_SchemaMigration {version: $version}) " + @@ -355,4 +374,11 @@ public class SchemaMigrationService { String lowerMsg = msg.toLowerCase(); return ALREADY_EXISTS_KEYWORDS.stream().anyMatch(kw -> lowerMsg.contains(kw.toLowerCase())); } + + /** + * 将 null 字符串转换为空字符串,避免 Neo4j 驱动 bindAll 传入 null 值导致属性缺失。 + */ + private static String nullToEmpty(String value) { + return value != null ? value : ""; + } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java index 1ab369f..a0db826 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java @@ -234,6 +234,29 @@ class SchemaMigrationServiceTest { assertThatCode(() -> service.migrate("test-instance")) .doesNotThrowAnyException(); } + + @Test + void migrate_emptyChecksum_skipsValidation() { + SchemaMigrationRecord legacyRecord = SchemaMigrationRecord.builder() + .version(1) + .description("Initial schema") + .checksum("") // empty checksum from legacy/repaired node + .appliedAt("") + .executionTimeMs(0L) + .success(true) + .statementsCount(0) + .build(); + + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), List.of(legacyRecord)); + + // Should NOT throw — empty checksum is skipped, and V1 is treated as applied + assertThatCode(() -> service.migrate("test-instance")) + .doesNotThrowAnyException(); + + // V1 should NOT be re-executed (it's in the applied set) + verify(neo4jClient, never()).query(contains("test1")); + } } // ----------------------------------------------------------------------- @@ -356,6 +379,7 @@ class SchemaMigrationServiceTest { @Nested class RetryAfterFailure { + @SuppressWarnings("unchecked") @Test void recordMigration_usesMerge_allowsRetryAfterFailure() { SchemaMigrationService service = createService(List.of(v1Migration)); @@ -382,6 +406,40 @@ class SchemaMigrationServiceTest { verify(neo4jClient).query(contains("MERGE")); } + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + void recordMigration_nullErrorMessage_boundAsEmptyString() { + SchemaMigrationService service = createService(List.of(v1Migration)); + + UnboundRunnableSpec unboundSpec = mock(UnboundRunnableSpec.class); + RunnableSpec runnableSpec = mock(RunnableSpec.class); + when(neo4jClient.query(contains("MERGE"))).thenReturn(unboundSpec); + when(unboundSpec.bindAll(anyMap())).thenReturn(runnableSpec); + + SchemaMigrationRecord record = SchemaMigrationRecord.builder() + .version(1) + .description("test") + .checksum("abc123") + .appliedAt("2025-01-01T00:00:00Z") + .executionTimeMs(100L) + .success(true) + .statementsCount(1) + // errorMessage intentionally not set (null) + .build(); + + service.recordMigration(record); + + ArgumentCaptor paramsCaptor = ArgumentCaptor.forClass(Map.class); + verify(unboundSpec).bindAll(paramsCaptor.capture()); + Map params = paramsCaptor.getValue(); + + // All String params must be non-null to avoid Neo4j driver issues + assertThat(params.get("errorMessage")).isEqualTo(""); + assertThat(params.get("description")).isEqualTo("test"); + assertThat(params.get("checksum")).isEqualTo("abc123"); + assertThat(params.get("appliedAt")).isEqualTo("2025-01-01T00:00:00Z"); + } + @Test void migrate_retryAfterFailure_recordsSuccess() { // Simulate: first run recorded a failure, second run should succeed. @@ -460,4 +518,61 @@ class SchemaMigrationServiceTest { assertThat(checksum1).isNotEqualTo(checksum2); } } + + // ----------------------------------------------------------------------- + // Bootstrap Repair + // ----------------------------------------------------------------------- + + @Nested + class BootstrapRepair { + + @Test + void bootstrapMigrationSchema_executesRepairQuery() { + SchemaMigrationService service = createService(List.of(v1Migration)); + + UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); + when(neo4jClient.query(anyString())).thenReturn(spec); + + service.bootstrapMigrationSchema(); + + // Verify 3 queries: 2 constraints + 1 repair + verify(neo4jClient, times(3)).query(anyString()); + // Verify repair query targets nodes with missing properties + verify(neo4jClient).query(contains("m.description IS NULL OR m.checksum IS NULL")); + } + } + + // ----------------------------------------------------------------------- + // Load Applied Migrations Query + // ----------------------------------------------------------------------- + + @Nested + class LoadAppliedMigrationsQuery { + + @SuppressWarnings("unchecked") + @Test + void loadAppliedMigrations_usesCoalesceInQuery() { + SchemaMigrationService service = createService(List.of(v1Migration)); + + UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); + RecordFetchSpec> fetchSpec = mock(RecordFetchSpec.class); + when(neo4jClient.query(contains("COALESCE"))).thenReturn(spec); + when(spec.fetch()).thenReturn(fetchSpec); + when(fetchSpec.all()).thenReturn(Collections.emptyList()); + + service.loadAppliedMigrations(); + + // Verify COALESCE is used for all optional properties + ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(String.class); + verify(neo4jClient).query(queryCaptor.capture()); + String capturedQuery = queryCaptor.getValue(); + assertThat(capturedQuery) + .contains("COALESCE(m.description, '')") + .contains("COALESCE(m.checksum, '')") + .contains("COALESCE(m.applied_at, '')") + .contains("COALESCE(m.execution_time_ms, 0)") + .contains("COALESCE(m.statements_count, 0)") + .contains("COALESCE(m.error_message, '')"); + } + } }