# DataMate 知识图谱架构设计 ## 🏗️ 整体架构 ### 分层架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ 前端层 (Frontend) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ React + AntV G6 │ │ │ │ - 图谱可视化(分层加载、子图裁剪) │ │ │ │ - 图谱编辑(Human-in-the-loop) │ │ │ │ - 查询界面(Cypher 查询构建器) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ HTTP/REST ┌─────────────────────────────────────────────────────────────┐ │ 服务层 (Service) │ │ ┌──────────────────────┐ ┌──────────────────────────┐ │ │ │ kg-service │ │ rag-query-service │ │ │ │ (Spring Boot) │ │ (FastAPI) │ │ │ │ │ │ │ │ │ │ - 图查询 API │ │ - 混合检索 │ │ │ │ - 权限过滤 │ │ - GraphRAG │ │ │ │ - 缓存层 (Redis) │ │ - 向量检索 + 图检索 │ │ │ └──────────────────────┘ └──────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 摄入层 (Ingestion) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ kg-ingestion (FastAPI) │ │ │ │ - LangChain LLMGraphTransformer │ │ │ │ - 实体对齐(向量相似度 + LLM) │ │ │ │ - 关系生成(规则 + LLM) │ │ │ │ - 置信度评分 │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ 存储层 (Storage) │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ MySQL │ │ Neo4j │ │ Milvus │ │ │ │ (元数据) │ │ (图结构) │ │ (向量) │ │ │ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ## 🔧 技术选型 ### 图数据库:Neo4j **选择理由**: - ✅ 成熟稳定,社区活跃 - ✅ Cypher 查询语言简洁强大 - ✅ Spring Data Neo4j 集成良好 - ✅ 支持 ACID 事务 - ✅ 丰富的图算法库 **版本**:Neo4j 社区版(生产环境可升级企业版) **配置**: - 端口:7474 (HTTP), 7687 (Bolt) - 内存:heap 512MB, page cache 512MB(可根据数据量调整) - 持久化:Docker volume ### 后端框架 #### knowledge-graph-service (Spring Boot) **职责**: - 图谱查询 API - 权限控制和租户隔离 - 缓存管理 - 与其他服务集成 **技术栈**: - Spring Boot 3.x - Spring Data Neo4j - Spring Security(权限控制) - Redis(缓存) **DDD 分层**: ``` com.datamate.knowledgegraph/ ├── application/ # 应用服务层 │ └── GraphEntityService.java ├── domain/ # 领域层 │ ├── model/ │ │ ├── GraphEntity.java │ │ └── GraphRelation.java │ └── repository/ │ └── GraphEntityRepository.java ├── infrastructure/ # 基础设施层 │ ├── neo4j/ │ │ └── KnowledgeGraphProperties.java │ └── exception/ │ └── KnowledgeGraphErrorCode.java └── interfaces/ # 接口层 ├── rest/ │ └── GraphEntityController.java └── dto/ ├── CreateEntityRequest.java ├── UpdateEntityRequest.java └── CreateRelationRequest.java ``` #### kg-ingestion (FastAPI) **职责**: - 知识抽取(文本 → 实体 + 关系) - 实体对齐和消歧 - 关系生成和验证 - 置信度评分 **技术栈**: - FastAPI - LangChain - LangChain LLMGraphTransformer - Pydantic(数据验证) **模块结构**: ``` kg_extraction/ ├── __init__.py ├── models.py # 数据模型 ├── extractor.py # 抽取器 └── aligner.py # 实体对齐(待实现) ``` ### 前端框架 **技术栈**: - React 18 - AntV G6(图可视化) - TypeScript - Ant Design(UI 组件) **核心功能**: - 图谱可视化(支持 10000+ 节点) - 交互式查询构建器 - 实时编辑和反馈 - 导出和分享 ## 🔐 安全设计 ### 多租户隔离 **策略**: - 所有实体和关系都包含 `graph_id` 属性 - 查询时自动添加 `graph_id` 过滤条件 - Neo4j 索引包含 `graph_id` **实现**: ```cypher // 创建索引 CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id); // 查询时自动过滤 MATCH (n:Entity {graph_id: $graphId}) WHERE n.id = $entityId RETURN n; ``` ### 权限控制 **graphId 双重防御**: 1. **Controller 层**:`@Pattern(regexp = UUID_REGEX)` 格式校验 2. **Service 层**:`validateGraphId()` 业务校验 **实现**: ```java // Controller 层 @GetMapping("/{graphId}/entities/{entityId}") public GraphEntity getEntity( @PathVariable @Pattern(regexp = UUID_REGEX) String graphId, @PathVariable @Pattern(regexp = UUID_REGEX) String entityId ) { return entityService.getEntity(graphId, entityId); } // Service 层 public GraphEntity getEntity(String graphId, String entityId) { validateGraphId(graphId); return entityRepository.findByIdAndGraphId(entityId, graphId) .orElseThrow(() -> BusinessException.of( KnowledgeGraphErrorCode.ENTITY_NOT_FOUND )); } ``` ### Cypher 注入防护 **策略**: - 使用参数化查询 - 禁止拼接 Cypher 字符串 - 输入验证和转义 **示例**: ```java // ✅ 正确:参数化查询 @Query("MATCH (n:Entity {id: $id, graph_id: $graphId}) RETURN n") Optional findByIdAndGraphId( @Param("id") String id, @Param("graphId") String graphId ); // ❌ 错误:字符串拼接 String cypher = "MATCH (n:Entity {id: '" + id + "'}) RETURN n"; ``` ## 📊 数据同步策略 ### MySQL → Neo4j 同步 **策略**:最终一致性 + 对账机制 **同步方式**: 1. **实时同步**:通过 CDC(Change Data Capture)捕获 MySQL 变更 2. **批量同步**:定时任务(每小时/每天)全量同步 3. **手动同步**:提供 API 触发同步 **对账机制**: - 每天凌晨对比 MySQL 和 Neo4j 的数据 - 发现不一致时记录日志并告警 - 提供修复工具 **实现**: ```java @Scheduled(cron = "0 0 * * * *") // 每小时 public void syncFromMySQL() { // 1. 查询 MySQL 中的变更 List changedDatasets = datasetRepository .findByUpdatedAtAfter(lastSyncTime); // 2. 转换为图实体 List entities = changedDatasets.stream() .map(this::toGraphEntity) .collect(Collectors.toList()); // 3. 批量写入 Neo4j graphEntityRepository.saveAll(entities); // 4. 更新同步时间 lastSyncTime = Instant.now(); } ``` ## ⚡ 性能优化 ### 查询优化 **策略**: 1. **限制遍历深度**:最大 3 跳 2. **限制返回节点数**:最大 1000 个 3. **使用索引**:在高频查询字段上创建索引 4. **缓存热点数据**:使用 Redis 缓存 **实现**: ```java public List getNeighbors( String graphId, String entityId, int depth, int limit ) { // Clamp 参数 int actualDepth = Math.min(depth, properties.getMaxDepth()); int actualLimit = Math.min(limit, properties.getMaxNodesPerQuery()); // 查询 return entityRepository.findNeighbors( graphId, entityId, actualDepth, actualLimit ); } ``` ### 索引策略 **必需索引**: ```cypher // 实体 ID 索引 CREATE INDEX entity_id IF NOT EXISTS FOR (n:Entity) ON (n.id); // 图 ID 索引 CREATE INDEX entity_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.graph_id); // 复合索引 CREATE INDEX entity_id_graph_id IF NOT EXISTS FOR (n:Entity) ON (n.id, n.graph_id); ``` ### 缓存策略 **缓存层次**: 1. **L1 缓存**:Spring Cache(本地缓存) 2. **L2 缓存**:Redis(分布式缓存) 3. **L3 缓存**:Neo4j 内置缓存 **缓存内容**: - 热点实体(访问频率 > 100/小时) - 常用子图(2-hop 邻居) - 查询结果(TTL 5 分钟) ## 🔄 GraphRAG 融合 ### 混合检索架构 ``` 用户查询 ↓ ┌─────────────────────────────────────┐ │ 查询理解和改写 │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ 并行检索 │ │ ┌───────────┐ ┌─────────────┐ │ │ │ Milvus │ │ Neo4j │ │ │ │ 向量检索 │ │ 图检索 │ │ │ │ Top-K │ │ 2-hop 子图 │ │ │ └───────────┘ └─────────────┘ │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ 结果融合和排序 │ │ - 向量相似度 × 0.6 │ │ - 图结构相关性 × 0.4 │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ Context 构建 │ │ - 文档片段(Milvus) │ │ - 三元组文本化(Neo4j) │ └─────────────────────────────────────┘ ↓ ┌─────────────────────────────────────┐ │ LLM 生成 │ └─────────────────────────────────────┘ ``` ### 三元组文本化 **策略**:将图结构转换为自然语言 **示例**: ```python # 图结构 (Dataset:用户行为数据)-[HAS_FIELD]->(Field:user_id) (Dataset:用户行为数据)-[USED_BY]->(Workflow:用户画像构建) # 文本化 """ 数据集"用户行为数据"包含字段"user_id"。 数据集"用户行为数据"被工作流"用户画像构建"使用。 """ ``` ## 📈 监控和运维 ### 监控指标 **Neo4j 指标**: - 节点数量 - 关系数量 - 查询响应时间 - 内存使用率 - 磁盘使用率 **服务指标**: - API 响应时间 - 错误率 - 吞吐量 - 缓存命中率 **工具**: - Prometheus(指标采集) - Grafana(可视化) - Neo4j Metrics(Neo4j 专用指标) ### 备份策略 **Neo4j 备份**: - 每天凌晨全量备份 - 保留最近 7 天的备份 - 备份到对象存储(S3/OSS) **恢复测试**: - 每月进行一次恢复演练 - 验证备份的完整性和可用性 ## 🔗 相关文档 - [总体方案](./README.md) - [实施计划](./implementation.md) - [AI 分析结果](./analysis/)