Files
DataMate/runtime/datamate-python/app/module/kg_graphrag/kg_client.py
Jerry Yan 9b6ff59a11 feat(kg): 实现 Phase 3.3 性能优化
核心功能:
- Neo4j 索引优化(entityType, graphId, properties.name)
- Redis 缓存(Java 侧,3 个缓存区,TTL 可配置)
- LRU 缓存(Python 侧,KG + Embedding,线程安全)
- 细粒度缓存清除(graphId 前缀匹配)
- 失败路径缓存清除(finally 块)

新增文件(Java 侧,7 个):
- V2__PerformanceIndexes.java - Flyway 迁移,创建 3 个索引
- IndexHealthService.java - 索引健康监控
- RedisCacheConfig.java - Spring Cache + Redis 配置
- GraphCacheService.java - 缓存清除管理器
- CacheableIntegrationTest.java - 集成测试(10 tests)
- GraphCacheServiceTest.java - 单元测试(19 tests)
- V2__PerformanceIndexesTest.java, IndexHealthServiceTest.java

新增文件(Python 侧,2 个):
- cache.py - 内存 TTL+LRU 缓存(cachetools)
- test_cache.py - 单元测试(20 tests)

修改文件(Java 侧,9 个):
- GraphEntityService.java - 添加 @Cacheable,缓存清除
- GraphQueryService.java - 添加 @Cacheable(包含用户权限上下文)
- GraphRelationService.java - 添加缓存清除
- GraphSyncService.java - 添加缓存清除(finally 块,失败路径)
- KnowledgeGraphProperties.java - 添加 Cache 配置类
- application-knowledgegraph.yml - 添加 Redis 和缓存 TTL 配置
- GraphEntityServiceTest.java - 添加 verify(cacheService) 断言
- GraphRelationServiceTest.java - 添加 verify(cacheService) 断言
- GraphSyncServiceTest.java - 添加失败路径缓存清除测试

修改文件(Python 侧,5 个):
- kg_client.py - 集成缓存(fulltext_search, get_subgraph)
- interface.py - 添加 /cache/stats 和 /cache/clear 端点
- config.py - 添加缓存配置字段
- pyproject.toml - 添加 cachetools 依赖
- test_kg_client.py - 添加 _disable_cache fixture

安全修复(3 轮迭代):
- P0: 缓存 key 用户隔离(防止跨用户数据泄露)
- P1-1: 同步子步骤后的缓存清除(18 个方法)
- P1-2: 实体创建后的搜索缓存清除
- P1-3: 失败路径缓存清除(finally 块)
- P2-1: 细粒度缓存清除(graphId 前缀匹配,避免跨图谱冲刷)
- P2-2: 服务层测试添加 verify(cacheService) 断言

测试结果:
- Java: 280 tests pass  (270 → 280, +10 new)
- Python: 154 tests pass  (140 → 154, +14 new)

缓存配置:
- kg:entities - 实体缓存,TTL 1h
- kg:queries - 查询结果缓存,TTL 5min
- kg:search - 全文搜索缓存,TTL 3min
- KG cache (Python) - 256 entries, 5min TTL
- Embedding cache (Python) - 512 entries, 10min TTL
2026-02-20 18:28:33 +08:00

215 lines
7.4 KiB
Python

"""KG 服务 REST 客户端。
通过 httpx 调用 Java 侧 knowledge-graph-service 的查询 API,
包括全文检索和子图导出。
失败策略:fail-open —— KG 服务不可用时返回空结果 + 日志告警。
"""
from __future__ import annotations
import httpx
from app.core.logging import get_logger
from app.module.kg_graphrag.cache import get_cache, make_cache_key
from app.module.kg_graphrag.models import EntitySummary, RelationSummary
logger = get_logger(__name__)
class KGServiceClient:
    """REST client for the Java-side knowledge-graph service.

    Calls the query API of ``knowledge-graph-service`` over httpx,
    covering fulltext entity search and subgraph export.

    Failure policy: fail-open — when the KG service is unavailable the
    public methods return empty results and log the error instead of
    raising, so callers degrade gracefully.
    """

    def __init__(
        self,
        *,
        base_url: str = "http://datamate-kg:8080",
        internal_token: str = "",
        timeout: float = 30.0,
    ) -> None:
        """Create a client.

        Args:
            base_url: Root URL of the KG service; a trailing slash is stripped
                so path joins below stay well-formed.
            internal_token: Optional service-to-service token, sent as the
                ``X-Internal-Token`` header when non-empty.
            timeout: Per-request timeout in seconds.
        """
        self._base_url = base_url.rstrip("/")
        self._internal_token = internal_token
        self._timeout = timeout
        # Lazily created on first request and reused across calls.
        self._client: httpx.AsyncClient | None = None

    @classmethod
    def from_settings(cls) -> KGServiceClient:
        """Build a client from the application settings.

        The import is local to avoid a module-level import cycle with
        ``app.core.config``.
        """
        from app.core.config import settings

        return cls(
            base_url=settings.graphrag_kg_service_url,
            internal_token=settings.graphrag_kg_internal_token,
            timeout=30.0,
        )

    def _get_client(self) -> httpx.AsyncClient:
        """Return the shared ``AsyncClient``, creating it on first use."""
        if self._client is None:
            self._client = httpx.AsyncClient(
                base_url=self._base_url,
                timeout=self._timeout,
            )
        return self._client

    def _headers(self, user_id: str = "") -> dict[str, str]:
        """Build auth/identity headers; empty values are omitted entirely."""
        headers: dict[str, str] = {}
        if self._internal_token:
            headers["X-Internal-Token"] = self._internal_token
        if user_id:
            headers["X-User-Id"] = user_id
        return headers

    async def fulltext_search(
        self,
        graph_id: str,
        query: str,
        size: int = 10,
        user_id: str = "",
    ) -> list[EntitySummary]:
        """Run a fulltext search against the KG service and return matching entities.

        Fail-open: returns an empty list when the KG service is unavailable.
        Results are cached (TTL controlled by ``graphrag_cache_kg_ttl``);
        failures are NOT cached, so the next call retries the service.

        Args:
            graph_id: Target graph identifier.
            query: Fulltext query string.
            size: Maximum number of hits to return.
            user_id: Propagated as ``X-User-Id`` and part of the cache key,
                so cached results stay isolated per user.
        """
        cache = get_cache()
        # user_id is part of the key to prevent cross-user cache leakage.
        cache_key = make_cache_key("fulltext", graph_id, query, size, user_id)
        cached = cache.get_kg(cache_key)
        if cached is not None:
            return cached
        try:
            result = await self._fulltext_search_impl(graph_id, query, size, user_id)
            cache.set_kg(cache_key, result)
            return result
        except Exception:
            logger.exception(
                "KG fulltext search failed for graph_id=%s (fail-open, returning empty)",
                graph_id,
            )
            return []

    async def _fulltext_search_impl(
        self,
        graph_id: str,
        query: str,
        size: int,
        user_id: str,
    ) -> list[EntitySummary]:
        """Perform the HTTP call and map the response to ``EntitySummary`` objects.

        Raises on any HTTP/network error; the public wrapper handles fail-open.
        """
        client = self._get_client()
        resp = await client.get(
            f"/knowledge-graph/{graph_id}/query/search",
            params={"q": query, "size": size},
            headers=self._headers(user_id),
        )
        resp.raise_for_status()
        body = resp.json()
        # The Java side returns PagedResponse<SearchHitVO>. It may arrive
        # wrapped by the global envelope {"code": 200, "data": PagedResponse}
        # or as the bare PagedResponse {"page": 0, "content": [...]}.
        data = body.get("data", body)
        # PagedResponse carries the entity list in its "content" field.
        items: list[dict] = (
            data.get("content", []) if isinstance(data, dict) else data if isinstance(data, list) else []
        )
        entities: list[EntitySummary] = []
        for item in items:
            entities.append(
                EntitySummary(
                    id=str(item.get("id", "")),
                    name=item.get("name", ""),
                    type=item.get("type", ""),
                    description=item.get("description", ""),
                )
            )
        return entities

    async def get_subgraph(
        self,
        graph_id: str,
        entity_ids: list[str],
        depth: int = 1,
        user_id: str = "",
    ) -> tuple[list[EntitySummary], list[RelationSummary]]:
        """Fetch the N-hop subgraph around the given seed entities.

        Fail-open: returns an empty subgraph when the KG service is unavailable.
        Results are cached (TTL controlled by ``graphrag_cache_kg_ttl``);
        failures are NOT cached.

        Args:
            graph_id: Target graph identifier.
            entity_ids: Seed entity ids; sorted before keying so permutations
                of the same set share one cache entry.
            depth: Traversal depth (hops) from the seeds.
            user_id: Propagated as ``X-User-Id`` and part of the cache key.
        """
        cache = get_cache()
        cache_key = make_cache_key("subgraph", graph_id, sorted(entity_ids), depth, user_id)
        cached = cache.get_kg(cache_key)
        if cached is not None:
            return cached
        try:
            result = await self._get_subgraph_impl(graph_id, entity_ids, depth, user_id)
            cache.set_kg(cache_key, result)
            return result
        except Exception:
            logger.exception(
                "KG subgraph export failed for graph_id=%s (fail-open, returning empty)",
                graph_id,
            )
            return [], []

    async def _get_subgraph_impl(
        self,
        graph_id: str,
        entity_ids: list[str],
        depth: int,
        user_id: str,
    ) -> tuple[list[EntitySummary], list[RelationSummary]]:
        """Perform the export call and map nodes/edges to summary models.

        Raises on any HTTP/network error; the public wrapper handles fail-open.
        """
        client = self._get_client()
        resp = await client.post(
            f"/knowledge-graph/{graph_id}/query/subgraph/export",
            params={"depth": depth},
            json={"entityIds": entity_ids},
            headers=self._headers(user_id),
        )
        resp.raise_for_status()
        body = resp.json()
        # The Java side returns SubgraphExportVO. It may arrive wrapped as
        # {"code": 200, "data": SubgraphExportVO} or as the bare
        # SubgraphExportVO {"nodes": [...], "edges": [...]}. Note: unlike
        # the search path, this only unwraps when "data" is itself a dict.
        data = body.get("data", body) if isinstance(body.get("data"), dict) else body
        nodes_raw = data.get("nodes", [])
        edges_raw = data.get("edges", [])
        # ExportNodeVO: id, name, type, description, properties (Map)
        entities: list[EntitySummary] = []
        for node in nodes_raw:
            entities.append(
                EntitySummary(
                    id=str(node.get("id", "")),
                    name=node.get("name", ""),
                    type=node.get("type", ""),
                    description=node.get("description", ""),
                )
            )
        relations: list[RelationSummary] = []
        # id -> entity map, used to resolve source/target names and types.
        entity_map = {e.id: e for e in entities}
        # ExportEdgeVO: sourceEntityId, targetEntityId, relationType.
        # Careful: "sourceId" on the VO is the data-source id, NOT the
        # source entity id — hence the explicit *EntityId fields here.
        for edge in edges_raw:
            source_id = str(edge.get("sourceEntityId", ""))
            target_id = str(edge.get("targetEntityId", ""))
            source_entity = entity_map.get(source_id)
            target_entity = entity_map.get(target_id)
            relations.append(
                RelationSummary(
                    # Fall back to the raw id when the node list lacks the entity.
                    source_name=source_entity.name if source_entity else source_id,
                    source_type=source_entity.type if source_entity else "",
                    target_name=target_entity.name if target_entity else target_id,
                    target_type=target_entity.type if target_entity else "",
                    relation_type=edge.get("relationType", ""),
                )
            )
        return entities, relations

    async def close(self) -> None:
        """Close the underlying HTTP client, if one was created."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None