# Patch to KnowledgeSyncService in knowledge_sync.py.
# Before creating a knowledge set, and again whenever a create call returns
# nothing, the code now looks for an existing set with the same name and
# reuses it. Four helpers are added for this: _list_knowledge_sets,
# _parse_metadata, _metadata_matches_project and _find_knowledge_set_by_name.
diff --git a/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py
index 7dd846a..4c904a8 100644
--- a/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py
+++ b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py
@@ -21,6 +21,7 @@ class KnowledgeSyncService:
 
     CONFIG_KEY_SET_ID = "knowledge_set_id"
     CONFIG_KEY_SET_NAME = "knowledge_set_name"
+    KNOWLEDGE_SET_LIST_SIZE = 50  # sent as the "size" query parameter by _list_knowledge_sets
 
     def __init__(self, db: AsyncSession):
         self.db = db
@@ -84,10 +85,30 @@ class KnowledgeSyncService:
         base_name = dataset_name.strip() or "annotation-project"
         metadata = self._build_set_metadata(project)
 
+        existing = await self._find_knowledge_set_by_name(base_name, project)  # look for a set already named base_name
+        if existing:
+            await self._update_project_config(  # pass the existing set's id and name on instead of creating a new set
+                project,
+                {
+                    self.CONFIG_KEY_SET_ID: existing.get("id"),
+                    self.CONFIG_KEY_SET_NAME: existing.get("name"),
+                },
+            )
+            return existing.get("id")  # nothing is created on this path
+
         created = await self._create_knowledge_set(base_name, metadata)
+        if not created:  # create returned nothing: look the name up once more before falling back
+            created = await self._find_knowledge_set_by_name(base_name, project)
+
         if not created:
             fallback_name = self._build_fallback_set_name(base_name, project.id)
-            created = await self._create_knowledge_set(fallback_name, metadata)
+            existing = await self._find_knowledge_set_by_name(fallback_name, project)  # same find / create / find sequence for the fallback name
+            if existing:
+                created = existing
+            else:
+                created = await self._create_knowledge_set(fallback_name, metadata)
+                if not created:
+                    created = await self._find_knowledge_set_by_name(fallback_name, project)
 
         if not created:
             return None
@@ -109,6 +130,66 @@ class KnowledgeSyncService:
                 return None
             raise
 
+    async def _list_knowledge_sets(self, keyword: Optional[str]) -> list[Dict[str, Any]]:  # returns [] on an HTTP status error or an unexpected response shape
+        params: Dict[str, Any] = {
+            "page": 0,  # NOTE(review): only the first page is requested; later pages are never read - confirm acceptable
+            "size": self.KNOWLEDGE_SET_LIST_SIZE,
+        }
+        if keyword:  # the keyword filter is left out when keyword is empty or None
+            params["keyword"] = keyword
+        try:
+            data = await self._request("GET", "/data-management/knowledge-sets", params=params)
+        except httpx.HTTPStatusError as exc:  # only HTTP status errors are handled here; anything else propagates
+            logger.warning(
+                "查询知识集失败:keyword=%s status=%s",
+                keyword,
+                exc.response.status_code,
+            )
+            return []
+        if not isinstance(data, dict):
+            return []
+        content = data.get("content")  # items are read from the "content" key of the response dict
+        if not isinstance(content, list):
+            return []
+        return [item for item in content if isinstance(item, dict)]  # non-dict entries are dropped
+
+    def _parse_metadata(self, metadata: Any) -> Optional[Dict[str, Any]]:  # JSON-decodes metadata; None unless it is a non-blank string holding a JSON object
+        if not isinstance(metadata, str) or not metadata.strip():
+            return None
+        try:
+            parsed = json.loads(metadata)
+        except Exception:  # any decoding failure is treated as "no metadata"
+            return None
+        return parsed if isinstance(parsed, dict) else None
+
+    def _metadata_matches_project(self, metadata: Any, project: LabelingProject) -> bool:  # True when the parsed metadata identifies this project
+        parsed = self._parse_metadata(metadata)
+        if not parsed:  # unparseable or empty metadata never matches
+            return False
+        return (  # source, project_id and dataset_id must all match
+            parsed.get("source") == "annotation"
+            and parsed.get("project_id") == project.id
+            and parsed.get("dataset_id") == project.dataset_id
+        )
+
+    async def _find_knowledge_set_by_name(  # returns a listed set whose "name" equals name exactly, or None
+        self,
+        name: str,
+        project: LabelingProject,
+    ) -> Optional[Dict[str, Any]]:
+        if not name:
+            return None
+        items = await self._list_knowledge_sets(name)  # name is passed as the search keyword
+        if not items:
+            return None
+        exact_matches = [item for item in items if item.get("name") == name]  # keep exact-name matches only
+        if not exact_matches:
+            return None
+        for item in exact_matches:  # prefer a set whose metadata matches this project
+            if self._metadata_matches_project(item.get("metadata"), project):
+                return item
+        return exact_matches[0]  # NOTE(review): falls back to the first exact-name match even when its metadata does not match this project - confirm intended
+
     async def _create_knowledge_set(self, name: str, metadata: str) -> Optional[Dict[str, Any]]:
         payload = {
             "name": name,