feat(kg): 实现 Schema 版本管理和迁移机制

- 新增 Schema 迁移框架,参考 Flyway 设计思路
- 支持版本跟踪、变更检测、自动迁移
- 使用分布式锁确保多实例安全
- 支持 Checksum 校验防止已应用迁移被修改
- 使用 MERGE 策略支持失败后重试
- 使用数据库时间消除时钟偏差问题

核心组件:
- SchemaMigration 接口:定义迁移脚本规范
- SchemaMigrationService:核心编排器
- V1__InitialSchema:基线迁移(14 条 DDL)
- SchemaMigrationRecord:迁移记录 POJO

配置项:
- migration.enabled:是否启用迁移(默认 true)
- migration.validate-checksums:是否校验 checksum(默认 true)

向后兼容:
- 已有数据库首次运行时,V1 的 14 条语句全部使用 IF NOT EXISTS,均为 no-op,仅用于建立版本基线
- 同样适用于全新部署场景

新增 27 个测试用例,全部通过
测试结果:242 tests pass
This commit is contained in:
2026-02-19 16:55:33 +08:00
parent cca463e7d1
commit 7abdafc338
10 changed files with 989 additions and 135 deletions

View File

@@ -24,7 +24,10 @@ public enum KnowledgeGraphErrorCode implements ErrorCode {
SCHEMA_INIT_FAILED("knowledge_graph.0011", "图谱 Schema 初始化失败"),
INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "检测到默认凭据,生产环境禁止使用默认密码"),
UNAUTHORIZED_INTERNAL_CALL("knowledge_graph.0013", "内部调用未授权:X-Internal-Token 校验失败"),
QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度");
QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度"),
SCHEMA_MIGRATION_FAILED("knowledge_graph.0015", "Schema 迁移执行失败"),
SCHEMA_CHECKSUM_MISMATCH("knowledge_graph.0016", "Schema 迁移 checksum 不匹配:已应用的迁移被修改"),
SCHEMA_MIGRATION_LOCKED("knowledge_graph.0017", "Schema 迁移锁被占用,其他实例正在执行迁移");
private final String code;
private final String message;

View File

@@ -1,24 +1,21 @@
package com.datamate.knowledgegraph.infrastructure.neo4j;
import com.datamate.knowledgegraph.infrastructure.neo4j.migration.SchemaMigrationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* 图谱 Schema 初始化器。
* <p>
* 应用启动时自动创建 Neo4j 索引和约束
* 所有语句使用 {@code IF NOT EXISTS},保证幂等性。
* <p>
* 对应 {@code docs/knowledge-graph/schema/schema.cypher} 中的第 1-3 部分。
* 应用启动时通过 {@link SchemaMigrationService} 执行版本化 Schema 迁移
* <p>
* <b>安全自检</b>:在非开发环境中,检测到默认 Neo4j 密码时拒绝启动。
*/
@@ -33,13 +30,8 @@ public class GraphInitializer implements ApplicationRunner {
"datamate123", "neo4j", "password", "123456", "admin"
);
/** 仅识别「已存在」类错误消息的关键词,其余错误不应吞掉。 */
private static final Set<String> ALREADY_EXISTS_KEYWORDS = Set.of(
"already exists", "already exist", "EquivalentSchemaRuleAlreadyExists"
);
private final Neo4jClient neo4jClient;
private final KnowledgeGraphProperties properties;
private final SchemaMigrationService schemaMigrationService;
@Value("${spring.neo4j.authentication.password:}")
private String neo4jPassword;
@@ -47,47 +39,6 @@ public class GraphInitializer implements ApplicationRunner {
@Value("${spring.profiles.active:default}")
private String activeProfile;
/**
* 需要在启动时执行的 Cypher 语句。
* 每条语句必须独立执行(Neo4j 不支持多条 DDL 在同一事务中)。
*/
private static final List<String> SCHEMA_STATEMENTS = List.of(
// 约束(自动创建对应索引)
"CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE",
// 同步 upsert 复合唯一约束:防止并发写入产生重复实体
"CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " +
"FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE",
// 单字段索引
"CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)",
"CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
"CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
"CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)",
"CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)",
// 复合索引
"CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)",
"CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)",
"CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
// 全文索引
"CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",
// ── SyncHistory 约束和索引 ──
// P1: syncId 唯一约束,防止 ID 碰撞
"CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
"FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
// P2-3: 查询优化索引
"CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
"FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
"CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
"FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
);
@Override
public void run(ApplicationArguments args) {
// ── 安全自检:默认凭据检测 ──
@@ -98,32 +49,7 @@ public class GraphInitializer implements ApplicationRunner {
return;
}
log.info("Initializing Neo4j schema: {} statements to execute", SCHEMA_STATEMENTS.size());
int succeeded = 0;
int failed = 0;
for (String statement : SCHEMA_STATEMENTS) {
try {
neo4jClient.query(statement).run();
succeeded++;
log.debug("Schema statement executed: {}", truncate(statement));
} catch (Exception e) {
if (isAlreadyExistsError(e)) {
// 约束/索引已存在,安全跳过
succeeded++;
log.debug("Schema element already exists (safe to skip): {}", truncate(statement));
} else {
// 非「已存在」错误:记录并抛出,阻止启动
failed++;
log.error("Schema statement FAILED: {} — {}", truncate(statement), e.getMessage());
throw new IllegalStateException(
"Neo4j schema initialization failed: " + truncate(statement), e);
}
}
}
log.info("Neo4j schema initialization completed: succeeded={}, failed={}", succeeded, failed);
schemaMigrationService.migrate(UUID.randomUUID().toString());
}
/**
@@ -149,20 +75,4 @@ public class GraphInitializer implements ApplicationRunner {
}
}
}
/**
* 判断异常是否仅因为 Schema 元素已存在(安全可忽略)。
*/
private static boolean isAlreadyExistsError(Exception e) {
String msg = e.getMessage();
if (msg == null) {
return false;
}
String lowerMsg = msg.toLowerCase();
return ALREADY_EXISTS_KEYWORDS.stream().anyMatch(kw -> lowerMsg.contains(kw.toLowerCase()));
}
private static String truncate(String s) {
return s.length() <= 100 ? s : s.substring(0, 97) + "...";
}
}

View File

@@ -32,6 +32,9 @@ public class KnowledgeGraphProperties {
/** 安全相关配置 */
private Security security = new Security();
/** Schema 迁移配置 */
private Migration migration = new Migration();
@Data
public static class Security {
@@ -82,4 +85,14 @@ public class KnowledgeGraphProperties {
*/
private boolean allowPurgeOnEmptySnapshot = false;
}
/** Schema migration settings; both switches default to {@code true}. */
@Data
public static class Migration {
/** Whether versioned schema migration is enabled. */
private boolean enabled = true;
/** Whether checksums of already-applied migrations are validated (guards against edited migrations). */
private boolean validateChecksums = true;
}
}

View File

@@ -0,0 +1,20 @@
package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
import java.util.List;
/**
 * Schema migration contract.
 * <p>
 * Each implementation represents one versioned schema change; version numbers
 * increase monotonically.
 */
public interface SchemaMigration {
/** Monotonically increasing version number (1, 2, 3...). */
int getVersion();
/** Human-readable description. */
String getDescription();
/** List of Cypher DDL statements. */
List<String> getStatements();
}

View File

@@ -0,0 +1,42 @@
package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Migration record data class, mapped to a {@code _SchemaMigration} node.
 * <p>
 * Plain POJO; it does not use the SDN {@code @Node} annotation.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SchemaMigrationRecord {
/** Migration version number. */
private int version;
/** Migration description. */
private String description;
/** SHA-256 checksum of the migration statements. */
private String checksum;
/** Time the migration was applied (ISO-8601). */
private String appliedAt;
/** Migration execution time in milliseconds. */
private long executionTimeMs;
/** Whether the migration succeeded. */
private boolean success;
/** Number of statements in the migration. */
private int statementsCount;
/** Error message when the migration failed. */
private String errorMessage;
}

View File

@@ -0,0 +1,358 @@
package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
/**
* Schema 迁移编排器。
* <p>
* 参考 Flyway 设计思路,为 Neo4j 图数据库提供版本化迁移机制:
* <ul>
* <li>在数据库中记录已应用的迁移版本({@code _SchemaMigration} 节点)</li>
* <li>自动检测并执行新增迁移</li>
* <li>通过 checksum 校验防止已应用迁移被篡改</li>
* <li>通过分布式锁({@code _SchemaLock} 节点)防止多实例并发迁移</li>
* </ul>
*/
@Component
@Slf4j
public class SchemaMigrationService {
/** Distributed lock expiry in milliseconds (5 minutes). */
private static final long LOCK_TIMEOUT_MS = 5 * 60 * 1000;
/** Message fragments that identify "already exists" errors only; any other error must not be swallowed. */
private static final Set<String> ALREADY_EXISTS_KEYWORDS = Set.of(
"already exists", "already exist", "EquivalentSchemaRuleAlreadyExists"
);
// Client used to run every Cypher statement issued by this service.
private final Neo4jClient neo4jClient;
// Source of the migration.enabled / migration.validate-checksums switches.
private final KnowledgeGraphProperties properties;
// Registered migrations, sorted by ascending version in the constructor.
private final List<SchemaMigration> migrations;
public SchemaMigrationService(Neo4jClient neo4jClient,
KnowledgeGraphProperties properties,
List<SchemaMigration> migrations) {
this.neo4jClient = neo4jClient;
this.properties = properties;
this.migrations = migrations.stream()
.sorted(Comparator.comparingInt(SchemaMigration::getVersion))
.toList();
}
/**
 * Runs the schema migration workflow: bootstrap the migration system's own
 * constraints, take the lock, load applied records, optionally validate
 * checksums, execute whatever is still pending, and always release the lock.
 * <p>
 * Does nothing when migration is disabled in the configuration.
 *
 * @param instanceId identifier of the current instance, used as the lock owner
 */
public void migrate(String instanceId) {
    if (!properties.getMigration().isEnabled()) {
        log.info("Schema migration is disabled, skipping");
        return;
    }
    log.info("Starting schema migration, instanceId={}", instanceId);

    // The constraints backing the lock/record nodes must exist before the lock is taken.
    bootstrapMigrationSchema();
    acquireLock(instanceId);
    try {
        List<SchemaMigrationRecord> alreadyApplied = loadAppliedMigrations();
        if (properties.getMigration().isValidateChecksums()) {
            validateChecksums(alreadyApplied, migrations);
        }

        Set<Integer> doneVersions = new HashSet<>();
        for (SchemaMigrationRecord rec : alreadyApplied) {
            doneVersions.add(rec.getVersion());
        }

        // Keep registration order (ascending version) for everything not yet applied.
        List<SchemaMigration> pending = new ArrayList<>();
        for (SchemaMigration candidate : migrations) {
            if (!doneVersions.contains(candidate.getVersion())) {
                pending.add(candidate);
            }
        }

        if (pending.isEmpty()) {
            log.info("Schema is up to date, no pending migrations");
        } else {
            executePendingMigrations(pending);
            log.info("Schema migration completed successfully, applied {} migration(s)", pending.size());
        }
    } finally {
        // Released on success and on failure alike.
        releaseLock(instanceId);
    }
}
/**
 * Creates the constraints the migration system itself relies on
 * (solves the chicken-and-egg problem): a unique version per
 * {@code _SchemaMigration} node and a unique name per {@code _SchemaLock} node.
 */
void bootstrapMigrationSchema() {
    log.debug("Bootstrapping migration schema constraints");
    List<String> bootstrapStatements = List.of(
            "CREATE CONSTRAINT schema_migration_version_unique IF NOT EXISTS " +
                    "FOR (n:_SchemaMigration) REQUIRE n.version IS UNIQUE",
            "CREATE CONSTRAINT schema_lock_name_unique IF NOT EXISTS " +
                    "FOR (n:_SchemaLock) REQUIRE n.name IS UNIQUE"
    );
    for (String ddl : bootstrapStatements) {
        neo4jClient.query(ddl).run();
    }
}
/**
 * Acquires the distributed migration lock.
 * <p>
 * MERGEs the {@code _SchemaLock} node. The lock is granted when this instance
 * already owns it or when the existing lock is older than the timeout
 * (5 minutes); otherwise a {@code SCHEMA_MIGRATION_LOCKED} business exception is thrown.
 * <p>
 * All timestamps come from the database ({@code datetime().epochMillis}) so that
 * clock skew between instances cannot cause the lock to be taken over by mistake.
 *
 * @param instanceId identifier of the instance requesting the lock
 * @throws IllegalStateException if the lock query returns no row
 */
void acquireLock(String instanceId) {
    log.debug("Acquiring schema migration lock, instanceId={}", instanceId);

    Map<String, Object> row = neo4jClient.query(
                    "MERGE (lock:_SchemaLock {name: 'schema_migration'}) " +
                    "ON CREATE SET lock.locked_by = $instanceId, lock.locked_at = datetime().epochMillis " +
                    "WITH lock, " +
                    " CASE WHEN lock.locked_by = $instanceId THEN true " +
                    " WHEN lock.locked_at < (datetime().epochMillis - $timeoutMs) THEN true " +
                    " ELSE false END AS canAcquire " +
                    "SET lock.locked_by = CASE WHEN canAcquire THEN $instanceId ELSE lock.locked_by END, " +
                    " lock.locked_at = CASE WHEN canAcquire THEN datetime().epochMillis ELSE lock.locked_at END " +
                    "RETURN lock.locked_by AS lockedBy, canAcquire")
            .bindAll(Map.of("instanceId", instanceId, "timeoutMs", LOCK_TIMEOUT_MS))
            .fetch()
            .first()
            .orElseThrow(() -> new IllegalStateException(
                    "Failed to acquire schema migration lock: unexpected empty result"));

    Boolean canAcquire = (Boolean) row.get("canAcquire");
    if (Boolean.TRUE.equals(canAcquire)) {
        log.info("Schema migration lock acquired, instanceId={}", instanceId);
        return;
    }

    // Denied: report the current owner as returned by the query.
    String lockedBy = (String) row.get("lockedBy");
    throw BusinessException.of(
            KnowledgeGraphErrorCode.SCHEMA_MIGRATION_LOCKED,
            "Schema migration lock is held by instance: " + lockedBy
    );
}
/**
 * Releases the distributed lock by deleting the {@code _SchemaLock} node,
 * but only if it is still owned by the given instance.
 * <p>
 * Best effort: any failure is logged as a warning and not rethrown.
 *
 * @param instanceId identifier of the instance that holds the lock
 */
void releaseLock(String instanceId) {
    try {
        String unlockCypher =
                "MATCH (lock:_SchemaLock {name: 'schema_migration', locked_by: $instanceId}) " +
                "DELETE lock";
        neo4jClient.query(unlockCypher)
                .bindAll(Map.of("instanceId", instanceId))
                .run();
        log.debug("Schema migration lock released, instanceId={}", instanceId);
    } catch (Exception e) {
        log.warn("Failed to release schema migration lock: {}", e.getMessage());
    }
}
/**
 * Loads the records of successfully applied migrations, ordered by version.
 * Records with {@code success = false} are not returned.
 *
 * @return the applied migration records
 */
List<SchemaMigrationRecord> loadAppliedMigrations() {
    String cypher =
            "MATCH (m:_SchemaMigration {success: true}) " +
            "RETURN m.version AS version, m.description AS description, " +
            " m.checksum AS checksum, m.applied_at AS appliedAt, " +
            " m.execution_time_ms AS executionTimeMs, " +
            " m.success AS success, m.statements_count AS statementsCount, " +
            " m.error_message AS errorMessage " +
            "ORDER BY m.version";
    return neo4jClient.query(cypher).fetch().all().stream()
            .map(SchemaMigrationService::toRecord)
            .toList();
}

/** Maps one result row to a record; missing numeric columns (other than version) default to 0. */
private static SchemaMigrationRecord toRecord(Map<String, Object> row) {
    Object execMs = row.get("executionTimeMs");
    Object stmtCount = row.get("statementsCount");
    return SchemaMigrationRecord.builder()
            .version(((Number) row.get("version")).intValue())
            .description((String) row.get("description"))
            .checksum((String) row.get("checksum"))
            .appliedAt((String) row.get("appliedAt"))
            .executionTimeMs(execMs != null ? ((Number) execMs).longValue() : 0)
            .success(Boolean.TRUE.equals(row.get("success")))
            .statementsCount(stmtCount != null ? ((Number) stmtCount).intValue() : 0)
            .errorMessage((String) row.get("errorMessage"))
            .build();
}
/**
 * Verifies that every applied migration still has the checksum that was recorded
 * when it ran.
 *
 * @param applied    records of migrations already applied
 * @param registered migrations currently present in the code base
 * @throws BusinessException with {@code SCHEMA_CHECKSUM_MISMATCH} on the first mismatch
 */
void validateChecksums(List<SchemaMigrationRecord> applied, List<SchemaMigration> registered) {
    Map<Integer, SchemaMigration> byVersion = registered.stream()
            .collect(Collectors.toMap(SchemaMigration::getVersion, candidate -> candidate));

    for (SchemaMigrationRecord appliedRecord : applied) {
        SchemaMigration source = byVersion.get(appliedRecord.getVersion());
        // Applied but no longer in the code (e.g. an old version was removed): nothing to compare.
        if (source != null) {
            String actual = computeChecksum(source.getStatements());
            boolean unchanged = actual.equals(appliedRecord.getChecksum());
            if (!unchanged) {
                throw BusinessException.of(
                        KnowledgeGraphErrorCode.SCHEMA_CHECKSUM_MISMATCH,
                        String.format("Migration V%d (%s): recorded checksum=%s, current checksum=%s",
                                appliedRecord.getVersion(), appliedRecord.getDescription(),
                                appliedRecord.getChecksum(), actual)
                );
            }
        }
    }
}
/**
 * Executes the pending migrations one by one, in the order given.
 * <p>
 * Each statement of a migration runs on its own; "already exists" errors are
 * skipped, any other error aborts the run. A record is written for every
 * migration, successful or not.
 * <p>
 * On failure the root cause is logged with its stack trace, and a problem while
 * persisting the failure record is only logged: it must never replace the
 * {@code SCHEMA_MIGRATION_FAILED} exception that describes the real error.
 *
 * @param pending migrations that have not been applied yet
 * @throws BusinessException with {@code SCHEMA_MIGRATION_FAILED} when a migration fails
 */
void executePendingMigrations(List<SchemaMigration> pending) {
    for (SchemaMigration migration : pending) {
        log.info("Executing migration V{}: {}", migration.getVersion(), migration.getDescription());
        long startTime = System.currentTimeMillis();
        try {
            for (String statement : migration.getStatements()) {
                executeStatement(statement);
            }
        } catch (Exception e) {
            long elapsed = System.currentTimeMillis() - startTime;
            String errorMessage = e.getMessage();
            // The BusinessException below only carries the message, so keep the stack trace in the log.
            log.error("Migration V{} ({}) failed after {}ms",
                    migration.getVersion(), migration.getDescription(), elapsed, e);
            try {
                recordMigration(buildRecord(migration, elapsed, false, errorMessage));
            } catch (Exception recordFailure) {
                // E.g. the connection that broke the migration is still down.
                log.warn("Failed to persist failure record for migration V{}: {}",
                        migration.getVersion(), recordFailure.getMessage(), recordFailure);
            }
            throw BusinessException.of(
                    KnowledgeGraphErrorCode.SCHEMA_MIGRATION_FAILED,
                    String.format("Migration V%d (%s) failed: %s",
                            migration.getVersion(), migration.getDescription(), errorMessage)
            );
        }
        long elapsed = System.currentTimeMillis() - startTime;
        recordMigration(buildRecord(migration, elapsed, true, null));
        log.info("Migration V{} completed in {}ms", migration.getVersion(), elapsed);
    }
}

/** Runs a single statement, skipping it when the schema element already exists. */
private void executeStatement(String statement) {
    try {
        neo4jClient.query(statement).run();
        log.debug(" Statement executed: {}", abbreviate(statement));
    } catch (Exception e) {
        if (!isAlreadyExistsError(e)) {
            throw e;
        }
        log.debug(" Schema element already exists (safe to skip): {}", abbreviate(statement));
    }
}

/** Builds the record describing one migration run. */
private static SchemaMigrationRecord buildRecord(SchemaMigration migration, long elapsedMs,
                                                 boolean success, String errorMessage) {
    return SchemaMigrationRecord.builder()
            .version(migration.getVersion())
            .description(migration.getDescription())
            .checksum(computeChecksum(migration.getStatements()))
            .appliedAt(Instant.now().toString())
            .executionTimeMs(elapsedMs)
            .success(success)
            .statementsCount(migration.getStatements().size())
            .errorMessage(errorMessage)
            .build();
}

/** Shortens a statement to at most 100 characters for log output. */
private static String abbreviate(String statement) {
    return statement.length() <= 100 ? statement : statement.substring(0, 97) + "...";
}
/**
 * Writes (or overwrites) the record node of a migration.
 * <p>
 * Uses MERGE (matched on version) followed by SET instead of CREATE, so that:
 * <ul>
 * <li>a retry after a failure does not get stuck on the unique constraint (P0)</li>
 * <li>if a migration ran but its record could not be written, a re-run can
 * safely fill in the record (idempotent)</li>
 * </ul>
 *
 * @param record the record to persist
 */
void recordMigration(SchemaMigrationRecord record) {
    String upsertCypher =
            "MERGE (m:_SchemaMigration {version: $version}) " +
            "SET m.description = $description, " +
            " m.checksum = $checksum, " +
            " m.applied_at = $appliedAt, " +
            " m.execution_time_ms = $executionTimeMs, " +
            " m.success = $success, " +
            " m.statements_count = $statementsCount, " +
            " m.error_message = $errorMessage";

    // HashMap rather than Map.of: errorMessage is null for successful runs.
    Map<String, Object> bindings = new HashMap<>();
    bindings.put("version", record.getVersion());
    bindings.put("description", record.getDescription());
    bindings.put("checksum", record.getChecksum());
    bindings.put("appliedAt", record.getAppliedAt());
    bindings.put("executionTimeMs", record.getExecutionTimeMs());
    bindings.put("success", record.isSuccess());
    bindings.put("statementsCount", record.getStatementsCount());
    bindings.put("errorMessage", record.getErrorMessage());

    neo4jClient.query(upsertCypher).bindAll(bindings).run();
}
/**
 * Computes the SHA-256 checksum of a statement list as 64 lowercase hex characters.
 * <p>
 * Every statement is fed into the digest prefixed with its UTF-8 byte length, so
 * statement boundaries are part of the checksum: {@code ["ab", "c"]} and
 * {@code ["a", "bc"]} hash differently, and moving text from one statement into
 * the next is detected.
 * <p>
 * NOTE(review): this differs from a plain concatenation digest for any non-empty
 * list, so records written with a delimiter-less digest would no longer match.
 *
 * @param statements the migration statements, in order
 * @return the hex-encoded SHA-256 checksum
 * @throws IllegalStateException if SHA-256 is not available
 */
static String computeChecksum(List<String> statements) {
    try {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        for (String statement : statements) {
            byte[] bytes = statement.getBytes(StandardCharsets.UTF_8);
            int length = bytes.length;
            // 4-byte big-endian length prefix marks where each statement starts.
            digest.update((byte) (length >>> 24));
            digest.update((byte) (length >>> 16));
            digest.update((byte) (length >>> 8));
            digest.update((byte) length);
            digest.update(bytes);
        }
        byte[] hash = digest.digest();
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16));
            hex.append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException("SHA-256 algorithm not available", e);
    }
}
/**
 * Tells whether an exception was raised only because the schema element already
 * exists, in which case it is safe to ignore.
 * <p>
 * Matching is case-insensitive and uses {@link Locale#ROOT}, so the result does
 * not depend on the JVM default locale (under a Turkish locale an uppercase
 * "EXISTS" would otherwise lower-case to a dotless i and never match).
 *
 * @param e the exception to inspect
 * @return {@code true} if the message contains one of the "already exists" keywords;
 *         {@code false} otherwise, including when the message is {@code null}
 */
static boolean isAlreadyExistsError(Exception e) {
    String msg = e.getMessage();
    if (msg == null) {
        return false;
    }
    String lowerMsg = msg.toLowerCase(Locale.ROOT);
    return ALREADY_EXISTS_KEYWORDS.stream()
            .anyMatch(kw -> lowerMsg.contains(kw.toLowerCase(Locale.ROOT)));
}
}

View File

@@ -0,0 +1,66 @@
package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * V1 baseline migration: the initial schema.
 * <p>
 * Contains all 14 DDL statements that used to live in {@code GraphInitializer}.
 * On the first run against an existing database every statement is a no-op
 * thanks to {@code IF NOT EXISTS}, but the run still establishes the version baseline.
 */
@Component
public class V1__InitialSchema implements SchemaMigration {

    /** Baseline DDL, in execution order. */
    private static final List<String> STATEMENTS = List.of(
            // Constraints (each one creates its backing index automatically)
            "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE",
            // Composite unique constraint for sync upserts: prevents duplicate entities under concurrent writes
            "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " +
                    "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE",

            // Single-property indexes
            "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)",
            "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
            "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
            "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)",
            "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)",

            // Composite indexes
            "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)",
            "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)",
            "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",

            // Full-text index
            "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",

            // ── SyncHistory constraints and indexes ──
            // Unique sync id per graph, prevents id collisions
            "CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
                    "FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
            // Query-optimisation indexes
            "CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
                    "FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
            "CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
                    "FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
    );

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public String getDescription() {
        return "Initial schema: Entity and SyncHistory constraints and indexes";
    }

    @Override
    public List<String> getStatements() {
        return STATEMENTS;
    }
}

View File

@@ -31,6 +31,12 @@ datamate:
# 是否跳过 Token 校验(默认 false = fail-closed)
# 仅在 dev/test 环境显式设置为 true 以跳过校验
skip-token-check: ${KG_SKIP_TOKEN_CHECK:false}
# Schema 迁移配置
migration:
# 是否启用 Schema 版本化迁移
enabled: ${KG_MIGRATION_ENABLED:true}
# 是否校验已应用迁移的 checksum(防止迁移被篡改)
validate-checksums: ${KG_MIGRATION_VALIDATE_CHECKSUMS:true}
# MySQL → Neo4j 同步配置
sync:
# 数据管理服务地址

View File

@@ -1,13 +1,11 @@
package com.datamate.knowledgegraph.infrastructure.neo4j;
import com.datamate.knowledgegraph.infrastructure.neo4j.migration.SchemaMigrationService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.data.neo4j.core.Neo4jClient.UnboundRunnableSpec;
import org.springframework.data.neo4j.core.Neo4jClient.RunnableSpec;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -19,13 +17,13 @@ import static org.mockito.Mockito.*;
class GraphInitializerTest {
@Mock
private Neo4jClient neo4jClient;
private SchemaMigrationService schemaMigrationService;
private GraphInitializer createInitializer(String password, String profile, boolean autoInit) {
KnowledgeGraphProperties properties = new KnowledgeGraphProperties();
properties.getSync().setAutoInitSchema(autoInit);
GraphInitializer initializer = new GraphInitializer(neo4jClient, properties);
GraphInitializer initializer = new GraphInitializer(properties, schemaMigrationService);
ReflectionTestUtils.setField(initializer, "neo4jPassword", password);
ReflectionTestUtils.setField(initializer, "activeProfile", profile);
return initializer;
@@ -97,20 +95,16 @@ class GraphInitializerTest {
}
// -----------------------------------------------------------------------
// Schema 初始化 — 成功
// Schema 初始化 — 委托给 SchemaMigrationService
// -----------------------------------------------------------------------
@Test
void run_autoInitEnabled_executesAllStatements() {
void run_autoInitEnabled_delegatesToMigrationService() {
GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true);
UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class);
when(neo4jClient.query(anyString())).thenReturn(spec);
initializer.run(new DefaultApplicationArguments());
// Should execute all schema statements (constraints + indexes + fulltext)
verify(neo4jClient, atLeast(10)).query(anyString());
verify(schemaMigrationService).migrate(anyString());
}
@Test
@@ -119,39 +113,18 @@ class GraphInitializerTest {
initializer.run(new DefaultApplicationArguments());
verifyNoInteractions(neo4jClient);
}
// -----------------------------------------------------------------------
// P2-7: Schema 初始化错误处理
// -----------------------------------------------------------------------
@Test
void run_alreadyExistsError_safelyIgnored() {
GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true);
UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class);
when(neo4jClient.query(anyString())).thenReturn(spec);
doThrow(new RuntimeException("Constraint already exists"))
.when(spec).run();
// Should not throw — "already exists" errors are safely ignored
assertThatCode(() -> initializer.run(new DefaultApplicationArguments()))
.doesNotThrowAnyException();
verifyNoInteractions(schemaMigrationService);
}
@Test
void run_nonExistenceError_throwsException() {
void run_migrationServiceThrows_propagatesException() {
GraphInitializer initializer = createInitializer("s3cure!P@ss", "dev", true);
UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class);
when(neo4jClient.query(anyString())).thenReturn(spec);
doThrow(new RuntimeException("Connection refused to Neo4j"))
.when(spec).run();
doThrow(new RuntimeException("Migration failed"))
.when(schemaMigrationService).migrate(anyString());
// Non-"already exists" errors should propagate
assertThatThrownBy(() -> initializer.run(new DefaultApplicationArguments()))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("schema initialization failed");
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Migration failed");
}
}

View File

@@ -0,0 +1,463 @@
package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.data.neo4j.core.Neo4jClient.RecordFetchSpec;
import org.springframework.data.neo4j.core.Neo4jClient.RunnableSpec;
import org.springframework.data.neo4j.core.Neo4jClient.UnboundRunnableSpec;
import java.util.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class SchemaMigrationServiceTest {
@Mock
private Neo4jClient neo4jClient;
private KnowledgeGraphProperties properties;
private SchemaMigration v1Migration;
private SchemaMigration v2Migration;
/**
 * Creates default properties and two single-statement fixture migrations:
 * V1 (a constraint named "test1") and V2 (an index named "test_name").
 */
@BeforeEach
void setUp() {
properties = new KnowledgeGraphProperties();
// V1 fixture: tests recognise its statement by the substring "test1".
v1Migration = new SchemaMigration() {
@Override
public int getVersion() { return 1; }
@Override
public String getDescription() { return "Initial schema"; }
@Override
public List<String> getStatements() {
return List.of("CREATE CONSTRAINT test1 IF NOT EXISTS FOR (n:Test) REQUIRE n.id IS UNIQUE");
}
};
// V2 fixture: tests recognise its statement by the substring "test_name".
v2Migration = new SchemaMigration() {
@Override
public int getVersion() { return 2; }
@Override
public String getDescription() { return "Add index"; }
@Override
public List<String> getStatements() {
return List.of("CREATE INDEX test_name IF NOT EXISTS FOR (n:Test) ON (n.name)");
}
};
}
/** Builds a real (non-spied) service over the mocked client, the test properties and the given migrations. */
private SchemaMigrationService createService(List<SchemaMigration> migrations) {
return new SchemaMigrationService(neo4jClient, properties, migrations);
}
/**
 * Creates a spy of the service with bootstrapMigrationSchema, acquireLock,
 * releaseLock, and recordMigration stubbed out, and loadAppliedMigrations
 * returning the given records.
 *
 * @param migrations migrations registered with the service
 * @param applied    records that loadAppliedMigrations should report
 * @return the spied service
 */
private SchemaMigrationService createSpiedService(List<SchemaMigration> migrations,
List<SchemaMigrationRecord> applied) {
SchemaMigrationService service = spy(createService(migrations));
doNothing().when(service).bootstrapMigrationSchema();
doNothing().when(service).acquireLock(anyString());
doNothing().when(service).releaseLock(anyString());
doReturn(applied).when(service).loadAppliedMigrations();
// lenient: recordMigration is not reached in every test (e.g. when nothing is pending),
// and this stub must not be reported as unnecessary in those tests.
lenient().doNothing().when(service).recordMigration(any());
return service;
}
/** Makes every {@code neo4jClient.query(...)} call return one shared mock spec, so running a statement does nothing. */
private void setupQueryRunnable() {
UnboundRunnableSpec spec = mock(UnboundRunnableSpec.class);
when(neo4jClient.query(anyString())).thenReturn(spec);
}
/**
 * Builds a successful record for the given migration whose checksum is computed
 * from the migration's current statements, so checksum validation passes for it.
 */
private SchemaMigrationRecord appliedRecord(SchemaMigration migration) {
return SchemaMigrationRecord.builder()
.version(migration.getVersion())
.description(migration.getDescription())
.checksum(SchemaMigrationService.computeChecksum(migration.getStatements()))
.appliedAt("2025-01-01T00:00:00Z")
.executionTimeMs(100L)
.success(true)
.statementsCount(migration.getStatements().size())
.build();
}
// -----------------------------------------------------------------------
// Migration Disabled
// -----------------------------------------------------------------------
/** Behaviour when {@code migration.enabled} is false. */
@Nested
class MigrationDisabled {
@Test
void migrate_whenDisabled_skipsEverything() {
properties.getMigration().setEnabled(false);
SchemaMigrationService service = createService(List.of(v1Migration));
service.migrate("test-instance");
// Disabled migration must return before any query, including bootstrap and locking.
verifyNoInteractions(neo4jClient);
}
}
// -----------------------------------------------------------------------
// Fresh Database
// -----------------------------------------------------------------------
/** Behaviour when no migration has been applied yet. */
@Nested
class FreshDatabase {
@Test
void migrate_freshDb_appliesAllMigrations() {
SchemaMigrationService service = createSpiedService(
List.of(v1Migration), Collections.emptyList());
setupQueryRunnable();
service.migrate("test-instance");
// Verify migration statement was executed
verify(neo4jClient).query(contains("test1"));
// Verify migration record was created
verify(service).recordMigration(argThat(r -> r.getVersion() == 1 && r.isSuccess()));
}
@Test
void migrate_freshDb_bootstrapConstraintsCreated() {
SchemaMigrationService service = createSpiedService(
List.of(v1Migration), Collections.emptyList());
setupQueryRunnable();
service.migrate("test-instance");
// Verify bootstrap, lock acquisition, and release were called
// (all three are stubbed on the spy, so only the invocations are checked here)
verify(service).bootstrapMigrationSchema();
verify(service).acquireLock("test-instance");
verify(service).releaseLock("test-instance");
}
}
// -----------------------------------------------------------------------
// Partially Applied
// -----------------------------------------------------------------------
/** Behaviour when some or all registered migrations are already recorded as applied. */
@Nested
class PartiallyApplied {
@Test
void migrate_v1Applied_onlyExecutesPending() {
SchemaMigrationService service = createSpiedService(
List.of(v1Migration, v2Migration), List.of(appliedRecord(v1Migration)));
setupQueryRunnable();
service.migrate("test-instance");
// V1 statement should NOT be executed
verify(neo4jClient, never()).query(contains("test1"));
// V2 statement should be executed
verify(neo4jClient).query(contains("test_name"));
}
@Test
void migrate_allApplied_noop() {
SchemaMigrationService service = createSpiedService(
List.of(v1Migration), List.of(appliedRecord(v1Migration)));
service.migrate("test-instance");
// No migration statements should be executed
verifyNoInteractions(neo4jClient);
// recordMigration should NOT be called (only the stubbed setup, no real call)
verify(service, never()).recordMigration(any());
}
}
// -----------------------------------------------------------------------
// Checksum Validation
// -----------------------------------------------------------------------
/** Behaviour of checksum validation against a record whose checksum does not match the code. */
@Nested
class ChecksumValidation {
@Test
void migrate_checksumMismatch_throwsException() {
// Record for V1 whose stored checksum cannot match the fixture's statements.
SchemaMigrationRecord tampered = SchemaMigrationRecord.builder()
.version(1)
.description("Initial schema")
.checksum("wrong-checksum")
.appliedAt("2025-01-01T00:00:00Z")
.executionTimeMs(100L)
.success(true)
.statementsCount(1)
.build();
SchemaMigrationService service = createSpiedService(
List.of(v1Migration), List.of(tampered));
assertThatThrownBy(() -> service.migrate("test-instance"))
.isInstanceOf(BusinessException.class)
.satisfies(e -> assertThat(((BusinessException) e).getErrorCodeEnum())
.isEqualTo(KnowledgeGraphErrorCode.SCHEMA_CHECKSUM_MISMATCH));
}
@Test
void migrate_checksumValidationDisabled_skipsCheck() {
properties.getMigration().setValidateChecksums(false);
// Same mismatching record as above; with validation off it only marks V1 as applied.
SchemaMigrationRecord tampered = SchemaMigrationRecord.builder()
.version(1)
.description("Initial schema")
.checksum("wrong-checksum")
.appliedAt("2025-01-01T00:00:00Z")
.executionTimeMs(100L)
.success(true)
.statementsCount(1)
.build();
SchemaMigrationService service = createSpiedService(
List.of(v1Migration), List.of(tampered));
// Should NOT throw even with wrong checksum — all applied, no pending
assertThatCode(() -> service.migrate("test-instance"))
.doesNotThrowAnyException();
}
}
// -----------------------------------------------------------------------
// Lock Management
// -----------------------------------------------------------------------
/**
 * Exercises the lock handling around {@code migrate}: acquire/release ordering,
 * rejection when another instance holds the lock, and release after a failure.
 */
@Nested
class LockManagement {

    /** On a successful run the lock is acquired first and released afterwards. */
    @Test
    void migrate_lockAcquired_executesAndReleases() {
        SchemaMigrationService migrationService =
                createSpiedService(List.of(v1Migration), Collections.emptyList());
        setupQueryRunnable();

        migrationService.migrate("test-instance");

        var callOrder = inOrder(migrationService);
        callOrder.verify(migrationService).acquireLock("test-instance");
        callOrder.verify(migrationService).releaseLock("test-instance");
    }

    /** A lock row owned by another instance must yield SCHEMA_MIGRATION_LOCKED. */
    @SuppressWarnings("unchecked")
    @Test
    void migrate_lockHeldByAnother_throwsException() {
        SchemaMigrationService migrationService = spy(createService(List.of(v1Migration)));
        doNothing().when(migrationService).bootstrapMigrationSchema();

        // acquireLock runs for real here, so the lock query chain on the client is mocked.
        Map<String, Object> rowOwnedByOther = Map.of(
                "lockedBy", "other-instance",
                "canAcquire", false
        );
        RecordFetchSpec<Map<String, Object>> lockFetch = mock(RecordFetchSpec.class);
        RunnableSpec boundLockQuery = mock(RunnableSpec.class);
        UnboundRunnableSpec lockQuery = mock(UnboundRunnableSpec.class);
        when(lockFetch.first()).thenReturn(Optional.of(rowOwnedByOther));
        when(boundLockQuery.fetch()).thenReturn(lockFetch);
        when(lockQuery.bindAll(anyMap())).thenReturn(boundLockQuery);
        when(neo4jClient.query(contains("MERGE (lock:_SchemaLock"))).thenReturn(lockQuery);

        assertThatThrownBy(() -> migrationService.migrate("test-instance"))
                .isInstanceOf(BusinessException.class)
                .satisfies(thrown -> {
                    BusinessException failure = (BusinessException) thrown;
                    assertThat(failure.getErrorCodeEnum())
                            .isEqualTo(KnowledgeGraphErrorCode.SCHEMA_MIGRATION_LOCKED);
                });
    }

    /** Even when a migration statement blows up, the lock must still be released. */
    @Test
    void migrate_lockReleasedOnFailure() {
        SchemaMigrationService migrationService =
                createSpiedService(List.of(v1Migration), Collections.emptyList());

        // Every query handed to the client fails as soon as it is run.
        UnboundRunnableSpec brokenStatement = mock(UnboundRunnableSpec.class);
        doThrow(new RuntimeException("Connection refused"))
                .when(brokenStatement).run();
        when(neo4jClient.query(anyString())).thenReturn(brokenStatement);

        assertThatThrownBy(() -> migrationService.migrate("test-instance"))
                .isInstanceOf(BusinessException.class);

        // Lock should still be released even after failure
        verify(migrationService).releaseLock("test-instance");
    }
}
// -----------------------------------------------------------------------
// Migration Failure
// -----------------------------------------------------------------------
/**
 * Covers the two outcomes of a statement that throws while a migration runs:
 * a real failure (recorded and rethrown) and an "already exists" error
 * (tolerated, migration recorded as successful).
 */
@Nested
class MigrationFailure {

    /** A failing statement is recorded as a failed migration and reported as SCHEMA_MIGRATION_FAILED. */
    @Test
    void migrate_statementFails_recordsFailureAndThrows() {
        SchemaMigrationService migrationService =
                createSpiedService(List.of(v1Migration), Collections.emptyList());
        makeEveryStatementThrow("Connection refused");

        assertThatThrownBy(() -> migrationService.migrate("test-instance"))
                .isInstanceOf(BusinessException.class)
                .satisfies(thrown -> {
                    BusinessException failure = (BusinessException) thrown;
                    assertThat(failure.getErrorCodeEnum())
                            .isEqualTo(KnowledgeGraphErrorCode.SCHEMA_MIGRATION_FAILED);
                });

        // Failure should be recorded
        verify(migrationService).recordMigration(argThat(saved -> {
            if (saved.isSuccess()) {
                return false;
            }
            var error = saved.getErrorMessage();
            return error != null && error.contains("Connection refused");
        }));
    }

    /** An "already exists" error from a statement must not abort the migration. */
    @Test
    void migrate_alreadyExistsError_safelySkipped() {
        SchemaMigrationService migrationService =
                createSpiedService(List.of(v1Migration), Collections.emptyList());
        makeEveryStatementThrow("Constraint already exists");

        // Should not throw
        assertThatCode(() -> migrationService.migrate("test-instance"))
                .doesNotThrowAnyException();

        // Success should be recorded
        verify(migrationService).recordMigration(
                argThat(saved -> saved.isSuccess() && saved.getVersion() == 1));
    }

    /** Stubs the client so that any query returns a spec whose {@code run()} throws {@code message}. */
    private void makeEveryStatementThrow(String message) {
        UnboundRunnableSpec throwingSpec = mock(UnboundRunnableSpec.class);
        when(neo4jClient.query(anyString())).thenReturn(throwingSpec);
        doThrow(new RuntimeException(message))
                .when(throwingSpec).run();
    }
}
// -----------------------------------------------------------------------
// Retry After Failure (P0)
// -----------------------------------------------------------------------
/**
 * Retry behaviour (P0): the record-writing query must be MERGE-based, and a
 * run with no applied history must end with a success record for V1.
 */
@Nested
class RetryAfterFailure {

    /** {@code recordMigration} has to issue a query containing MERGE. */
    @Test
    void recordMigration_usesMerge_allowsRetryAfterFailure() {
        SchemaMigrationService migrationService = createService(List.of(v1Migration));
        SchemaMigrationRecord successRecord = SchemaMigrationRecord.builder()
                .version(1)
                .description("test")
                .checksum("abc123")
                .appliedAt("2025-01-01T00:00:00Z")
                .executionTimeMs(100L)
                .success(true)
                .statementsCount(1)
                .build();

        RunnableSpec boundQuery = mock(RunnableSpec.class);
        UnboundRunnableSpec mergeQuery = mock(UnboundRunnableSpec.class);
        when(mergeQuery.bindAll(anyMap())).thenReturn(boundQuery);
        when(neo4jClient.query(contains("MERGE"))).thenReturn(mergeQuery);

        migrationService.recordMigration(successRecord);

        // Verify MERGE is used (not CREATE) — ensures retries update
        // existing failed records instead of hitting unique constraint violations
        verify(neo4jClient).query(contains("MERGE"));
    }

    /** A second attempt after an earlier failure must end with a success record. */
    @Test
    void migrate_retryAfterFailure_recordsSuccess() {
        // Scenario: an earlier run recorded a failure for V1. Per the original author's
        // note, loadAppliedMigrations only returns success=true rows, so the applied
        // history handed to the service is empty here.
        SchemaMigrationService migrationService =
                createSpiedService(List.of(v1Migration), Collections.emptyList());
        setupQueryRunnable();

        migrationService.migrate("test-instance");

        // Verify success record is written (MERGE will update existing failed record)
        verify(migrationService).recordMigration(
                argThat(saved -> saved.isSuccess() && saved.getVersion() == 1));
    }
}
// -----------------------------------------------------------------------
// Database Time for Lock (P1-1)
// -----------------------------------------------------------------------
/**
 * Database time for the lock (P1-1): the parameters bound to the lock query
 * must not carry locally computed timestamps.
 */
@Nested
class DatabaseTimeLock {

    /**
     * Captures the parameter map bound to the lock query and checks that it holds
     * {@code instanceId} and {@code timeoutMs} but neither {@code now} nor {@code expiry}.
     */
    @SuppressWarnings("unchecked")
    @Test
    void acquireLock_usesDatabaseTime_notLocalTime() {
        SchemaMigrationService migrationService = createService(List.of(v1Migration));

        // Lock query chain answering "lock granted to test-instance".
        Map<String, Object> grantedRow = Map.of(
                "lockedBy", "test-instance",
                "canAcquire", true
        );
        RecordFetchSpec<Map<String, Object>> lockFetch = mock(RecordFetchSpec.class);
        RunnableSpec boundLockQuery = mock(RunnableSpec.class);
        UnboundRunnableSpec lockQuery = mock(UnboundRunnableSpec.class);
        when(lockFetch.first()).thenReturn(Optional.of(grantedRow));
        when(boundLockQuery.fetch()).thenReturn(lockFetch);
        when(lockQuery.bindAll(anyMap())).thenReturn(boundLockQuery);
        when(neo4jClient.query(contains("MERGE (lock:_SchemaLock"))).thenReturn(lockQuery);

        migrationService.acquireLock("test-instance");

        // Verify that local time is NOT passed as parameters — database time is used instead
        @SuppressWarnings("rawtypes")
        ArgumentCaptor<Map> boundParamsCaptor = ArgumentCaptor.forClass(Map.class);
        verify(lockQuery).bindAll(boundParamsCaptor.capture());
        Map<String, Object> boundParams = boundParamsCaptor.getValue();

        assertThat(boundParams)
                .containsKey("instanceId")
                .containsKey("timeoutMs")
                .doesNotContainKey("now")
                .doesNotContainKey("expiry");
    }
}
// -----------------------------------------------------------------------
// Checksum Computation
// -----------------------------------------------------------------------
/**
 * Properties of {@code SchemaMigrationService.computeChecksum}: repeatable for
 * the same input, 64 characters long, and sensitive to statement order.
 */
@Nested
class ChecksumComputation {

    /** Same statement list twice must give the same 64-character checksum. */
    @Test
    void computeChecksum_deterministic() {
        List<String> ddl = List.of("stmt1", "stmt2");

        String firstRun = SchemaMigrationService.computeChecksum(ddl);
        String secondRun = SchemaMigrationService.computeChecksum(ddl);

        assertThat(firstRun)
                .isEqualTo(secondRun)
                .hasSize(64); // SHA-256 hex length
    }

    /** Swapping the statement order must change the checksum. */
    @Test
    void computeChecksum_orderMatters() {
        String forward = SchemaMigrationService.computeChecksum(List.of("stmt1", "stmt2"));
        String reversed = SchemaMigrationService.computeChecksum(List.of("stmt2", "stmt1"));

        assertThat(forward).isNotEqualTo(reversed);
    }
}
}