feat(annotation): 实现标注结果同步到知识管理功能

- 在知识条目实体中新增来源数据集ID和文件ID字段
- 实现标注编辑器中同步标注结果到知识管理的服务逻辑
- 添加知识同步服务类处理标注到知识条目的转换和同步
- 实现通过下载接口获取文本内容的独立服务模块
- 更新知识条目查询接口支持按来源数据集和文件ID过滤
- 自动创建和关联标注项目对应的知识集
- 支持文本和Markdown文件的内容合并标注结果
- 添加同步过程中的错误处理和日志记录机制
This commit is contained in:
2026-01-21 16:09:34 +08:00
parent 6baf66b304
commit 73f0ab65fa
7 changed files with 422 additions and 46 deletions

View File

@@ -71,7 +71,13 @@ public class KnowledgeItemApplicationService {
KnowledgeItem knowledgeItem = KnowledgeConverter.INSTANCE.convertToKnowledgeItem(request);
knowledgeItem.setId(UUID.randomUUID().toString());
knowledgeItem.setSetId(setId);
knowledgeItem.setSourceType(KnowledgeSourceType.MANUAL);
if (StringUtils.isNotBlank(request.getSourceDatasetId()) || StringUtils.isNotBlank(request.getSourceFileId())) {
knowledgeItem.setSourceType(KnowledgeSourceType.DATASET_FILE);
} else {
knowledgeItem.setSourceType(KnowledgeSourceType.MANUAL);
}
knowledgeItem.setSourceDatasetId(request.getSourceDatasetId());
knowledgeItem.setSourceFileId(request.getSourceFileId());
if (knowledgeItem.getStatus() == null) {
knowledgeItem.setStatus(KnowledgeStatusType.DRAFT);
}

View File

@@ -30,6 +30,8 @@ public class KnowledgeItemRepositoryImpl extends CrudRepository<KnowledgeItemMap
.eq(StringUtils.isNotBlank(query.getOwner()), KnowledgeItem::getOwner, query.getOwner())
.eq(StringUtils.isNotBlank(query.getSensitivity()), KnowledgeItem::getSensitivity, query.getSensitivity())
.eq(query.getSourceType() != null, KnowledgeItem::getSourceType, query.getSourceType())
.eq(StringUtils.isNotBlank(query.getSourceDatasetId()), KnowledgeItem::getSourceDatasetId, query.getSourceDatasetId())
.eq(StringUtils.isNotBlank(query.getSourceFileId()), KnowledgeItem::getSourceFileId, query.getSourceFileId())
.ge(query.getValidFrom() != null, KnowledgeItem::getValidFrom, query.getValidFrom())
.le(query.getValidTo() != null, KnowledgeItem::getValidTo, query.getValidTo());

View File

@@ -77,4 +77,12 @@ public class CreateKnowledgeItemRequest {
* 扩展元数据
*/
private String metadata;
/**
* 来源数据集ID(用于标注同步等场景)
*/
private String sourceDatasetId;
/**
* 来源文件ID(用于标注同步等场景)
*/
private String sourceFileId;
}

View File

@@ -61,6 +61,14 @@ public class KnowledgeItemPagingQuery extends PagingQuery {
* 来源类型
*/
private KnowledgeSourceType sourceType;
/**
* 来源数据集ID
*/
private String sourceDatasetId;
/**
* 来源文件ID
*/
private String sourceFileId;
/**
* 有效期开始
*/

View File

@@ -14,12 +14,10 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import hashlib
import httpx
from fastapi import HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.db.models import AnnotationResult, Dataset, DatasetFiles, LabelingProject
from app.module.annotation.schema.editor import (
@@ -32,7 +30,9 @@ from app.module.annotation.schema.editor import (
UpsertAnnotationResponse,
)
from app.module.annotation.service.template import AnnotationTemplateService
from app.module.annotation.service.knowledge_sync import KnowledgeSyncService
from app.module.annotation.service.annotation_text_splitter import AnnotationTextSplitter
from app.module.annotation.service.text_fetcher import fetch_text_content_via_download_api
logger = get_logger(__name__)
@@ -172,44 +172,7 @@ class AnnotationEditorService:
)
async def _fetch_text_content_via_download_api(self, dataset_id: str, file_id: str) -> str:
base = settings.datamate_backend_base_url.rstrip("/")
url = f"{base}/data-management/datasets/{dataset_id}/files/{file_id}/download"
try:
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
resp = await client.get(url)
resp.raise_for_status()
content_length = resp.headers.get("content-length")
if content_length:
try:
if int(content_length) > settings.editor_max_text_bytes:
raise HTTPException(
status_code=413,
detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节",
)
except ValueError:
# content-length 非法则忽略,走实际长度判断
pass
data = resp.content
if len(data) > settings.editor_max_text_bytes:
raise HTTPException(
status_code=413,
detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节",
)
# TEXT POC:默认按 UTF-8 解码,不可解码字符用替换符处理
return data.decode("utf-8", errors="replace")
except HTTPException:
raise
except httpx.HTTPStatusError as e:
logger.error(f"读取文本失败: dataset={dataset_id}, file={file_id}, http={e.response.status_code}")
raise HTTPException(status_code=502, detail="读取文本失败(下载接口返回错误)")
except Exception as e:
logger.error(f"读取文本失败: dataset={dataset_id}, file={file_id}, err={e}")
raise HTTPException(status_code=502, detail="读取文本失败(下载接口调用异常)")
return await fetch_text_content_via_download_api(dataset_id, file_id)
async def get_task(
self,
@@ -355,13 +318,14 @@ class AnnotationEditorService:
project = await self._get_project_or_404(project_id)
# 校验文件归属
file_check = await self.db.execute(
select(DatasetFiles.id).where(
file_result = await self.db.execute(
select(DatasetFiles).where(
DatasetFiles.id == file_id,
DatasetFiles.dataset_id == project.dataset_id,
)
)
if not file_check.scalar_one_or_none():
file_record = file_result.scalar_one_or_none()
if not file_record:
raise HTTPException(status_code=404, detail=f"文件不存在或不属于该项目: {file_id}")
annotation_payload = dict(request.annotation or {})
@@ -406,10 +370,12 @@ class AnnotationEditorService:
await self.db.commit()
await self.db.refresh(existing)
return UpsertAnnotationResponse(
response = UpsertAnnotationResponse(
annotationId=existing.id,
updatedAt=existing.updated_at or now,
)
await self._sync_annotation_to_knowledge(project, file_record, final_payload, existing.updated_at)
return response
new_id = str(uuid.uuid4())
record = AnnotationResult(
@@ -424,10 +390,12 @@ class AnnotationEditorService:
await self.db.commit()
await self.db.refresh(record)
return UpsertAnnotationResponse(
response = UpsertAnnotationResponse(
annotationId=record.id,
updatedAt=record.updated_at or now,
)
await self._sync_annotation_to_knowledge(project, file_record, final_payload, record.updated_at)
return response
def _merge_segment_annotation(
self,
@@ -465,3 +433,21 @@ class AnnotationEditorService:
return base
async def _sync_annotation_to_knowledge(
self,
project: LabelingProject,
file_record: DatasetFiles,
annotation: Dict[str, Any],
annotation_updated_at: Optional[datetime],
) -> None:
"""同步标注结果到知识管理(失败不影响标注保存)"""
try:
await KnowledgeSyncService(self.db).sync_annotation_to_knowledge(
project=project,
file_record=file_record,
annotation=annotation,
annotation_updated_at=annotation_updated_at,
)
except Exception as exc:
logger.warning("标注同步知识管理失败:%s", exc)

View File

@@ -0,0 +1,310 @@
from __future__ import annotations
import json
from datetime import datetime
from typing import Any, Dict, Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.db.models import Dataset, DatasetFiles, LabelingProject
from app.module.annotation.service.text_fetcher import fetch_text_content_via_download_api
logger = get_logger(__name__)
class KnowledgeSyncService:
"""标注保存后同步到知识管理"""
CONFIG_KEY_SET_ID = "knowledge_set_id"
CONFIG_KEY_SET_NAME = "knowledge_set_name"
def __init__(self, db: AsyncSession):
self.db = db
self.base_url = settings.datamate_backend_base_url.rstrip("/")
async def sync_annotation_to_knowledge(
self,
project: LabelingProject,
file_record: DatasetFiles,
annotation: Dict[str, Any],
annotation_updated_at: Optional[datetime],
) -> None:
"""将标注结果同步到知识管理(失败不影响标注保存)"""
if not project or not file_record:
logger.warning("标注同步失败:缺少项目或文件信息")
return
set_id = await self._ensure_knowledge_set(project)
if not set_id:
logger.warning("标注同步失败:无法获取知识集")
return
item = await self._get_item_by_source(set_id, project.dataset_id, str(file_record.id))
if item and item.get("status") in {"PUBLISHED", "ARCHIVED", "DEPRECATED"}:
logger.info(
"知识条目为只读状态,跳过同步:item_id=%s status=%s",
item.get("id"),
item.get("status"),
)
return
payload = await self._build_item_payload(
project=project,
file_record=file_record,
annotation=annotation,
annotation_updated_at=annotation_updated_at,
)
if not payload:
logger.warning("标注同步失败:无法构建知识条目内容")
return
try:
if item:
await self._update_item(set_id, item["id"], payload)
else:
await self._create_item(set_id, payload)
except Exception as exc:
logger.warning("标注同步到知识管理失败:%s", exc)
async def _ensure_knowledge_set(self, project: LabelingProject) -> Optional[str]:
config = project.configuration if isinstance(project.configuration, dict) else {}
set_id = config.get(self.CONFIG_KEY_SET_ID)
if set_id:
exists = await self._get_knowledge_set(set_id)
if exists:
return set_id
logger.warning("知识集不存在,准备重建:set_id=%s", set_id)
dataset_name = project.name or "annotation-project"
base_name = dataset_name.strip() or "annotation-project"
metadata = self._build_set_metadata(project)
created = await self._create_knowledge_set(base_name, metadata)
if not created:
fallback_name = self._build_fallback_set_name(base_name, project.id)
created = await self._create_knowledge_set(fallback_name, metadata)
if not created:
return None
await self._update_project_config(
project,
{
self.CONFIG_KEY_SET_ID: created.get("id"),
self.CONFIG_KEY_SET_NAME: created.get("name"),
},
)
return created.get("id")
async def _get_knowledge_set(self, set_id: str) -> Optional[Dict[str, Any]]:
try:
return await self._request("GET", f"/data-management/knowledge-sets/{set_id}")
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 404:
return None
raise
async def _create_knowledge_set(self, name: str, metadata: str) -> Optional[Dict[str, Any]]:
payload = {
"name": name,
"description": "标注项目自动创建的知识集",
"status": "DRAFT",
"metadata": metadata,
}
try:
return await self._request("POST", "/data-management/knowledge-sets", json=payload)
except httpx.HTTPStatusError as exc:
logger.warning(
"创建知识集失败:name=%s status=%s detail=%s",
name,
exc.response.status_code,
self._safe_response_text(exc.response),
)
return None
async def _get_item_by_source(
self,
set_id: str,
dataset_id: str,
file_id: str,
) -> Optional[Dict[str, Any]]:
params = {
"page": 0,
"size": 1,
"sourceDatasetId": dataset_id,
"sourceFileId": file_id,
}
try:
data = await self._request("GET", f"/data-management/knowledge-sets/{set_id}/items", params=params)
except httpx.HTTPStatusError as exc:
logger.warning(
"查询知识条目失败:set_id=%s status=%s",
set_id,
exc.response.status_code,
)
return None
if not isinstance(data, dict):
return None
content = data.get("content") or []
if not content:
return None
return content[0]
async def _create_item(self, set_id: str, payload: Dict[str, Any]) -> None:
await self._request("POST", f"/data-management/knowledge-sets/{set_id}/items", json=payload)
async def _update_item(self, set_id: str, item_id: str, payload: Dict[str, Any]) -> None:
update_payload = dict(payload)
update_payload.pop("sourceDatasetId", None)
update_payload.pop("sourceFileId", None)
await self._request(
"PUT",
f"/data-management/knowledge-sets/{set_id}/items/{item_id}",
json=update_payload,
)
async def _build_item_payload(
self,
project: LabelingProject,
file_record: DatasetFiles,
annotation: Dict[str, Any],
annotation_updated_at: Optional[datetime],
) -> Optional[Dict[str, Any]]:
dataset_type = await self._get_dataset_type(project.dataset_id)
annotation_json = self._safe_json_dumps(annotation)
metadata = self._build_item_metadata(
project=project,
file_record=file_record,
annotation=annotation,
annotation_updated_at=annotation_updated_at,
)
title = self._strip_extension(getattr(file_record, "file_name", ""))
if not title:
title = "未命名"
content_type = "TEXT"
if dataset_type == "TEXT" and self._is_markdown_file(file_record):
content_type = "MARKDOWN"
content = annotation_json
if dataset_type == "TEXT":
try:
content = await fetch_text_content_via_download_api(
project.dataset_id,
str(file_record.id),
)
content = self._append_annotation_to_content(content, annotation_json, content_type)
except Exception as exc:
logger.warning("读取文本失败,改为仅存标注JSON:%s", exc)
content = annotation_json
payload: Dict[str, Any] = {
"title": title,
"content": content,
"contentType": content_type,
"metadata": metadata,
"sourceDatasetId": project.dataset_id,
"sourceFileId": str(file_record.id),
}
return payload
async def _get_dataset_type(self, dataset_id: str) -> Optional[str]:
result = await self.db.execute(
select(Dataset.dataset_type).where(Dataset.id == dataset_id)
)
dataset_type = result.scalar_one_or_none()
return str(dataset_type).upper() if dataset_type else None
def _is_markdown_file(self, file_record: DatasetFiles) -> bool:
file_name = getattr(file_record, "file_name", "") or ""
file_type = getattr(file_record, "file_type", "") or ""
extension = ""
if "." in file_name:
extension = file_name.rsplit(".", 1)[-1]
elif file_type.startswith("."):
extension = file_type[1:]
else:
extension = file_type
return extension.lower() in {"md", "markdown"}
def _append_annotation_to_content(self, content: str, annotation_json: str, content_type: str) -> str:
if content_type == "MARKDOWN":
return (
f"{content}\n\n---\n\n## 标注结果\n\n```json\n"
f"{annotation_json}\n```")
return f"{content}\n\n---\n\n标注结果(JSON):\n{annotation_json}"
def _strip_extension(self, file_name: str) -> str:
if not file_name:
return ""
if "." not in file_name:
return file_name
return file_name.rsplit(".", 1)[0]
def _build_set_metadata(self, project: LabelingProject) -> str:
payload = {
"source": "annotation",
"project_id": project.id,
"dataset_id": project.dataset_id,
}
return self._safe_json_dumps(payload)
def _build_item_metadata(
self,
project: LabelingProject,
file_record: DatasetFiles,
annotation: Dict[str, Any],
annotation_updated_at: Optional[datetime],
) -> str:
payload: Dict[str, Any] = {
"source": {
"type": "annotation",
"project_id": project.id,
"dataset_id": project.dataset_id,
"file_id": str(file_record.id),
"file_name": getattr(file_record, "file_name", ""),
},
"annotation": annotation,
}
if annotation_updated_at:
payload["annotation_updated_at"] = annotation_updated_at.isoformat()
return self._safe_json_dumps(payload)
def _build_fallback_set_name(self, base_name: str, project_id: str) -> str:
short_id = project_id.replace("-", "")[:8]
return f"{base_name}-annotation-{short_id}"
async def _update_project_config(self, project: LabelingProject, updates: Dict[str, Any]) -> None:
config = project.configuration if isinstance(project.configuration, dict) else {}
config.update(updates)
project.configuration = config
await self.db.commit()
await self.db.refresh(project)
async def _request(self, method: str, path: str, **kwargs) -> Any:
url = f"{self.base_url}{path}"
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
response = await client.request(method, url, **kwargs)
response.raise_for_status()
if response.content:
return response.json()
return None
def _safe_json_dumps(self, payload: Any) -> str:
try:
return json.dumps(payload, ensure_ascii=False, indent=2)
except Exception:
return json.dumps({"error": "failed to serialize"}, ensure_ascii=False)
def _safe_response_text(self, response: httpx.Response) -> str:
try:
return response.text
except Exception:
return ""

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import httpx
from fastapi import HTTPException
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
async def fetch_text_content_via_download_api(dataset_id: str, file_id: str) -> str:
"""通过下载接口读取文本内容"""
base = settings.datamate_backend_base_url.rstrip("/")
url = f"{base}/data-management/datasets/{dataset_id}/files/{file_id}/download"
try:
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
resp = await client.get(url)
resp.raise_for_status()
content_length = resp.headers.get("content-length")
if content_length:
try:
if int(content_length) > settings.editor_max_text_bytes:
raise HTTPException(
status_code=413,
detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节",
)
except ValueError:
# content-length 非法则忽略,走实际长度判断
pass
data = resp.content
if len(data) > settings.editor_max_text_bytes:
raise HTTPException(
status_code=413,
detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节",
)
# TEXT POC:默认按 UTF-8 解码,不可解码字符用替换符处理
return data.decode("utf-8", errors="replace")
except HTTPException:
raise
except httpx.HTTPStatusError as exc:
logger.error(
"读取文本失败: dataset=%s, file=%s, http=%s",
dataset_id,
file_id,
exc.response.status_code,
)
raise HTTPException(status_code=502, detail="读取文本失败(下载接口返回错误)")
except Exception as exc:
logger.error("读取文本失败: dataset=%s, file=%s, err=%s", dataset_id, file_id, exc)
raise HTTPException(status_code=502, detail="读取文本失败(下载接口调用异常)")