diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java index 4e41020..37bf23a 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java @@ -71,7 +71,13 @@ public class KnowledgeItemApplicationService { KnowledgeItem knowledgeItem = KnowledgeConverter.INSTANCE.convertToKnowledgeItem(request); knowledgeItem.setId(UUID.randomUUID().toString()); knowledgeItem.setSetId(setId); - knowledgeItem.setSourceType(KnowledgeSourceType.MANUAL); + if (StringUtils.isNotBlank(request.getSourceDatasetId()) || StringUtils.isNotBlank(request.getSourceFileId())) { + knowledgeItem.setSourceType(KnowledgeSourceType.DATASET_FILE); + } else { + knowledgeItem.setSourceType(KnowledgeSourceType.MANUAL); + } + knowledgeItem.setSourceDatasetId(request.getSourceDatasetId()); + knowledgeItem.setSourceFileId(request.getSourceFileId()); if (knowledgeItem.getStatus() == null) { knowledgeItem.setStatus(KnowledgeStatusType.DRAFT); } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/KnowledgeItemRepositoryImpl.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/KnowledgeItemRepositoryImpl.java index 68312a4..79fed67 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/KnowledgeItemRepositoryImpl.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/KnowledgeItemRepositoryImpl.java @@ -30,6 +30,8 @@ public class KnowledgeItemRepositoryImpl extends CrudRepository str: - base = settings.datamate_backend_base_url.rstrip("/") - url = f"{base}/data-management/datasets/{dataset_id}/files/{file_id}/download" - - try: - async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: - resp = await client.get(url) - resp.raise_for_status() - - content_length = resp.headers.get("content-length") - if content_length: - try: - if int(content_length) > settings.editor_max_text_bytes: - raise HTTPException( - status_code=413, - detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节", - ) - except ValueError: - # content-length 非法则忽略,走实际长度判断 - pass - - data = resp.content - if len(data) > settings.editor_max_text_bytes: - raise HTTPException( - status_code=413, - detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节", - ) - - # TEXT POC:默认按 UTF-8 解码,不可解码字符用替换符处理 - return data.decode("utf-8", errors="replace") - - except HTTPException: - raise - except httpx.HTTPStatusError as e: - logger.error(f"读取文本失败: dataset={dataset_id}, file={file_id}, http={e.response.status_code}") - raise HTTPException(status_code=502, detail="读取文本失败(下载接口返回错误)") - except Exception as e: - logger.error(f"读取文本失败: dataset={dataset_id}, file={file_id}, err={e}") - raise HTTPException(status_code=502, detail="读取文本失败(下载接口调用异常)") + return await fetch_text_content_via_download_api(dataset_id, file_id) async def get_task( self, @@ -355,13 +318,14 @@ class AnnotationEditorService: project = await self._get_project_or_404(project_id) # 校验文件归属 - file_check = await self.db.execute( - select(DatasetFiles.id).where( + file_result = await self.db.execute( + select(DatasetFiles).where( DatasetFiles.id == file_id, DatasetFiles.dataset_id == project.dataset_id, ) ) - if not file_check.scalar_one_or_none(): + file_record = file_result.scalar_one_or_none() + if not file_record: raise HTTPException(status_code=404, detail=f"文件不存在或不属于该项目: {file_id}") annotation_payload = dict(request.annotation or {}) @@ -406,10 +370,12 @@ class AnnotationEditorService: await self.db.commit() await self.db.refresh(existing) - return UpsertAnnotationResponse( + response = UpsertAnnotationResponse( annotationId=existing.id, updatedAt=existing.updated_at or now, ) + await self._sync_annotation_to_knowledge(project, file_record, final_payload, existing.updated_at) + return response new_id = str(uuid.uuid4()) record = AnnotationResult( @@ -424,10 +390,12 @@ class AnnotationEditorService: await self.db.commit() await self.db.refresh(record) - return UpsertAnnotationResponse( + response = UpsertAnnotationResponse( annotationId=record.id, updatedAt=record.updated_at or now, ) + await self._sync_annotation_to_knowledge(project, file_record, final_payload, record.updated_at) + return response def _merge_segment_annotation( self, @@ -465,3 +433,21 @@ class AnnotationEditorService: return base + async def _sync_annotation_to_knowledge( + self, + project: LabelingProject, + file_record: DatasetFiles, + annotation: Dict[str, Any], + annotation_updated_at: Optional[datetime], + ) -> None: + """同步标注结果到知识管理(失败不影响标注保存)""" + try: + await KnowledgeSyncService(self.db).sync_annotation_to_knowledge( + project=project, + file_record=file_record, + annotation=annotation, + annotation_updated_at=annotation_updated_at, + ) + except Exception as exc: + logger.warning("标注同步知识管理失败:%s", exc) + diff --git a/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py new file mode 100644 index 0000000..7dd846a --- /dev/null +++ b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py @@ -0,0 +1,310 @@ +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any, Dict, Optional + +import httpx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.config import settings +from app.core.logging import get_logger +from app.db.models import Dataset, DatasetFiles, LabelingProject +from app.module.annotation.service.text_fetcher import fetch_text_content_via_download_api + +logger = get_logger(__name__) + + +class KnowledgeSyncService: + """标注保存后同步到知识管理""" + + CONFIG_KEY_SET_ID = "knowledge_set_id" + CONFIG_KEY_SET_NAME = "knowledge_set_name" + + def __init__(self, db: AsyncSession): + self.db = db + self.base_url = settings.datamate_backend_base_url.rstrip("/") + + async def sync_annotation_to_knowledge( + self, + project: LabelingProject, + file_record: DatasetFiles, + annotation: Dict[str, Any], + annotation_updated_at: Optional[datetime], + ) -> None: + """将标注结果同步到知识管理(失败不影响标注保存)""" + if not project or not file_record: + logger.warning("标注同步失败:缺少项目或文件信息") + return + + set_id = await self._ensure_knowledge_set(project) + if not set_id: + logger.warning("标注同步失败:无法获取知识集") + return + + item = await self._get_item_by_source(set_id, project.dataset_id, str(file_record.id)) + if item and item.get("status") in {"PUBLISHED", "ARCHIVED", "DEPRECATED"}: + logger.info( + "知识条目为只读状态,跳过同步:item_id=%s status=%s", + item.get("id"), + item.get("status"), + ) + return + + payload = await self._build_item_payload( + project=project, + file_record=file_record, + annotation=annotation, + annotation_updated_at=annotation_updated_at, + ) + if not payload: + logger.warning("标注同步失败:无法构建知识条目内容") + return + + try: + if item: + await self._update_item(set_id, item["id"], payload) + else: + await self._create_item(set_id, payload) + except Exception as exc: + logger.warning("标注同步到知识管理失败:%s", exc) + + async def _ensure_knowledge_set(self, project: LabelingProject) -> Optional[str]: + config = project.configuration if isinstance(project.configuration, dict) else {} + set_id = config.get(self.CONFIG_KEY_SET_ID) + + if set_id: + exists = await self._get_knowledge_set(set_id) + if exists: + return set_id + logger.warning("知识集不存在,准备重建:set_id=%s", set_id) + + dataset_name = project.name or "annotation-project" + base_name = dataset_name.strip() or "annotation-project" + metadata = self._build_set_metadata(project) + + created = await self._create_knowledge_set(base_name, metadata) + if not created: + fallback_name = self._build_fallback_set_name(base_name, project.id) + created = await self._create_knowledge_set(fallback_name, metadata) + + if not created: + return None + + await self._update_project_config( + project, + { + self.CONFIG_KEY_SET_ID: created.get("id"), + self.CONFIG_KEY_SET_NAME: created.get("name"), + }, + ) + return created.get("id") + + async def _get_knowledge_set(self, set_id: str) -> Optional[Dict[str, Any]]: + try: + return await self._request("GET", f"/data-management/knowledge-sets/{set_id}") + except httpx.HTTPStatusError as exc: + if exc.response.status_code == 404: + return None + raise + + async def _create_knowledge_set(self, name: str, metadata: str) -> Optional[Dict[str, Any]]: + payload = { + "name": name, + "description": "标注项目自动创建的知识集", + "status": "DRAFT", + "metadata": metadata, + } + try: + return await self._request("POST", "/data-management/knowledge-sets", json=payload) + except httpx.HTTPStatusError as exc: + logger.warning( + "创建知识集失败:name=%s status=%s detail=%s", + name, + exc.response.status_code, + self._safe_response_text(exc.response), + ) + return None + + async def _get_item_by_source( + self, + set_id: str, + dataset_id: str, + file_id: str, + ) -> Optional[Dict[str, Any]]: + params = { + "page": 0, + "size": 1, + "sourceDatasetId": dataset_id, + "sourceFileId": file_id, + } + try: + data = await self._request("GET", f"/data-management/knowledge-sets/{set_id}/items", params=params) + except httpx.HTTPStatusError as exc: + logger.warning( + "查询知识条目失败:set_id=%s status=%s", + set_id, + exc.response.status_code, + ) + return None + + if not isinstance(data, dict): + return None + content = data.get("content") or [] + if not content: + return None + return content[0] + + async def _create_item(self, set_id: str, payload: Dict[str, Any]) -> None: + await self._request("POST", f"/data-management/knowledge-sets/{set_id}/items", json=payload) + + async def _update_item(self, set_id: str, item_id: str, payload: Dict[str, Any]) -> None: + update_payload = dict(payload) + update_payload.pop("sourceDatasetId", None) + update_payload.pop("sourceFileId", None) + await self._request( + "PUT", + f"/data-management/knowledge-sets/{set_id}/items/{item_id}", + json=update_payload, + ) + + async def _build_item_payload( + self, + project: LabelingProject, + file_record: DatasetFiles, + annotation: Dict[str, Any], + annotation_updated_at: Optional[datetime], + ) -> Optional[Dict[str, Any]]: + dataset_type = await self._get_dataset_type(project.dataset_id) + annotation_json = self._safe_json_dumps(annotation) + metadata = self._build_item_metadata( + project=project, + file_record=file_record, + annotation=annotation, + annotation_updated_at=annotation_updated_at, + ) + + title = self._strip_extension(getattr(file_record, "file_name", "")) + if not title: + title = "未命名" + + content_type = "TEXT" + if dataset_type == "TEXT" and self._is_markdown_file(file_record): + content_type = "MARKDOWN" + + content = annotation_json + if dataset_type == "TEXT": + try: + content = await fetch_text_content_via_download_api( + project.dataset_id, + str(file_record.id), + ) + content = self._append_annotation_to_content(content, annotation_json, content_type) + except Exception as exc: + logger.warning("读取文本失败,改为仅存标注JSON:%s", exc) + content = annotation_json + + payload: Dict[str, Any] = { + "title": title, + "content": content, + "contentType": content_type, + "metadata": metadata, + "sourceDatasetId": project.dataset_id, + "sourceFileId": str(file_record.id), + } + return payload + + async def _get_dataset_type(self, dataset_id: str) -> Optional[str]: + result = await self.db.execute( + select(Dataset.dataset_type).where(Dataset.id == dataset_id) + ) + dataset_type = result.scalar_one_or_none() + return str(dataset_type).upper() if dataset_type else None + + def _is_markdown_file(self, file_record: DatasetFiles) -> bool: + file_name = getattr(file_record, "file_name", "") or "" + file_type = getattr(file_record, "file_type", "") or "" + extension = "" + if "." in file_name: + extension = file_name.rsplit(".", 1)[-1] + elif file_type.startswith("."): + extension = file_type[1:] + else: + extension = file_type + return extension.lower() in {"md", "markdown"} + + def _append_annotation_to_content(self, content: str, annotation_json: str, content_type: str) -> str: + if content_type == "MARKDOWN": + return ( + f"{content}\n\n---\n\n## 标注结果\n\n```json\n" + f"{annotation_json}\n```") + return f"{content}\n\n---\n\n标注结果(JSON):\n{annotation_json}" + + def _strip_extension(self, file_name: str) -> str: + if not file_name: + return "" + if "." not in file_name: + return file_name + return file_name.rsplit(".", 1)[0] + + def _build_set_metadata(self, project: LabelingProject) -> str: + payload = { + "source": "annotation", + "project_id": project.id, + "dataset_id": project.dataset_id, + } + return self._safe_json_dumps(payload) + + def _build_item_metadata( + self, + project: LabelingProject, + file_record: DatasetFiles, + annotation: Dict[str, Any], + annotation_updated_at: Optional[datetime], + ) -> str: + payload: Dict[str, Any] = { + "source": { + "type": "annotation", + "project_id": project.id, + "dataset_id": project.dataset_id, + "file_id": str(file_record.id), + "file_name": getattr(file_record, "file_name", ""), + }, + "annotation": annotation, + } + if annotation_updated_at: + payload["annotation_updated_at"] = annotation_updated_at.isoformat() + return self._safe_json_dumps(payload) + + def _build_fallback_set_name(self, base_name: str, project_id: str) -> str: + short_id = project_id.replace("-", "")[:8] + return f"{base_name}-annotation-{short_id}" + + async def _update_project_config(self, project: LabelingProject, updates: Dict[str, Any]) -> None: + config = project.configuration if isinstance(project.configuration, dict) else {} + config.update(updates) + project.configuration = config + await self.db.commit() + await self.db.refresh(project) + + async def _request(self, method: str, path: str, **kwargs) -> Any: + url = f"{self.base_url}{path}" + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: + response = await client.request(method, url, **kwargs) + response.raise_for_status() + if response.content: + return response.json() + return None + + def _safe_json_dumps(self, payload: Any) -> str: + try: + return json.dumps(payload, ensure_ascii=False, indent=2) + except Exception: + return json.dumps({"error": "failed to serialize"}, ensure_ascii=False) + + def _safe_response_text(self, response: httpx.Response) -> str: + try: + return response.text + except Exception: + return "" diff --git a/runtime/datamate-python/app/module/annotation/service/text_fetcher.py b/runtime/datamate-python/app/module/annotation/service/text_fetcher.py new file mode 100644 index 0000000..8bea9ca --- /dev/null +++ b/runtime/datamate-python/app/module/annotation/service/text_fetcher.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import httpx +from fastapi import HTTPException + +from app.core.config import settings +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +async def fetch_text_content_via_download_api(dataset_id: str, file_id: str) -> str: + """通过下载接口读取文本内容""" + base = settings.datamate_backend_base_url.rstrip("/") + url = f"{base}/data-management/datasets/{dataset_id}/files/{file_id}/download" + + try: + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + + content_length = resp.headers.get("content-length") + if content_length: + try: + if int(content_length) > settings.editor_max_text_bytes: + raise HTTPException( + status_code=413, + detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节", + ) + except ValueError: + # content-length 非法则忽略,走实际长度判断 + pass + + data = resp.content + if len(data) > settings.editor_max_text_bytes: + raise HTTPException( + status_code=413, + detail=f"文本文件过大,限制 {settings.editor_max_text_bytes} 字节", + ) + + # TEXT POC:默认按 UTF-8 解码,不可解码字符用替换符处理 + return data.decode("utf-8", errors="replace") + + except HTTPException: + raise + except httpx.HTTPStatusError as exc: + logger.error( + "读取文本失败: dataset=%s, file=%s, http=%s", + dataset_id, + file_id, + exc.response.status_code, + ) + raise HTTPException(status_code=502, detail="读取文本失败(下载接口返回错误)") + except Exception as exc: + logger.error("读取文本失败: dataset=%s, file=%s, err=%s", dataset_id, file_id, exc) + raise HTTPException(status_code=502, detail="读取文本失败(下载接口调用异常)")