diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java
index 5df4dd4..9d40785 100644
--- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/exception/KnowledgeGraphErrorCode.java
@@ -24,7 +24,10 @@ public enum KnowledgeGraphErrorCode implements ErrorCode {
SCHEMA_INIT_FAILED("knowledge_graph.0011", "图谱 Schema 初始化失败"),
INSECURE_DEFAULT_CREDENTIALS("knowledge_graph.0012", "检测到默认凭据,生产环境禁止使用默认密码"),
UNAUTHORIZED_INTERNAL_CALL("knowledge_graph.0013", "内部调用未授权:X-Internal-Token 校验失败"),
- QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度");
+ QUERY_TIMEOUT("knowledge_graph.0014", "图查询超时,请缩小搜索范围或减少深度"),
+ SCHEMA_MIGRATION_FAILED("knowledge_graph.0015", "Schema 迁移执行失败"),
+ SCHEMA_CHECKSUM_MISMATCH("knowledge_graph.0016", "Schema 迁移 checksum 不匹配:已应用的迁移被修改"),
+ SCHEMA_MIGRATION_LOCKED("knowledge_graph.0017", "Schema 迁移锁被占用,其他实例正在执行迁移");
private final String code;
private final String message;
diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java
index d938859..13932ef 100644
--- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/GraphInitializer.java
@@ -1,24 +1,21 @@
package com.datamate.knowledgegraph.infrastructure.neo4j;
+import com.datamate.knowledgegraph.infrastructure.neo4j.migration.SchemaMigrationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
-import org.springframework.data.neo4j.core.Neo4jClient;
import org.springframework.stereotype.Component;
-import java.util.List;
import java.util.Set;
+import java.util.UUID;
/**
* 图谱 Schema 初始化器。
*
- * 应用启动时自动创建 Neo4j 索引和约束。
- * 所有语句使用 {@code IF NOT EXISTS},保证幂等性。
- *
- * 对应 {@code docs/knowledge-graph/schema/schema.cypher} 中的第 1-3 部分。
+ * 应用启动时通过 {@link SchemaMigrationService} 执行版本化 Schema 迁移。
*
* 安全自检:在非开发环境中,检测到默认 Neo4j 密码时拒绝启动。
*/
@@ -33,13 +30,8 @@ public class GraphInitializer implements ApplicationRunner {
"datamate123", "neo4j", "password", "123456", "admin"
);
- /** 仅识别「已存在」类错误消息的关键词,其余错误不应吞掉。 */
- private static final Set ALREADY_EXISTS_KEYWORDS = Set.of(
- "already exists", "already exist", "EquivalentSchemaRuleAlreadyExists"
- );
-
- private final Neo4jClient neo4jClient;
private final KnowledgeGraphProperties properties;
+ private final SchemaMigrationService schemaMigrationService;
@Value("${spring.neo4j.authentication.password:}")
private String neo4jPassword;
@@ -47,47 +39,6 @@ public class GraphInitializer implements ApplicationRunner {
@Value("${spring.profiles.active:default}")
private String activeProfile;
- /**
- * 需要在启动时执行的 Cypher 语句。
- * 每条语句必须独立执行(Neo4j 不支持多条 DDL 在同一事务中)。
- */
- private static final List SCHEMA_STATEMENTS = List.of(
- // 约束(自动创建对应索引)
- "CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (n:Entity) REQUIRE n.id IS UNIQUE",
-
- // 同步 upsert 复合唯一约束:防止并发写入产生重复实体
- "CREATE CONSTRAINT entity_sync_unique IF NOT EXISTS " +
- "FOR (n:Entity) REQUIRE (n.graph_id, n.source_id, n.type) IS UNIQUE",
-
- // 单字段索引
- "CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id)",
- "CREATE INDEX entity_type IF NOT EXISTS FOR (n:Entity) ON (n.type)",
- "CREATE INDEX entity_name IF NOT EXISTS FOR (n:Entity) ON (n.name)",
- "CREATE INDEX entity_source_id IF NOT EXISTS FOR (n:Entity) ON (n.source_id)",
- "CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at)",
-
- // 复合索引
- "CREATE INDEX entity_graph_id_type IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.type)",
- "CREATE INDEX entity_graph_id_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.id)",
- "CREATE INDEX entity_graph_id_source_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id, n.source_id)",
-
- // 全文索引
- "CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS FOR (n:Entity) ON EACH [n.name, n.description]",
-
- // ── SyncHistory 约束和索引 ──
-
- // P1: syncId 唯一约束,防止 ID 碰撞
- "CREATE CONSTRAINT sync_history_graph_sync_unique IF NOT EXISTS " +
- "FOR (h:SyncHistory) REQUIRE (h.graph_id, h.sync_id) IS UNIQUE",
-
- // P2-3: 查询优化索引
- "CREATE INDEX sync_history_graph_started IF NOT EXISTS " +
- "FOR (h:SyncHistory) ON (h.graph_id, h.started_at)",
-
- "CREATE INDEX sync_history_graph_status_started IF NOT EXISTS " +
- "FOR (h:SyncHistory) ON (h.graph_id, h.status, h.started_at)"
- );
-
@Override
public void run(ApplicationArguments args) {
// ── 安全自检:默认凭据检测 ──
@@ -98,32 +49,7 @@ public class GraphInitializer implements ApplicationRunner {
return;
}
- log.info("Initializing Neo4j schema: {} statements to execute", SCHEMA_STATEMENTS.size());
-
- int succeeded = 0;
- int failed = 0;
-
- for (String statement : SCHEMA_STATEMENTS) {
- try {
- neo4jClient.query(statement).run();
- succeeded++;
- log.debug("Schema statement executed: {}", truncate(statement));
- } catch (Exception e) {
- if (isAlreadyExistsError(e)) {
- // 约束/索引已存在,安全跳过
- succeeded++;
- log.debug("Schema element already exists (safe to skip): {}", truncate(statement));
- } else {
- // 非「已存在」错误:记录并抛出,阻止启动
- failed++;
- log.error("Schema statement FAILED: {} — {}", truncate(statement), e.getMessage());
- throw new IllegalStateException(
- "Neo4j schema initialization failed: " + truncate(statement), e);
- }
- }
- }
-
- log.info("Neo4j schema initialization completed: succeeded={}, failed={}", succeeded, failed);
+ schemaMigrationService.migrate(UUID.randomUUID().toString());
}
/**
@@ -149,20 +75,4 @@ public class GraphInitializer implements ApplicationRunner {
}
}
}
-
- /**
- * 判断异常是否仅因为 Schema 元素已存在(安全可忽略)。
- */
- private static boolean isAlreadyExistsError(Exception e) {
- String msg = e.getMessage();
- if (msg == null) {
- return false;
- }
- String lowerMsg = msg.toLowerCase();
- return ALREADY_EXISTS_KEYWORDS.stream().anyMatch(kw -> lowerMsg.contains(kw.toLowerCase()));
- }
-
- private static String truncate(String s) {
- return s.length() <= 100 ? s : s.substring(0, 97) + "...";
- }
}
diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java
index b048231..364579a 100644
--- a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/KnowledgeGraphProperties.java
@@ -32,6 +32,9 @@ public class KnowledgeGraphProperties {
/** 安全相关配置 */
private Security security = new Security();
+ /** Schema 迁移配置 */
+ private Migration migration = new Migration();
+
@Data
public static class Security {
@@ -82,4 +85,14 @@ public class KnowledgeGraphProperties {
*/
private boolean allowPurgeOnEmptySnapshot = false;
}
+
+ @Data
+ public static class Migration {
+
+ /** 是否启用 Schema 版本化迁移 */
+ private boolean enabled = true;
+
+ /** 是否校验已应用迁移的 checksum(防止迁移被篡改) */
+ private boolean validateChecksums = true;
+ }
}
diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java
new file mode 100644
index 0000000..cc63312
--- /dev/null
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigration.java
@@ -0,0 +1,20 @@
+package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
+
+import java.util.List;
+
+/**
+ * Schema 迁移接口。
+ *
+ * 每个实现类代表一个版本化的 Schema 变更,版本号单调递增。
+ */
+public interface SchemaMigration {
+
+ /** 单调递增版本号 (1, 2, 3...) */
+ int getVersion();
+
+ /** 人类可读描述 */
+ String getDescription();
+
+ /** Cypher DDL 语句列表 */
+ List getStatements();
+}
diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java
new file mode 100644
index 0000000..5d7a0f3
--- /dev/null
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationRecord.java
@@ -0,0 +1,42 @@
+package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 迁移记录数据类,映射 {@code _SchemaMigration} 节点。
+ *
+ * 纯 POJO,不使用 SDN {@code @Node} 注解。
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SchemaMigrationRecord {
+
+ /** 迁移版本号 */
+ private int version;
+
+ /** 迁移描述 */
+ private String description;
+
+ /** 迁移语句的 SHA-256 校验和 */
+ private String checksum;
+
+ /** 迁移应用时间(ISO-8601) */
+ private String appliedAt;
+
+ /** 迁移执行耗时(毫秒) */
+ private long executionTimeMs;
+
+ /** 迁移是否成功 */
+ private boolean success;
+
+ /** 迁移语句数量 */
+ private int statementsCount;
+
+ /** 失败时的错误信息 */
+ private String errorMessage;
+}
diff --git a/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java
new file mode 100644
index 0000000..c9a1f8c
--- /dev/null
+++ b/backend/services/knowledge-graph-service/src/main/java/com/datamate/knowledgegraph/infrastructure/neo4j/migration/SchemaMigrationService.java
@@ -0,0 +1,358 @@
+package com.datamate.knowledgegraph.infrastructure.neo4j.migration;
+
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.knowledgegraph.infrastructure.exception.KnowledgeGraphErrorCode;
+import com.datamate.knowledgegraph.infrastructure.neo4j.KnowledgeGraphProperties;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.neo4j.core.Neo4jClient;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Schema 迁移编排器。
+ *
+ * 参考 Flyway 设计思路,为 Neo4j 图数据库提供版本化迁移机制:
+ *
+ * - 在数据库中记录已应用的迁移版本({@code _SchemaMigration} 节点)
+ * - 自动检测并执行新增迁移
+ * - 通过 checksum 校验防止已应用迁移被篡改
+ * - 通过分布式锁({@code _SchemaLock} 节点)防止多实例并发迁移
+ *
+ */
+@Component
+@Slf4j
+public class SchemaMigrationService {
+
+ /** 分布式锁过期时间(毫秒),5 分钟 */
+ private static final long LOCK_TIMEOUT_MS = 5 * 60 * 1000;
+
+ /** 仅识别「已存在」类错误消息的关键词,其余错误不应吞掉。 */
+ private static final Set ALREADY_EXISTS_KEYWORDS = Set.of(
+ "already exists", "already exist", "EquivalentSchemaRuleAlreadyExists"
+ );
+
+ private final Neo4jClient neo4jClient;
+ private final KnowledgeGraphProperties properties;
+ private final List migrations;
+
+ public SchemaMigrationService(Neo4jClient neo4jClient,
+ KnowledgeGraphProperties properties,
+ List migrations) {
+ this.neo4jClient = neo4jClient;
+ this.properties = properties;
+ this.migrations = migrations.stream()
+ .sorted(Comparator.comparingInt(SchemaMigration::getVersion))
+ .toList();
+ }
+
+ /**
+ * 执行 Schema 迁移主流程。
+ *
+ * @param instanceId 当前实例标识,用于分布式锁
+ */
+ public void migrate(String instanceId) {
+ if (!properties.getMigration().isEnabled()) {
+ log.info("Schema migration is disabled, skipping");
+ return;
+ }
+
+ log.info("Starting schema migration, instanceId={}", instanceId);
+
+ // 1. Bootstrap — 创建迁移系统自身需要的约束
+ bootstrapMigrationSchema();
+
+ // 2. 获取分布式锁
+ acquireLock(instanceId);
+
+ try {
+ // 3. 加载已应用迁移
+ List applied = loadAppliedMigrations();
+
+ // 4. 校验 checksum
+ if (properties.getMigration().isValidateChecksums()) {
+ validateChecksums(applied, migrations);
+ }
+
+ // 5. 过滤待执行迁移
+ Set appliedVersions = applied.stream()
+ .map(SchemaMigrationRecord::getVersion)
+ .collect(Collectors.toSet());
+
+ List pending = migrations.stream()
+ .filter(m -> !appliedVersions.contains(m.getVersion()))
+ .toList();
+
+ if (pending.isEmpty()) {
+ log.info("Schema is up to date, no pending migrations");
+ return;
+ }
+
+ // 6. 逐个执行
+ executePendingMigrations(pending);
+
+ log.info("Schema migration completed successfully, applied {} migration(s)", pending.size());
+
+ } finally {
+ // 7. 释放锁
+ releaseLock(instanceId);
+ }
+ }
+
+ /**
+ * 创建迁移系统自身需要的约束(解决鸡生蛋问题)。
+ */
+ void bootstrapMigrationSchema() {
+ log.debug("Bootstrapping migration schema constraints");
+ neo4jClient.query(
+ "CREATE CONSTRAINT schema_migration_version_unique IF NOT EXISTS " +
+ "FOR (n:_SchemaMigration) REQUIRE n.version IS UNIQUE"
+ ).run();
+ neo4jClient.query(
+ "CREATE CONSTRAINT schema_lock_name_unique IF NOT EXISTS " +
+ "FOR (n:_SchemaLock) REQUIRE n.name IS UNIQUE"
+ ).run();
+ }
+
+ /**
+ * 获取分布式锁。
+ *
+ * MERGE {@code _SchemaLock} 节点,如果锁已被其他实例持有且未过期,则抛出异常。
+ * 如果锁已过期(超过 5 分钟),自动接管。
+ *
+ * 时间戳完全使用数据库端 {@code datetime().epochMillis},避免多实例时钟偏差导致锁被误抢占。
+ */
+ void acquireLock(String instanceId) {
+ log.debug("Acquiring schema migration lock, instanceId={}", instanceId);
+
+ // 使用数据库时间(datetime().epochMillis)避免多实例时钟偏差导致锁被误抢占
+ Optional