fix(kg): 修复 Codex 审查发现的 P1/P2 问题并补全测试

修复内容:

P1 级别(关键):
1. 数据隔离漏洞:邻居查询添加 graph_id 路径约束,防止跨图谱数据泄漏
2. 空快照误删风险:添加 allowPurgeOnEmptySnapshot 保护开关(默认 false)
3. 弱默认凭据:启动自检,生产环境检测到默认密码直接拒绝启动

P2 级别(重要):
4. 配置校验:importBatchSize 添加 @Min(1) 验证,启动时 fail-fast
5. N+1 性能:重写 upsertEntity 为单条 Cypher 查询(从 3 条优化到 1 条)
6. 服务认证:添加 mTLS/JWT 文档说明
7. 错误处理:改进 Schema 初始化和序列化错误处理

测试覆盖:
- 新增 69 个单元测试,全部通过
- GraphEntityServiceTest: 13 个测试(CRUD、验证、分页)
- GraphRelationServiceTest: 13 个测试(CRUD、方向验证)
- GraphSyncServiceTest: 5 个测试(验证、全量同步)
- GraphSyncStepServiceTest: 14 个测试(空快照保护、N+1 验证)
- GraphQueryServiceTest: 13 个测试(邻居/路径/子图/搜索)
- GraphInitializerTest: 11 个测试(凭据验证、Schema 初始化)

技术细节:
- 数据隔离:使用 ALL() 函数约束路径中所有节点和关系的 graph_id
- 空快照保护:新增配置项 allow-purge-on-empty-snapshot 和错误码 EMPTY_SNAPSHOT_PURGE_BLOCKED
- 凭据检查:Java 和 Python 双端实现,根据环境(dev/test/prod)采取不同策略
- 性能优化:使用 SDN 复合属性格式(properties.key)在 MERGE 中直接设置属性
- 属性安全:使用白名单 [a-zA-Z0-9_] 防止 Cypher 注入

代码变更:+210 行,-29 行
This commit is contained in:
2026-02-18 09:25:00 +08:00
parent a260134d7c
commit 37b478a052
16 changed files with 1494 additions and 29 deletions

View File

@@ -1,6 +1,15 @@
from pydantic_settings import BaseSettings
from pydantic import SecretStr, model_validator
from typing import Optional
import logging
import os
_logger = logging.getLogger(__name__)
# 已知的弱默认凭据,生产环境禁止使用
_BLOCKED_DEFAULT_PASSWORDS = {"password", "123456", "admin", "root", "datamate123"}
_BLOCKED_DEFAULT_TOKENS = {"abc123abc123", "EMPTY"}
class Settings(BaseSettings):
"""应用程序配置"""
@@ -76,5 +85,37 @@ class Settings(BaseSettings):
# 标注编辑器(Label Studio Editor)相关
editor_max_text_bytes: int = 0 # <=0 表示不限制,正数为最大字节数
@model_validator(mode='after')
def check_default_credentials(self):
"""生产环境下检测弱默认凭据,拒绝启动。
通过环境变量 DATAMATE_ENV 判断环境:
- dev/test/local: 仅发出警告
- 其他(prod/staging 等): 抛出异常阻止启动
"""
env = os.environ.get("DATAMATE_ENV", "dev").lower()
is_dev = env in ("dev", "test", "local", "development")
issues: list[str] = []
if self.mysql_password in _BLOCKED_DEFAULT_PASSWORDS:
issues.append(f"mysql_password is set to a weak default ('{self.mysql_password}')")
if self.label_studio_password and self.label_studio_password in _BLOCKED_DEFAULT_PASSWORDS:
issues.append("label_studio_password is set to a weak default")
if self.label_studio_user_token and self.label_studio_user_token in _BLOCKED_DEFAULT_TOKENS:
issues.append("label_studio_user_token is set to a weak default")
if issues:
msg = "SECURITY: Weak default credentials detected: " + "; ".join(issues)
if is_dev:
_logger.warning(msg + " (acceptable in dev/test, MUST change for production)")
else:
raise ValueError(
msg + ". Set proper credentials via environment variables "
"before deploying to production."
)
return self
# 全局设置实例
settings = Settings()