Files
DataMate/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py
Jerry Yan 4143bc75f9 fix: 修复codex review发现的问题
问题1 - 行锁持有时间过长:
- 采用双重检查锁定模式,将HTTP调用移到锁范围外
- 新增 _update_knowledge_set_config 方法专门处理加锁更新

问题2 - 清理不完整:
- _list_knowledge_sets 方法添加分页参数
- 新增 _list_all_knowledge_sets 方法遍历所有知识集
- 清理方法使用新的全量查询方法

问题3 - 文件删除逻辑可能误删:
- deleteKnowledgeItemFile 方法增加严格的 sourceType 检查
- 只有当 sourceType 为 FILE_UPLOAD 或 MANUAL 时才删除文件
- 避免误删 DATASET_FILE 类型的数据集文件

涉及文件:
- knowledge_sync.py
- KnowledgeItemApplicationService.java
2026-02-05 04:07:40 +08:00

542 lines
19 KiB
Python

from __future__ import annotations
import json
from datetime import datetime
from typing import Any, Dict, Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.db.models import Dataset, DatasetFiles, LabelingProject
# Module-level logger named after this module; KnowledgeSyncService writes all
# of its warning/info output through it.
logger = get_logger(__name__)
class KnowledgeSyncService:
    """Sync saved annotations into the knowledge-management backend.

    Every labeling project is bound to one knowledge set whose id/name are
    stored in the project's ``configuration`` under ``CONFIG_KEY_SET_ID`` /
    ``CONFIG_KEY_SET_NAME``. Each annotated file becomes one knowledge item in
    that set, looked up by ``(sourceDatasetId, sourceFileId)``.
    """

    CONFIG_KEY_SET_ID = "knowledge_set_id"
    CONFIG_KEY_SET_NAME = "knowledge_set_name"
    # Page size used when listing knowledge sets from the backend.
    KNOWLEDGE_SET_LIST_SIZE = 50

    def __init__(self, db: AsyncSession):
        """Bind the service to a DB session and resolve the backend base URL."""
        self.db = db
        self.base_url = settings.datamate_backend_base_url.rstrip("/")

    async def sync_annotation_to_knowledge(
        self,
        project: LabelingProject,
        file_record: DatasetFiles,
        annotation: Dict[str, Any],
        annotation_updated_at: Optional[datetime],
    ) -> None:
        """Push an annotation result to knowledge management (best effort).

        A failure here must never affect saving the annotation itself, so any
        error raised while resolving the knowledge set, looking up the item,
        building the payload or writing the item is logged and swallowed.

        Args:
            project: Labeling project the annotation belongs to.
            file_record: Dataset file that was annotated.
            annotation: Annotation result to store as the item content.
            annotation_updated_at: Last update time of the annotation, if known.
        """
        if not project or not file_record:
            logger.warning("标注同步失败:缺少项目或文件信息")
            return
        try:
            set_id = await self._ensure_knowledge_set(project)
            if not set_id:
                logger.warning("标注同步失败:无法获取知识集")
                return
            item = await self._get_item_by_source(
                set_id, project.dataset_id, str(file_record.id)
            )
            # Items in these states are read-only and must not be overwritten.
            if item and item.get("status") in {"PUBLISHED", "ARCHIVED", "DEPRECATED"}:
                logger.info(
                    "知识条目为只读状态,跳过同步:item_id=%s status=%s",
                    item.get("id"),
                    item.get("status"),
                )
                return
            payload = await self._build_item_payload(
                project=project,
                file_record=file_record,
                annotation=annotation,
                annotation_updated_at=annotation_updated_at,
            )
            if not payload:
                logger.warning("标注同步失败:无法构建知识条目内容")
                return
            if item:
                await self._update_item(set_id, item["id"], payload)
            else:
                await self._create_item(set_id, payload)
        except Exception as exc:
            # Best-effort boundary: network errors (httpx.RequestError),
            # non-404 HTTP errors and DB errors all end up here.
            logger.warning("标注同步到知识管理失败:%s", exc)

    async def _ensure_knowledge_set(self, project: LabelingProject) -> Optional[str]:
        """Return the id of the project's knowledge set, creating it if needed.

        All HTTP calls are made without holding a row lock; only the final
        configuration write in ``_update_knowledge_set_config`` takes one.

        Returns:
            The knowledge set id, or ``None`` when no set could be found or
            created.
        """
        # First check: read the configuration without taking a lock.
        config = (
            project.configuration if isinstance(project.configuration, dict) else {}
        )
        stale_set_id: Optional[str] = None
        set_id = config.get(self.CONFIG_KEY_SET_ID)
        if set_id:
            exists = await self._get_knowledge_set(set_id)
            if isinstance(exists, dict) and self._metadata_matches_project(
                exists.get("metadata"), project.id
            ):
                return set_id
            logger.warning(
                "知识集不存在或归属不匹配,准备重建:set_id=%s project_id=%s",
                set_id,
                project.id,
            )
            # Remember the rejected id so the locked update may overwrite it
            # instead of mistaking it for a concurrent writer's value.
            stale_set_id = set_id

        project_name = (
            project.name or "annotation-project"
        ).strip() or "annotation-project"
        metadata = self._build_set_metadata(project)

        existing = await self._find_knowledge_set_by_name_and_project(
            project_name, project.id
        )
        if existing:
            return await self._update_knowledge_set_config(
                project, existing, stale_set_id=stale_set_id
            )

        created = await self._create_knowledge_set(project_name, metadata)
        if not created:
            # Creation may have failed because a concurrent request created it.
            created = await self._find_knowledge_set_by_name_and_project(
                project_name, project.id
            )
        if not created:
            # The name is probably taken by another project: retry with a
            # project-specific fallback name.
            fallback_name = self._build_fallback_set_name(project_name, project.id)
            existing = await self._find_knowledge_set_by_name_and_project(
                fallback_name, project.id
            )
            if existing:
                created = existing
            else:
                created = await self._create_knowledge_set(fallback_name, metadata)
                if not created:
                    created = await self._find_knowledge_set_by_name_and_project(
                        fallback_name, project.id
                    )
        if not created:
            return None
        return await self._update_knowledge_set_config(
            project, created, stale_set_id=stale_set_id
        )

    async def _get_knowledge_set(self, set_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one knowledge set; ``None`` on HTTP 404, other errors re-raised."""
        try:
            return await self._request(
                "GET", f"/data-management/knowledge-sets/{set_id}"
            )
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404:
                return None
            raise

    async def _list_knowledge_sets(
        self,
        keyword: Optional[str],
        page: Optional[int] = None,
        size: Optional[int] = None,
    ) -> list[Dict[str, Any]]:
        """Return one page of knowledge sets, optionally filtered by keyword.

        Args:
            keyword: Search keyword; omitted from the request when falsy.
            page: Page number, defaults to 1.
            size: Page size, defaults to ``KNOWLEDGE_SET_LIST_SIZE``.

        Returns:
            The dict entries of the response's ``content`` list; an empty list
            on an HTTP status error or an unexpected response shape.
        """
        params: Dict[str, Any] = {
            "page": page if page is not None else 1,
            "size": size if size is not None else self.KNOWLEDGE_SET_LIST_SIZE,
        }
        if keyword:
            params["keyword"] = keyword
        try:
            data = await self._request(
                "GET", "/data-management/knowledge-sets", params=params
            )
        except httpx.HTTPStatusError as exc:
            logger.warning(
                "查询知识集失败:keyword=%s status=%s",
                keyword,
                exc.response.status_code,
            )
            return []
        if not isinstance(data, dict):
            return []
        content = data.get("content")
        if not isinstance(content, list):
            return []
        return [item for item in content if isinstance(item, dict)]

    async def _list_all_knowledge_sets(
        self, keyword: Optional[str] = None
    ) -> list[Dict[str, Any]]:
        """Walk every page of ``_list_knowledge_sets`` and return all entries."""
        page = 1
        all_items: list[Dict[str, Any]] = []
        while True:
            items = await self._list_knowledge_sets(
                keyword, page=page, size=self.KNOWLEDGE_SET_LIST_SIZE
            )
            if not items:
                break
            all_items.extend(items)
            # A short page means this was the last one.
            if len(items) < self.KNOWLEDGE_SET_LIST_SIZE:
                break
            page += 1
        return all_items

    async def _find_knowledge_set_by_name_and_project(
        self, name: str, project_id: str
    ) -> Optional[Dict[str, Any]]:
        """Find a set with exactly this name whose metadata names the project."""
        if not name:
            return None
        # The keyword search is only a pre-filter; name and ownership are
        # re-checked exactly below.
        for item in await self._list_all_knowledge_sets(name):
            if item.get("name") != name:
                continue
            if self._metadata_matches_project(item.get("metadata"), project_id):
                return item
        return None

    async def _create_knowledge_set(
        self, name: str, metadata: str
    ) -> Optional[Dict[str, Any]]:
        """Create a DRAFT knowledge set; ``None`` on an HTTP status error."""
        payload = {
            "name": name,
            "description": "标注项目自动创建的知识集",
            "status": "DRAFT",
            "metadata": metadata,
        }
        try:
            return await self._request(
                "POST", "/data-management/knowledge-sets", json=payload
            )
        except httpx.HTTPStatusError as exc:
            logger.warning(
                "创建知识集失败:name=%s status=%s detail=%s",
                name,
                exc.response.status_code,
                self._safe_response_text(exc.response),
            )
            return None

    async def _get_item_by_source(
        self,
        set_id: str,
        dataset_id: str,
        file_id: str,
    ) -> Optional[Dict[str, Any]]:
        """Return the first item of a set that originates from the given file.

        Returns:
            The item dict, or ``None`` when nothing matches, the request fails
            with an HTTP status error, or the response has an unexpected shape.
        """
        params = {
            "page": 1,
            "size": 1,
            "sourceDatasetId": dataset_id,
            "sourceFileId": file_id,
        }
        try:
            data = await self._request(
                "GET", f"/data-management/knowledge-sets/{set_id}/items", params=params
            )
        except httpx.HTTPStatusError as exc:
            logger.warning(
                "查询知识条目失败:set_id=%s status=%s",
                set_id,
                exc.response.status_code,
            )
            return None
        if not isinstance(data, dict):
            return None
        content = data.get("content")
        # Same shape validation as _list_knowledge_sets: indexing a non-list
        # (or returning a non-dict entry) would only fail later in the caller.
        if not isinstance(content, list) or not content:
            return None
        first = content[0]
        return first if isinstance(first, dict) else None

    async def _create_item(self, set_id: str, payload: Dict[str, Any]) -> None:
        """Create a knowledge item in the given set."""
        await self._request(
            "POST", f"/data-management/knowledge-sets/{set_id}/items", json=payload
        )

    async def _update_item(
        self, set_id: str, item_id: str, payload: Dict[str, Any]
    ) -> None:
        """Update an existing knowledge item.

        The source identifiers are dropped from the request body; the caller's
        ``payload`` dict is left untouched.
        """
        update_payload = dict(payload)
        update_payload.pop("sourceDatasetId", None)
        update_payload.pop("sourceFileId", None)
        await self._request(
            "PUT",
            f"/data-management/knowledge-sets/{set_id}/items/{item_id}",
            json=update_payload,
        )

    async def _cleanup_knowledge_set_for_project(self, project_id: str) -> None:
        """Delete every knowledge set whose metadata names this project.

        A failed deletion is logged and does not stop the remaining ones.
        """
        for item in await self._list_all_knowledge_sets():
            if not self._metadata_matches_project(item.get("metadata"), project_id):
                continue
            set_id = item.get("id")
            if not set_id:
                continue
            try:
                await self._request(
                    "DELETE", f"/data-management/knowledge-sets/{set_id}"
                )
                logger.info(
                    "已删除知识集:set_id=%s project_id=%s", set_id, project_id
                )
            except Exception as exc:
                logger.warning(
                    "删除知识集失败:set_id=%s project_id=%s error=%s",
                    set_id,
                    project_id,
                    exc,
                )

    async def _cleanup_knowledge_item_for_file(
        self, dataset_id: str, file_id: str
    ) -> None:
        """Delete the knowledge item of a file from every knowledge set.

        A failed deletion is logged and does not stop the remaining ones.
        """
        for set_item in await self._list_all_knowledge_sets():
            set_id = set_item.get("id")
            if not set_id:
                continue
            item = await self._get_item_by_source(set_id, dataset_id, file_id)
            if not (item and item.get("id")):
                continue
            try:
                await self._request(
                    "DELETE",
                    f"/data-management/knowledge-sets/{set_id}/items/{item['id']}",
                )
                logger.info(
                    "已删除知识条目:item_id=%s set_id=%s dataset_id=%s file_id=%s",
                    item.get("id"),
                    set_id,
                    dataset_id,
                    file_id,
                )
            except Exception as exc:
                logger.warning(
                    "删除知识条目失败:item_id=%s set_id=%s dataset_id=%s file_id=%s error=%s",
                    item.get("id"),
                    set_id,
                    dataset_id,
                    file_id,
                    exc,
                )

    async def _build_item_payload(
        self,
        project: LabelingProject,
        file_record: DatasetFiles,
        annotation: Dict[str, Any],
        annotation_updated_at: Optional[datetime],
    ) -> Optional[Dict[str, Any]]:
        """Build the request body used to create or update a knowledge item.

        The item content is the annotation serialized as JSON. The content type
        is ``MARKDOWN`` only for markdown files of a ``TEXT`` dataset,
        otherwise ``TEXT``.
        """
        dataset_type = await self._get_dataset_type(project.dataset_id)
        annotation_json = self._safe_json_dumps(annotation)
        metadata = self._build_item_metadata(
            project=project,
            file_record=file_record,
            annotation=annotation,
            annotation_updated_at=annotation_updated_at,
        )
        title = self._strip_extension(getattr(file_record, "file_name", ""))
        if not title:
            title = "未命名"
        content_type = "TEXT"
        if dataset_type == "TEXT" and self._is_markdown_file(file_record):
            content_type = "MARKDOWN"
        payload: Dict[str, Any] = {
            "title": title,
            "content": annotation_json,
            "contentType": content_type,
            "metadata": metadata,
            "sourceDatasetId": project.dataset_id,
            "sourceFileId": str(file_record.id),
        }
        return payload

    async def _get_dataset_type(self, dataset_id: str) -> Optional[str]:
        """Return the upper-cased dataset type, or ``None`` if absent/empty."""
        result = await self.db.execute(
            select(Dataset.dataset_type).where(Dataset.id == dataset_id)
        )
        dataset_type = result.scalar_one_or_none()
        return str(dataset_type).upper() if dataset_type else None

    def _is_markdown_file(self, file_record: DatasetFiles) -> bool:
        """Tell whether the file is markdown, judged by its extension.

        The extension of ``file_name`` wins; ``file_type`` (with or without a
        leading dot) is used only when the name contains no dot.
        """
        file_name = getattr(file_record, "file_name", "") or ""
        file_type = getattr(file_record, "file_type", "") or ""
        if "." in file_name:
            extension = file_name.rsplit(".", 1)[-1]
        elif file_type.startswith("."):
            extension = file_type[1:]
        else:
            extension = file_type
        return extension.lower() in {"md", "markdown"}

    def _strip_extension(self, file_name: str) -> str:
        """Return ``file_name`` without its last ``.ext`` suffix."""
        if not file_name:
            return ""
        if "." not in file_name:
            return file_name
        return file_name.rsplit(".", 1)[0]

    def _build_set_metadata(self, project: LabelingProject) -> str:
        """Serialize the ownership metadata stored on a knowledge set."""
        payload = {
            "source": "annotation",
            "project_id": project.id,
            "dataset_id": project.dataset_id,
        }
        return self._safe_json_dumps(payload)

    def _build_item_metadata(
        self,
        project: LabelingProject,
        file_record: DatasetFiles,
        annotation: Dict[str, Any],
        annotation_updated_at: Optional[datetime],
    ) -> str:
        """Serialize the metadata stored on a knowledge item.

        It carries the item's origin (project, dataset, file), the annotation
        itself and, when known, the annotation's update time in ISO format.
        """
        payload: Dict[str, Any] = {
            "source": {
                "type": "annotation",
                "project_id": project.id,
                "dataset_id": project.dataset_id,
                "file_id": str(file_record.id),
                "file_name": getattr(file_record, "file_name", ""),
            },
            "annotation": annotation,
        }
        if annotation_updated_at:
            payload["annotation_updated_at"] = annotation_updated_at.isoformat()
        return self._safe_json_dumps(payload)

    def _build_fallback_set_name(self, base_name: str, project_id: str) -> str:
        """Derive a set name made unique by the first 8 id chars (dashes removed)."""
        short_id = project_id.replace("-", "")[:8]
        return f"{base_name}-annotation-{short_id}"

    def _merge_knowledge_set_config(
        self,
        config: Any,
        knowledge_set: Dict[str, Any],
        stale_set_id: Optional[str] = None,
    ) -> tuple[Optional[Dict[str, Any]], Optional[str]]:
        """Decide which knowledge set id wins and build the config to store.

        Args:
            config: Current project configuration; non-dict values count as
                empty.
            knowledge_set: Knowledge set that should be bound to the project.
            stale_set_id: A configured id already found to be invalid; it may
                be overwritten.

        Returns:
            ``(new_config, set_id)``. ``new_config`` is ``None`` when a
            different, non-stale id is already configured (nothing to write);
            otherwise it is a *new* dict with the set's id and name merged in.
        """
        # Always work on a copy: mutating the loaded dict in place and
        # assigning the same object back is not seen as a change by a plain
        # SQLAlchemy JSON column, so the UPDATE would be skipped.
        merged: Dict[str, Any] = dict(config) if isinstance(config, dict) else {}
        configured_id = merged.get(self.CONFIG_KEY_SET_ID)
        if configured_id and configured_id != stale_set_id:
            return None, configured_id
        merged[self.CONFIG_KEY_SET_ID] = knowledge_set.get("id")
        merged[self.CONFIG_KEY_SET_NAME] = knowledge_set.get("name")
        return merged, knowledge_set.get("id")

    async def _update_knowledge_set_config(
        self,
        project: LabelingProject,
        knowledge_set: Dict[str, Any],
        stale_set_id: Optional[str] = None,
    ) -> Optional[str]:
        """Bind a knowledge set to the project under a row lock.

        The configuration is re-read with ``SELECT ... FOR UPDATE``. If a
        different, non-stale set id is already stored there, that id is
        returned unchanged; otherwise the given set is written.

        Args:
            project: Project whose configuration is updated.
            knowledge_set: Knowledge set to bind (``id`` and ``name`` are used).
            stale_set_id: Previously configured id that the caller found to be
                missing or owned by another project. Without it, such an id
                would be mistaken for a concurrent update and never replaced.

        Returns:
            The effective knowledge set id, or ``None`` when the project row
            cannot be found.
        """
        result = await self.db.execute(
            select(LabelingProject)
            .where(LabelingProject.id == project.id)
            .with_for_update()
        )
        locked_project = result.scalar_one_or_none()
        if not locked_project:
            logger.warning("标注同步失败:无法锁定项目:project_id=%s", project.id)
            return None
        new_config, set_id = self._merge_knowledge_set_config(
            locked_project.configuration, knowledge_set, stale_set_id
        )
        if new_config is None:
            logger.info(
                "知识集配置已被其他进程更新:set_id=%s project_id=%s",
                set_id,
                locked_project.id,
            )
        else:
            locked_project.configuration = new_config
        # Commit on both paths: it ends the transaction and thereby releases
        # the row lock before the caller goes on to make HTTP requests.
        await self.db.commit()
        await self.db.refresh(locked_project)
        return set_id

    async def _update_project_config(
        self, project: LabelingProject, updates: Dict[str, Any]
    ) -> None:
        """Merge ``updates`` into the project's configuration under a row lock."""
        result = await self.db.execute(
            select(LabelingProject)
            .where(LabelingProject.id == project.id)
            .with_for_update()
        )
        locked_project = result.scalar_one_or_none()
        if not locked_project:
            logger.warning("更新项目配置失败:无法锁定项目:project_id=%s", project.id)
            return
        current = locked_project.configuration
        # Assign a fresh dict so the ORM registers the attribute as changed
        # (in-place mutation of a plain JSON column value is not detected).
        config: Dict[str, Any] = dict(current) if isinstance(current, dict) else {}
        config.update(updates)
        locked_project.configuration = config
        await self.db.commit()
        await self.db.refresh(locked_project)

    async def _request(self, method: str, path: str, **kwargs) -> Any:
        """Send a request to the backend and return the unwrapped JSON body.

        Args:
            method: HTTP method.
            path: Path appended to ``self.base_url``.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The unwrapped payload, or ``None`` when the response has no body.

        Raises:
            httpx.HTTPStatusError: On a non-success status, or when the
                response envelope carries a non-zero ``code``.
        """
        url = f"{self.base_url}{path}"
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            response = await client.request(method, url, **kwargs)
            response.raise_for_status()
            if response.content:
                payload = response.json()
                return self._unwrap_response(payload, response)
            return None

    def _safe_json_dumps(self, payload: Any) -> str:
        """Serialize to pretty JSON; fall back to an error stub on failure."""
        try:
            return json.dumps(payload, ensure_ascii=False, indent=2)
        except Exception:
            return json.dumps({"error": "failed to serialize"}, ensure_ascii=False)

    def _metadata_matches_project(self, metadata: Any, project_id: str) -> bool:
        """Tell whether the metadata's ``project_id`` equals ``project_id``."""
        if not project_id:
            return False
        parsed = self._parse_metadata(metadata)
        if not parsed:
            return False
        return str(parsed.get("project_id") or "").strip() == project_id

    def _parse_metadata(self, metadata: Any) -> Optional[Dict[str, Any]]:
        """Return metadata as a dict, decoding JSON strings; ``None`` otherwise."""
        if isinstance(metadata, dict):
            return metadata
        if isinstance(metadata, str):
            try:
                payload = json.loads(metadata)
            except Exception:
                return None
            return payload if isinstance(payload, dict) else None
        return None

    def _safe_response_text(self, response: httpx.Response) -> str:
        """Return the response body text, or ``""`` if it cannot be read."""
        try:
            return response.text
        except Exception:
            return ""

    def _unwrap_response(self, payload: Any, response: httpx.Response) -> Any:
        """Strip the ``{"code": ..., "data": ...}`` envelope if present.

        Payloads that are not dicts, or lack ``code``/``data``, are returned
        unchanged.

        Raises:
            httpx.HTTPStatusError: When the envelope's ``code`` is not ``0``.
        """
        if not isinstance(payload, dict):
            return payload
        if "code" not in payload or "data" not in payload:
            return payload
        code = str(payload.get("code"))
        if code != "0":
            raise httpx.HTTPStatusError(
                f"响应返回错误 code={code} message={payload.get('message')}",
                request=response.request,
                response=response,
            )
        return payload.get("data")