From 7abdafc3382badfbcf46ee07db26831c970d2a34 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Thu, 19 Feb 2026 16:55:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(kg):=20=E5=AE=9E=E7=8E=B0=20Schema=20?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E7=AE=A1=E7=90=86=E5=92=8C=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 Schema 迁移框架,参考 Flyway 设计思路 - 支持版本跟踪、变更检测、自动迁移 - 使用分布式锁确保多实例安全 - 支持 Checksum 校验防止已应用迁移被修改 - 使用 MERGE 策略支持失败后重试 - 使用数据库时间消除时钟偏差问题 核心组件: - SchemaMigration 接口:定义迁移脚本规范 - SchemaMigrationService:核心编排器 - V1__InitialSchema:基线迁移(14 条 DDL) - SchemaMigrationRecord:迁移记录 POJO 配置项: - migration.enabled:是否启用迁移(默认 true) - migration.validate-checksums:是否校验 checksum(默认 true) 向后兼容: - 已有数据库首次运行时,V1 的 14 条语句全部使用 IF NOT EXISTS - 适用于全新部署场景 新增 27 个测试用例,全部通过 测试结果:242 tests pass --- .../exception/KnowledgeGraphErrorCode.java | 5 +- .../neo4j/GraphInitializer.java | 100 +--- .../neo4j/KnowledgeGraphProperties.java | 13 + .../neo4j/migration/SchemaMigration.java | 20 + .../migration/SchemaMigrationRecord.java | 42 ++ .../migration/SchemaMigrationService.java | 358 ++++++++++++++ .../neo4j/migration/V1__InitialSchema.java | 66 +++ .../resources/application-knowledgegraph.yml | 6 + .../neo4j/GraphInitializerTest.java | 51 +- .../migration/SchemaMigrationServiceTest.java | 463 ++++++++++++++++++ 10 files changed, 989 insertions(+), 135 deletions(-) create mode 100644 backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java create mode 100644 backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java create mode 100644 backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java create mode 100644 
backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/V1__InitialSchema.java create mode 100644 backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java index 5df4dd4..9d40785 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java @@ -24,7 +24,10 @@ public enum KnowledgeGraphErrorCode implements ErrorCode { SCHEMA_INIT_FAILED("knowledge_graph.0011", "图谱 Schema 初始化失败"), INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "检测到默认凭据,生产环境禁止使用默认密码"), UNAUTHORIZED_INTERNAL_CALL("knowledge_graph.0013", "内部调用未授权:X-Internal-Token 校验失败"), - QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度"); + QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度"), + SCHEMA_MIGRATION_FAILED("knowledge_graph.0015", "Schema 迁移执行失败"), + SCHEMA_CHECKSUM_MISMATCH("knowledge_graph.0016", "Schema 迁移 checksum 不匹配:已应用的迁移被修改"), + SCHEMA_MIGRATION_LOCKED("knowledge_graph.0017", "Schema 迁移锁被占用,其他实例正在执行迁移"); private final String code; private final String message; diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java index d938859..13932ef 100644 --- 
a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java @@ -1,24 +1,21 @@ package com.datamate.knowledgegraph.infrastructure.neo4j; +import com.datamate.knowledgegraph.infrastructure.neo4j.migration.SchemaMigrationService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; -import org.springframework.data.neo4j.core.Neo4jClient; import org.springframework.stereotype.Component; -import java.util.List; import java.util.Set; +import java.util.UUID; /** * 图谱 Schema 初始化器。 *

- * 应用启动时自动创建 Neo4j 索引和约束。 - * 所有语句使用 {@code IF NOT EXISTS},保证幂等性。 - * <p>

- * 对应 {@code docs/knowledge-graph/schema/schema.cypher} 中的第 1-3 部分。 + * 应用启动时通过 {@link SchemaMigrationService} 执行版本化 Schema 迁移。 * <p>

* 安全自检:在非开发环境中,检测到默认 Neo4j 密码时拒绝启动。 */ @@ -33,13 +30,8 @@ public class GraphInitializer implements ApplicationRunner { "datamate123", "neo4j", "password", "123456", "admin" ); - /** 仅识别「已存在」类错误消息的关键词,其余错误不应吞掉。 */ - private static final Set ALREADY_EXISTS_KEYWORDS = Set.of( - "already exists", "already exist", "EquivalentSchemaRuleAlreadyExists" - ); - - private final Neo4jClient neo4jClient; private final KnowledgeGraphProperties properties; + private final SchemaMigrationService schemaMigrationService; @Value("${spring.neo4j.authentication.password:}") private String neo4jPassword; @@ -47,47 +39,6 @@ public class GraphInitializer implements ApplicationRunner { @Value("${spring.profiles.active:default}") private String activeProfile; - /** - * 需要在启动时执行的 Cypher 语句。 - * 每条语句必须独立执行(Neo4j 不支持多条 DDL 在同一事务中)。 - */ - private static final List SCHEMA_STATEMENTS = List.of( - // 约束(自动创建对应索引) - "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE", - - // 同步 upsert 复合唯一约束:防止并发写入产生重复实体 - "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " + - "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE", - - // 单字段索引 - "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)", - "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)", - "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)", - "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)", - "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)", - - // 复合索引 - "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)", - "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)", - "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)", - - // 全文索引 - "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]", - - // ── SyncHistory 约束和索引 ── - - // 
P1: syncId 唯一约束,防止 ID 碰撞 - "CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " + - "FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE", - - // P2-3: 查询优化索引 - "CREATE INDEX sync_history_graph_started IF NOT EXISTS " + - "FOR (h:SyncHistory) ON (h.graph_id, h.started_at)", - - "CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " + - "FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)" - ); - @Override public void run(ApplicationArguments args) { // ── 安全自检:默认凭据检测 ── @@ -98,32 +49,7 @@ public class GraphInitializer implements ApplicationRunner { return; } - log.info("Initializing Neo4j schema: {} statements to execute", SCHEMA_STATEMENTS.size()); - - int succeeded = 0; - int failed = 0; - - for (String statement : SCHEMA_STATEMENTS) { - try { - neo4jClient.query(statement).run(); - succeeded++; - log.debug("Schema statement executed: {}", truncate(statement)); - } catch (Exception e) { - if (isAlreadyExistsError(e)) { - // 约束/索引已存在,安全跳过 - succeeded++; - log.debug("Schema element already exists (safe to skip): {}", truncate(statement)); - } else { - // 非「已存在」错误:记录并抛出,阻止启动 - failed++; - log.error("Schema statement FAILED: {} — {}", truncate(statement), e.getMessage()); - throw new IllegalStateException( - "Neo4j schema initialization failed: " + truncate(statement), e); - } - } - } - - log.info("Neo4j schema initialization completed: succeeded={}, failed={}", succeeded, failed); + schemaMigrationService.migrate(UUID.randomUUID().toString()); } /** @@ -149,20 +75,4 @@ public class GraphInitializer implements ApplicationRunner { } } } - - /** - * 判断异常是否仅因为 Schema 元素已存在(安全可忽略)。 - */ - private static boolean isAlreadyExistsError(Exception e) { - String msg = e.getMessage(); - if (msg == null) { - return false; - } - String lowerMsg = msg.toLowerCase(); - return ALREADY_EXISTS_KEYWORDS.stream().anyMatch(kw -> lowerMsg.contains(kw.toLowerCase())); - } - - private static String truncate(String s) { - return s.length() <= 100 ? 
s : s.substring(0, 97) + "..."; - } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java index b048231..364579a 100644 --- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java @@ -32,6 +32,9 @@ public class KnowledgeGraphProperties { /** 安全相关配置 */ private Security security = new Security(); + /** Schema 迁移配置 */ + private Migration migration = new Migration(); + @Data public static class Security { @@ -82,4 +85,14 @@ public class KnowledgeGraphProperties { */ private boolean allowPurgeOnEmptySnapshot = false; } + + @Data + public static class Migration { + + /** 是否启用 Schema 版本化迁移 */ + private boolean enabled = true; + + /** 是否校验已应用迁移的 checksum(防止迁移被篡改) */ + private boolean validateChecksums = true; + } } diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java new file mode 100644 index 0000000..cc63312 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java @@ -0,0 +1,20 @@ +package com.datamate.knowledgegraph.infrastructure.neo4j.migration; + +import java.util.List; + +/** + * Schema 迁移接口。 + *

+ * 每个实现类代表一个版本化的 Schema 变更,版本号单调递增。 + */ +public interface SchemaMigration { + + /** 单调递增版本号 (1, 2, 3...) */ + int getVersion(); + + /** 人类可读描述 */ + String getDescription(); + + /** Cypher DDL 语句列表 */ + List getStatements(); +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java new file mode 100644 index 0000000..5d7a0f3 --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java @@ -0,0 +1,42 @@ +package com.datamate.knowledgegraph.infrastructure.neo4j.migration; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 迁移记录数据类,映射 {@code _SchemaMigration} 节点。 + *

+ * 纯 POJO,不使用 SDN {@code @Node} 注解。 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SchemaMigrationRecord { + + /** 迁移版本号 */ + private int version; + + /** 迁移描述 */ + private String description; + + /** 迁移语句的 SHA-256 校验和 */ + private String checksum; + + /** 迁移应用时间(ISO-8601) */ + private String appliedAt; + + /** 迁移执行耗时(毫秒) */ + private long executionTimeMs; + + /** 迁移是否成功 */ + private boolean success; + + /** 迁移语句数量 */ + private int statementsCount; + + /** 失败时的错误信息 */ + private String errorMessage; +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java new file mode 100644 index 0000000..c9a1f8c --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java @@ -0,0 +1,358 @@ +package com.datamate.knowledgegraph.infrastructure.neo4j.migration; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode; +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.neo4j.core.Neo4jClient; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Instant; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Schema 迁移编排器。 + *

+ * 参考 Flyway 设计思路,为 Neo4j 图数据库提供版本化迁移机制: + *

+ */ +@Component +@Slf4j +public class SchemaMigrationService { + + /** 分布式锁过期时间(毫秒),5 分钟 */ + private static final long LOCK_TIMEOUT_MS = 5 * 60 * 1000; + + /** 仅识别「已存在」类错误消息的关键词,其余错误不应吞掉。 */ + private static final Set ALREADY_EXISTS_KEYWORDS = Set.of( + "already exists", "already exist", "EquivalentSchemaRuleAlreadyExists" + ); + + private final Neo4jClient neo4jClient; + private final KnowledgeGraphProperties properties; + private final List migrations; + + public SchemaMigrationService(Neo4jClient neo4jClient, + KnowledgeGraphProperties properties, + List migrations) { + this.neo4jClient = neo4jClient; + this.properties = properties; + this.migrations = migrations.stream() + .sorted(Comparator.comparingInt(SchemaMigration::getVersion)) + .toList(); + } + + /** + * 执行 Schema 迁移主流程。 + * + * @param instanceId 当前实例标识,用于分布式锁 + */ + public void migrate(String instanceId) { + if (!properties.getMigration().isEnabled()) { + log.info("Schema migration is disabled, skipping"); + return; + } + + log.info("Starting schema migration, instanceId={}", instanceId); + + // 1. Bootstrap — 创建迁移系统自身需要的约束 + bootstrapMigrationSchema(); + + // 2. 获取分布式锁 + acquireLock(instanceId); + + try { + // 3. 加载已应用迁移 + List applied = loadAppliedMigrations(); + + // 4. 校验 checksum + if (properties.getMigration().isValidateChecksums()) { + validateChecksums(applied, migrations); + } + + // 5. 过滤待执行迁移 + Set appliedVersions = applied.stream() + .map(SchemaMigrationRecord::getVersion) + .collect(Collectors.toSet()); + + List pending = migrations.stream() + .filter(m -> !appliedVersions.contains(m.getVersion())) + .toList(); + + if (pending.isEmpty()) { + log.info("Schema is up to date, no pending migrations"); + return; + } + + // 6. 逐个执行 + executePendingMigrations(pending); + + log.info("Schema migration completed successfully, applied {} migration(s)", pending.size()); + + } finally { + // 7. 
释放锁 + releaseLock(instanceId); + } + } + + /** + * 创建迁移系统自身需要的约束(解决鸡生蛋问题)。 + */ + void bootstrapMigrationSchema() { + log.debug("Bootstrapping migration schema constraints"); + neo4jClient.query( + "CREATE CONSTRAINT schema_migration_version_unique IF NOT EXISTS " + + "FOR (n:_SchemaMigration) REQUIRE n.version IS UNIQUE" + ).run(); + neo4jClient.query( + "CREATE CONSTRAINT schema_lock_name_unique IF NOT EXISTS " + + "FOR (n:_SchemaLock) REQUIRE n.name IS UNIQUE" + ).run(); + } + + /** + * 获取分布式锁。 + *

+ * MERGE {@code _SchemaLock} 节点,如果锁已被其他实例持有且未过期,则抛出异常。 + * 如果锁已过期(超过 5 分钟),自动接管。 + * <p>

+ * 时间戳完全使用数据库端 {@code datetime().epochMillis},避免多实例时钟偏差导致锁被误抢占。 + */ + void acquireLock(String instanceId) { + log.debug("Acquiring schema migration lock, instanceId={}", instanceId); + + // 使用数据库时间(datetime().epochMillis)避免多实例时钟偏差导致锁被误抢占 + Optional> result = neo4jClient.query( + "MERGE (lock:_SchemaLock {name: 'schema_migration'}) " + + "ON CREATE SET lock.locked_by = $instanceId, lock.locked_at = datetime().epochMillis " + + "WITH lock, " + + " CASE WHEN lock.locked_by = $instanceId THEN true " + + " WHEN lock.locked_at < (datetime().epochMillis - $timeoutMs) THEN true " + + " ELSE false END AS canAcquire " + + "SET lock.locked_by = CASE WHEN canAcquire THEN $instanceId ELSE lock.locked_by END, " + + " lock.locked_at = CASE WHEN canAcquire THEN datetime().epochMillis ELSE lock.locked_at END " + + "RETURN lock.locked_by AS lockedBy, canAcquire" + ).bindAll(Map.of("instanceId", instanceId, "timeoutMs", LOCK_TIMEOUT_MS)) + .fetch().first(); + + if (result.isEmpty()) { + throw new IllegalStateException("Failed to acquire schema migration lock: unexpected empty result"); + } + + Boolean canAcquire = (Boolean) result.get().get("canAcquire"); + if (!Boolean.TRUE.equals(canAcquire)) { + String lockedBy = (String) result.get().get("lockedBy"); + throw BusinessException.of( + KnowledgeGraphErrorCode.SCHEMA_MIGRATION_LOCKED, + "Schema migration lock is held by instance: " + lockedBy + ); + } + + log.info("Schema migration lock acquired, instanceId={}", instanceId); + } + + /** + * 释放分布式锁。 + */ + void releaseLock(String instanceId) { + try { + neo4jClient.query( + "MATCH (lock:_SchemaLock {name: 'schema_migration', locked_by: $instanceId}) " + + "DELETE lock" + ).bindAll(Map.of("instanceId", instanceId)).run(); + log.debug("Schema migration lock released, instanceId={}", instanceId); + } catch (Exception e) { + log.warn("Failed to release schema migration lock: {}", e.getMessage()); + } + } + + /** + * 加载已应用的迁移记录。 + */ + List loadAppliedMigrations() { + return 
neo4jClient.query( + "MATCH (m:_SchemaMigration {success: true}) " + + "RETURN m.version AS version, m.description AS description, " + + " m.checksum AS checksum, m.applied_at AS appliedAt, " + + " m.execution_time_ms AS executionTimeMs, " + + " m.success AS success, m.statements_count AS statementsCount, " + + " m.error_message AS errorMessage " + + "ORDER BY m.version" + ).fetch().all().stream() + .map(row -> SchemaMigrationRecord.builder() + .version(((Number) row.get("version")).intValue()) + .description((String) row.get("description")) + .checksum((String) row.get("checksum")) + .appliedAt((String) row.get("appliedAt")) + .executionTimeMs(row.get("executionTimeMs") != null + ? ((Number) row.get("executionTimeMs")).longValue() : 0) + .success(Boolean.TRUE.equals(row.get("success"))) + .statementsCount(row.get("statementsCount") != null + ? ((Number) row.get("statementsCount")).intValue() : 0) + .errorMessage((String) row.get("errorMessage")) + .build()) + .toList(); + } + + /** + * 校验已应用迁移的 checksum。 + */ + void validateChecksums(List applied, List registered) { + Map registeredByVersion = registered.stream() + .collect(Collectors.toMap(SchemaMigration::getVersion, m -> m)); + + for (SchemaMigrationRecord record : applied) { + SchemaMigration migration = registeredByVersion.get(record.getVersion()); + if (migration == null) { + continue; // 已应用但代码中不再有该迁移(可能是老版本被删除) + } + + String currentChecksum = computeChecksum(migration.getStatements()); + if (!currentChecksum.equals(record.getChecksum())) { + throw BusinessException.of( + KnowledgeGraphErrorCode.SCHEMA_CHECKSUM_MISMATCH, + String.format("Migration V%d (%s): recorded checksum=%s, current checksum=%s", + record.getVersion(), record.getDescription(), + record.getChecksum(), currentChecksum) + ); + } + } + } + + /** + * 逐个执行待迁移。 + */ + void executePendingMigrations(List pending) { + for (SchemaMigration migration : pending) { + log.info("Executing migration V{}: {}", migration.getVersion(), 
migration.getDescription()); + + long startTime = System.currentTimeMillis(); + String errorMessage = null; + boolean success = true; + + try { + for (String statement : migration.getStatements()) { + try { + neo4jClient.query(statement).run(); + log.debug(" Statement executed: {}", + statement.length() <= 100 ? statement : statement.substring(0, 97) + "..."); + } catch (Exception e) { + if (isAlreadyExistsError(e)) { + log.debug(" Schema element already exists (safe to skip): {}", + statement.length() <= 100 ? statement : statement.substring(0, 97) + "..."); + } else { + throw e; + } + } + } + } catch (Exception e) { + success = false; + errorMessage = e.getMessage(); + + long elapsed = System.currentTimeMillis() - startTime; + recordMigration(SchemaMigrationRecord.builder() + .version(migration.getVersion()) + .description(migration.getDescription()) + .checksum(computeChecksum(migration.getStatements())) + .appliedAt(Instant.now().toString()) + .executionTimeMs(elapsed) + .success(false) + .statementsCount(migration.getStatements().size()) + .errorMessage(errorMessage) + .build()); + + throw BusinessException.of( + KnowledgeGraphErrorCode.SCHEMA_MIGRATION_FAILED, + String.format("Migration V%d (%s) failed: %s", + migration.getVersion(), migration.getDescription(), errorMessage) + ); + } + + long elapsed = System.currentTimeMillis() - startTime; + recordMigration(SchemaMigrationRecord.builder() + .version(migration.getVersion()) + .description(migration.getDescription()) + .checksum(computeChecksum(migration.getStatements())) + .appliedAt(Instant.now().toString()) + .executionTimeMs(elapsed) + .success(true) + .statementsCount(migration.getStatements().size()) + .build()); + + log.info("Migration V{} completed in {}ms", migration.getVersion(), elapsed); + } + } + + /** + * 写入迁移记录节点。 + *

+ * 使用 MERGE(按 version 匹配)+ SET 而非 CREATE,确保: + * <ul> + * <li>失败后重试不会因唯一约束冲突而卡死(P0)</li> + * <li>迁移执行成功但记录写入失败后,重跑可安全补写记录(幂等性)</li> + * </ul>
+ */ + void recordMigration(SchemaMigrationRecord record) { + Map params = new HashMap<>(); + params.put("version", record.getVersion()); + params.put("description", record.getDescription()); + params.put("checksum", record.getChecksum()); + params.put("appliedAt", record.getAppliedAt()); + params.put("executionTimeMs", record.getExecutionTimeMs()); + params.put("success", record.isSuccess()); + params.put("statementsCount", record.getStatementsCount()); + params.put("errorMessage", record.getErrorMessage()); + + neo4jClient.query( + "MERGE (m:_SchemaMigration {version: $version}) " + + "SET m.description = $description, " + + " m.checksum = $checksum, " + + " m.applied_at = $appliedAt, " + + " m.execution_time_ms = $executionTimeMs, " + + " m.success = $success, " + + " m.statements_count = $statementsCount, " + + " m.error_message = $errorMessage" + ).bindAll(params).run(); + } + + /** + * 计算语句列表的 SHA-256 校验和。 + */ + static String computeChecksum(List statements) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + for (String statement : statements) { + digest.update(statement.getBytes(StandardCharsets.UTF_8)); + } + byte[] hash = digest.digest(); + StringBuilder hex = new StringBuilder(); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 algorithm not available", e); + } + } + + /** + * 判断异常是否仅因为 Schema 元素已存在(安全可忽略)。 + */ + static boolean isAlreadyExistsError(Exception e) { + String msg = e.getMessage(); + if (msg == null) { + return false; + } + String lowerMsg = msg.toLowerCase(); + return ALREADY_EXISTS_KEYWORDS.stream().anyMatch(kw -> lowerMsg.contains(kw.toLowerCase())); + } +} diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/V1__InitialSchema.java 
b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/V1__InitialSchema.java new file mode 100644 index 0000000..4f98dab --- /dev/null +++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/V1__InitialSchema.java @@ -0,0 +1,66 @@ +package com.datamate.knowledgegraph.infrastructure.neo4j.migration; + +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * V1 基线迁移:初始 Schema。 + *

+ * 包含 {@code GraphInitializer} 中原有的全部 14 条 DDL 语句。 + * 在已有数据库上首次运行时,所有语句因 {@code IF NOT EXISTS} 而为 no-op, + * 但会建立版本基线。 + */ +@Component +public class V1__InitialSchema implements SchemaMigration { + + @Override + public int getVersion() { + return 1; + } + + @Override + public String getDescription() { + return "Initial schema: Entity and SyncHistory constraints and indexes"; + } + + @Override + public List getStatements() { + return List.of( + // 约束(自动创建对应索引) + "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE", + + // 同步 upsert 复合唯一约束:防止并发写入产生重复实体 + "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " + + "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE", + + // 单字段索引 + "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)", + "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)", + "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)", + "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)", + "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)", + + // 复合索引 + "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)", + "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)", + "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)", + + // 全文索引 + "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]", + + // ── SyncHistory 约束和索引 ── + + // syncId 唯一约束,防止 ID 碰撞 + "CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " + + "FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE", + + // 查询优化索引 + "CREATE INDEX sync_history_graph_started IF NOT EXISTS " + + "FOR (h:SyncHistory) ON (h.graph_id, h.started_at)", + + "CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " + + "FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)" + ); 
+ } +} diff --git a/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml b/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml index 45dcfe3..56b2acb 100644 --- a/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml +++ b/backend/services/knowledge-graph-service/src/main/resources/application-knowledgegraph.yml @@ -31,6 +31,12 @@ datamate: # 是否跳过 Token 校验(默认 false = fail-closed) # 仅在 dev/test 环境显式设置为 true 以跳过校验 skip-token-check: ${KG_SKIP_TOKEN_CHECK:false} + # Schema 迁移配置 + migration: + # 是否启用 Schema 版本化迁移 + enabled: ${KG_MIGRATION_ENABLED:true} + # 是否校验已应用迁移的 checksum(防止迁移被篡改) + validate-checksums: ${KG_MIGRATION_VALIDATE_CHECKSUMS:true} # MySQL → Neo4j 同步配置 sync: # 数据管理服务地址 diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializerTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializerTest.java index 2c9a450..76d1b9e 100644 --- a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializerTest.java +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializerTest.java @@ -1,13 +1,11 @@ package com.datamate.knowledgegraph.infrastructure.neo4j; +import com.datamate.knowledgegraph.infrastructure.neo4j.migration.SchemaMigrationService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.boot.DefaultApplicationArguments; -import org.springframework.data.neo4j.core.Neo4jClient; -import org.springframework.data.neo4j.core.Neo4jClient.UnboundRunnableSpec; -import org.springframework.data.neo4j.core.Neo4jClient.RunnableSpec; import 
org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThatCode; @@ -19,13 +17,13 @@ import static org.mockito.Mockito.*; class GraphInitializerTest { @Mock - private Neo4jClient neo4jClient; + private SchemaMigrationService schemaMigrationService; private GraphInitializer createInitializer(String password, String profile, boolean autoInit) { KnowledgeGraphProperties properties = new KnowledgeGraphProperties(); properties.getSync().setAutoInitSchema(autoInit); - GraphInitializer initializer = new GraphInitializer(neo4jClient, properties); + GraphInitializer initializer = new GraphInitializer(properties, schemaMigrationService); ReflectionTestUtils.setField(initializer, "neo4jPassword", password); ReflectionTestUtils.setField(initializer, "activeProfile", profile); return initializer; @@ -97,20 +95,16 @@ class GraphInitializerTest { } // ----------------------------------------------------------------------- - // Schema 初始化 — 成功 + // Schema 初始化 — 委托给 SchemaMigrationService // ----------------------------------------------------------------------- @Test - void run_autoInitEnabled_executesAllStatements() { + void run_autoInitEnabled_delegatesToMigrationService() { GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true); - UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); - when(neo4jClient.query(anyString())).thenReturn(spec); - initializer.run(new DefaultApplicationArguments()); - // Should execute all schema statements (constraints + indexes + fulltext) - verify(neo4jClient, atLeast(10)).query(anyString()); + verify(schemaMigrationService).migrate(anyString()); } @Test @@ -119,39 +113,18 @@ class GraphInitializerTest { initializer.run(new DefaultApplicationArguments()); - verifyNoInteractions(neo4jClient); - } - - // ----------------------------------------------------------------------- - // P2-7: Schema 初始化错误处理 - // ----------------------------------------------------------------------- 
- - @Test - void run_alreadyExistsError_safelyIgnored() { - GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true); - - UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); - when(neo4jClient.query(anyString())).thenReturn(spec); - doThrow(new RuntimeException("Constraint already exists")) - .when(spec).run(); - - // Should not throw — "already exists" errors are safely ignored - assertThatCode(() -> initializer.run(new DefaultApplicationArguments())) - .doesNotThrowAnyException(); + verifyNoInteractions(schemaMigrationService); } @Test - void run_nonExistenceError_throwsException() { + void run_migrationServiceThrows_propagatesException() { GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true); - UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); - when(neo4jClient.query(anyString())).thenReturn(spec); - doThrow(new RuntimeException("Connection refused to Neo4j")) - .when(spec).run(); + doThrow(new RuntimeException("Migration failed")) + .when(schemaMigrationService).migrate(anyString()); - // Non-"already exists" errors should propagate assertThatThrownBy(() -> initializer.run(new DefaultApplicationArguments())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("schema initialization failed"); + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Migration failed"); } } diff --git a/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java new file mode 100644 index 0000000..1ab369f --- /dev/null +++ b/backend/services/knowledge-graph-service/src/test/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationServiceTest.java @@ -0,0 +1,463 @@ +package com.datamate.knowledgegraph.infrastructure.neo4j.migration; + +import 
com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode; +import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.neo4j.core.Neo4jClient; +import org.springframework.data.neo4j.core.Neo4jClient.RecordFetchSpec; +import org.springframework.data.neo4j.core.Neo4jClient.RunnableSpec; +import org.springframework.data.neo4j.core.Neo4jClient.UnboundRunnableSpec; + +import java.util.*; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class SchemaMigrationServiceTest { + + @Mock + private Neo4jClient neo4jClient; + + private KnowledgeGraphProperties properties; + + private SchemaMigration v1Migration; + private SchemaMigration v2Migration; + + @BeforeEach + void setUp() { + properties = new KnowledgeGraphProperties(); + + v1Migration = new SchemaMigration() { + @Override + public int getVersion() { return 1; } + @Override + public String getDescription() { return "Initial schema"; } + @Override + public List getStatements() { + return List.of("CREATE CONSTRAINT test1 IF NOT EXISTS FOR (n:Test) REQUIRE n.id IS UNIQUE"); + } + }; + + v2Migration = new SchemaMigration() { + @Override + public int getVersion() { return 2; } + @Override + public String getDescription() { return "Add index"; } + @Override + public List getStatements() { + return List.of("CREATE INDEX test_name IF NOT EXISTS FOR (n:Test) ON (n.name)"); + } + }; + } + + private SchemaMigrationService createService(List migrations) { + return new 
SchemaMigrationService(neo4jClient, properties, migrations); + } + + /** + * Creates a spy of the service with bootstrapMigrationSchema, acquireLock, + * releaseLock, and recordMigration stubbed out, and loadAppliedMigrations + * returning the given records. + */ + private SchemaMigrationService createSpiedService(List migrations, + List applied) { + SchemaMigrationService service = spy(createService(migrations)); + doNothing().when(service).bootstrapMigrationSchema(); + doNothing().when(service).acquireLock(anyString()); + doNothing().when(service).releaseLock(anyString()); + doReturn(applied).when(service).loadAppliedMigrations(); + lenient().doNothing().when(service).recordMigration(any()); + return service; + } + + private void setupQueryRunnable() { + UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class); + when(neo4jClient.query(anyString())).thenReturn(spec); + } + + private SchemaMigrationRecord appliedRecord(SchemaMigration migration) { + return SchemaMigrationRecord.builder() + .version(migration.getVersion()) + .description(migration.getDescription()) + .checksum(SchemaMigrationService.computeChecksum(migration.getStatements())) + .appliedAt("2025-01-01T00:00:00Z") + .executionTimeMs(100L) + .success(true) + .statementsCount(migration.getStatements().size()) + .build(); + } + + // ----------------------------------------------------------------------- + // Migration Disabled + // ----------------------------------------------------------------------- + + @Nested + class MigrationDisabled { + + @Test + void migrate_whenDisabled_skipsEverything() { + properties.getMigration().setEnabled(false); + SchemaMigrationService service = createService(List.of(v1Migration)); + + service.migrate("test-instance"); + + verifyNoInteractions(neo4jClient); + } + } + + // ----------------------------------------------------------------------- + // Fresh Database + // ----------------------------------------------------------------------- + + @Nested + class 
FreshDatabase { + + @Test + void migrate_freshDb_appliesAllMigrations() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + setupQueryRunnable(); + + service.migrate("test-instance"); + + // Verify migration statement was executed + verify(neo4jClient).query(contains("test1")); + // Verify migration record was created + verify(service).recordMigration(argThat(r -> r.getVersion() == 1 && r.isSuccess())); + } + + @Test + void migrate_freshDb_bootstrapConstraintsCreated() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + setupQueryRunnable(); + + service.migrate("test-instance"); + + // Verify bootstrap, lock acquisition, and release were called + verify(service).bootstrapMigrationSchema(); + verify(service).acquireLock("test-instance"); + verify(service).releaseLock("test-instance"); + } + } + + // ----------------------------------------------------------------------- + // Partially Applied + // ----------------------------------------------------------------------- + + @Nested + class PartiallyApplied { + + @Test + void migrate_v1Applied_onlyExecutesPending() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration, v2Migration), List.of(appliedRecord(v1Migration))); + setupQueryRunnable(); + + service.migrate("test-instance"); + + // V1 statement should NOT be executed + verify(neo4jClient, never()).query(contains("test1")); + // V2 statement should be executed + verify(neo4jClient).query(contains("test_name")); + } + + @Test + void migrate_allApplied_noop() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), List.of(appliedRecord(v1Migration))); + + service.migrate("test-instance"); + + // No migration statements should be executed + verifyNoInteractions(neo4jClient); + // recordMigration should NOT be called (only the stubbed setup, no real call) + verify(service, never()).recordMigration(any()); + 
} + } + + // ----------------------------------------------------------------------- + // Checksum Validation + // ----------------------------------------------------------------------- + + @Nested + class ChecksumValidation { + + @Test + void migrate_checksumMismatch_throwsException() { + SchemaMigrationRecord tampered = SchemaMigrationRecord.builder() + .version(1) + .description("Initial schema") + .checksum("wrong-checksum") + .appliedAt("2025-01-01T00:00:00Z") + .executionTimeMs(100L) + .success(true) + .statementsCount(1) + .build(); + + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), List.of(tampered)); + + assertThatThrownBy(() -> service.migrate("test-instance")) + .isInstanceOf(BusinessException.class) + .satisfies(e -> assertThat(((BusinessException) e).getErrorCodeEnum()) + .isEqualTo(KnowledgeGraphErrorCode.SCHEMA_CHECKSUM_MISMATCH)); + } + + @Test + void migrate_checksumValidationDisabled_skipsCheck() { + properties.getMigration().setValidateChecksums(false); + + SchemaMigrationRecord tampered = SchemaMigrationRecord.builder() + .version(1) + .description("Initial schema") + .checksum("wrong-checksum") + .appliedAt("2025-01-01T00:00:00Z") + .executionTimeMs(100L) + .success(true) + .statementsCount(1) + .build(); + + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), List.of(tampered)); + + // Should NOT throw even with wrong checksum — all applied, no pending + assertThatCode(() -> service.migrate("test-instance")) + .doesNotThrowAnyException(); + } + } + + // ----------------------------------------------------------------------- + // Lock Management + // ----------------------------------------------------------------------- + + @Nested + class LockManagement { + + @Test + void migrate_lockAcquired_executesAndReleases() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + setupQueryRunnable(); + + service.migrate("test-instance"); + + var 
inOrder = inOrder(service); + inOrder.verify(service).acquireLock("test-instance"); + inOrder.verify(service).releaseLock("test-instance"); + } + + @SuppressWarnings("unchecked") + @Test + void migrate_lockHeldByAnother_throwsException() { + SchemaMigrationService service = spy(createService(List.of(v1Migration))); + doNothing().when(service).bootstrapMigrationSchema(); + + // Let acquireLock run for real — mock neo4jClient for lock query + UnboundRunnableSpec lockSpec = mock(UnboundRunnableSpec.class); + RunnableSpec runnableSpec = mock(RunnableSpec.class); + RecordFetchSpec> fetchSpec = mock(RecordFetchSpec.class); + + when(neo4jClient.query(contains("MERGE (lock:_SchemaLock"))).thenReturn(lockSpec); + when(lockSpec.bindAll(anyMap())).thenReturn(runnableSpec); + when(runnableSpec.fetch()).thenReturn(fetchSpec); + when(fetchSpec.first()).thenReturn(Optional.of(Map.of( + "lockedBy", "other-instance", + "canAcquire", false + ))); + + assertThatThrownBy(() -> service.migrate("test-instance")) + .isInstanceOf(BusinessException.class) + .satisfies(e -> assertThat(((BusinessException) e).getErrorCodeEnum()) + .isEqualTo(KnowledgeGraphErrorCode.SCHEMA_MIGRATION_LOCKED)); + } + + @Test + void migrate_lockReleasedOnFailure() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + + // Make migration statement fail + UnboundRunnableSpec failSpec = mock(UnboundRunnableSpec.class); + when(neo4jClient.query(anyString())).thenReturn(failSpec); + doThrow(new RuntimeException("Connection refused")) + .when(failSpec).run(); + + assertThatThrownBy(() -> service.migrate("test-instance")) + .isInstanceOf(BusinessException.class); + + // Lock should still be released even after failure + verify(service).releaseLock("test-instance"); + } + } + + // ----------------------------------------------------------------------- + // Migration Failure + // ----------------------------------------------------------------------- + + @Nested + 
class MigrationFailure { + + @Test + void migrate_statementFails_recordsFailureAndThrows() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + + // Make migration statement fail + UnboundRunnableSpec failSpec = mock(UnboundRunnableSpec.class); + when(neo4jClient.query(anyString())).thenReturn(failSpec); + doThrow(new RuntimeException("Connection refused")) + .when(failSpec).run(); + + assertThatThrownBy(() -> service.migrate("test-instance")) + .isInstanceOf(BusinessException.class) + .satisfies(e -> assertThat(((BusinessException) e).getErrorCodeEnum()) + .isEqualTo(KnowledgeGraphErrorCode.SCHEMA_MIGRATION_FAILED)); + + // Failure should be recorded + verify(service).recordMigration(argThat(r -> !r.isSuccess() + && r.getErrorMessage() != null + && r.getErrorMessage().contains("Connection refused"))); + } + + @Test + void migrate_alreadyExistsError_safelySkipped() { + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + + // Make migration statement throw "already exists" + UnboundRunnableSpec existsSpec = mock(UnboundRunnableSpec.class); + when(neo4jClient.query(anyString())).thenReturn(existsSpec); + doThrow(new RuntimeException("Constraint already exists")) + .when(existsSpec).run(); + + // Should not throw + assertThatCode(() -> service.migrate("test-instance")) + .doesNotThrowAnyException(); + + // Success should be recorded + verify(service).recordMigration(argThat(r -> r.isSuccess() && r.getVersion() == 1)); + } + } + + // ----------------------------------------------------------------------- + // Retry After Failure (P0) + // ----------------------------------------------------------------------- + + @Nested + class RetryAfterFailure { + + @Test + void recordMigration_usesMerge_allowsRetryAfterFailure() { + SchemaMigrationService service = createService(List.of(v1Migration)); + + UnboundRunnableSpec unboundSpec = mock(UnboundRunnableSpec.class); + 
RunnableSpec runnableSpec = mock(RunnableSpec.class); + when(neo4jClient.query(contains("MERGE"))).thenReturn(unboundSpec); + when(unboundSpec.bindAll(anyMap())).thenReturn(runnableSpec); + + SchemaMigrationRecord record = SchemaMigrationRecord.builder() + .version(1) + .description("test") + .checksum("abc123") + .appliedAt("2025-01-01T00:00:00Z") + .executionTimeMs(100L) + .success(true) + .statementsCount(1) + .build(); + + service.recordMigration(record); + + // Verify MERGE is used (not CREATE) — ensures retries update + // existing failed records instead of hitting unique constraint violations + verify(neo4jClient).query(contains("MERGE")); + } + + @Test + void migrate_retryAfterFailure_recordsSuccess() { + // Simulate: first run recorded a failure, second run should succeed. + // loadAppliedMigrations only returns success=true, so failed V1 won't be in applied set. + SchemaMigrationService service = createSpiedService( + List.of(v1Migration), Collections.emptyList()); + setupQueryRunnable(); + + service.migrate("test-instance"); + + // Verify success record is written (MERGE will update existing failed record) + verify(service).recordMigration(argThat(r -> r.isSuccess() && r.getVersion() == 1)); + } + } + + // ----------------------------------------------------------------------- + // Database Time for Lock (P1-1) + // ----------------------------------------------------------------------- + + @Nested + class DatabaseTimeLock { + + @SuppressWarnings("unchecked") + @Test + void acquireLock_usesDatabaseTime_notLocalTime() { + SchemaMigrationService service = createService(List.of(v1Migration)); + + UnboundRunnableSpec lockSpec = mock(UnboundRunnableSpec.class); + RunnableSpec runnableSpec = mock(RunnableSpec.class); + RecordFetchSpec> fetchSpec = mock(RecordFetchSpec.class); + + when(neo4jClient.query(contains("MERGE (lock:_SchemaLock"))).thenReturn(lockSpec); + when(lockSpec.bindAll(anyMap())).thenReturn(runnableSpec); + 
when(runnableSpec.fetch()).thenReturn(fetchSpec); + when(fetchSpec.first()).thenReturn(Optional.of(Map.of( + "lockedBy", "test-instance", + "canAcquire", true + ))); + + service.acquireLock("test-instance"); + + // Verify that local time is NOT passed as parameters — database time is used instead + @SuppressWarnings("rawtypes") + ArgumentCaptor paramsCaptor = ArgumentCaptor.forClass(Map.class); + verify(lockSpec).bindAll(paramsCaptor.capture()); + Map params = paramsCaptor.getValue(); + assertThat(params).containsKey("instanceId"); + assertThat(params).containsKey("timeoutMs"); + assertThat(params).doesNotContainKey("now"); + assertThat(params).doesNotContainKey("expiry"); + } + } + + // ----------------------------------------------------------------------- + // Checksum Computation + // ----------------------------------------------------------------------- + + @Nested + class ChecksumComputation { + + @Test + void computeChecksum_deterministic() { + List statements = List.of("stmt1", "stmt2"); + String checksum1 = SchemaMigrationService.computeChecksum(statements); + String checksum2 = SchemaMigrationService.computeChecksum(statements); + + assertThat(checksum1).isEqualTo(checksum2); + assertThat(checksum1).hasSize(64); // SHA-256 hex length + } + + @Test + void computeChecksum_orderMatters() { + String checksum1 = SchemaMigrationService.computeChecksum(List.of("stmt1", "stmt2")); + String checksum2 = SchemaMigrationService.computeChecksum(List.of("stmt2", "stmt1")); + + assertThat(checksum1).isNotEqualTo(checksum2); + } + } +}