diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java index 919e535..08aa9f2 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/KnowledgeItemApplicationService.java @@ -809,8 +809,8 @@ public class KnowledgeItemApplicationService { if (knowledgeItem == null) { return; } - if (knowledgeItem.getSourceType() == KnowledgeSourceType.FILE_UPLOAD - || knowledgeItem.getContentType() == KnowledgeContentType.FILE) { + KnowledgeSourceType sourceType = knowledgeItem.getSourceType(); + if (sourceType == KnowledgeSourceType.FILE_UPLOAD || sourceType == KnowledgeSourceType.MANUAL) { String relativePath = knowledgeItem.getContent(); if (StringUtils.isNotBlank(relativePath)) { try { diff --git a/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py index 0883f6e..63cfc29 100644 --- a/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py +++ b/runtime/datamate-python/app/module/annotation/service/knowledge_sync.py @@ -73,65 +73,46 @@ class KnowledgeSyncService: logger.warning("标注同步到知识管理失败:%s", exc) async def _ensure_knowledge_set(self, project: LabelingProject) -> Optional[str]: - result = await self.db.execute( - select(LabelingProject) - .where(LabelingProject.id == project.id) - .with_for_update() - ) - locked_project = result.scalar_one_or_none() - if not locked_project: - logger.warning("标注同步失败:无法锁定项目:project_id=%s", project.id) - return None - + # 第一次检查:无锁查询配置 config = ( - locked_project.configuration - if isinstance(locked_project.configuration, dict) - else {} + project.configuration if isinstance(project.configuration, dict) else {} ) set_id = config.get(self.CONFIG_KEY_SET_ID) if set_id: exists = await self._get_knowledge_set(set_id) if exists and self._metadata_matches_project( - exists.get("metadata"), locked_project.id + exists.get("metadata"), project.id ): return set_id logger.warning( "知识集不存在或归属不匹配,准备重建:set_id=%s project_id=%s", set_id, - locked_project.id, + project.id, ) + set_id = None project_name = ( - locked_project.name or "annotation-project" + project.name or "annotation-project" ).strip() or "annotation-project" - metadata = self._build_set_metadata(locked_project) + metadata = self._build_set_metadata(project) existing = await self._find_knowledge_set_by_name_and_project( - project_name, locked_project.id + project_name, project.id ) if existing: - await self._update_project_config( - locked_project, - { - self.CONFIG_KEY_SET_ID: existing.get("id"), - self.CONFIG_KEY_SET_NAME: existing.get("name"), - }, - ) - return existing.get("id") + return await self._update_knowledge_set_config(project, existing) created = await self._create_knowledge_set(project_name, metadata) if not created: created = await self._find_knowledge_set_by_name_and_project( - project_name, locked_project.id + project_name, project.id ) if not created: - fallback_name = self._build_fallback_set_name( - project_name, locked_project.id - ) + fallback_name = self._build_fallback_set_name(project_name, project.id) existing = await self._find_knowledge_set_by_name_and_project( - fallback_name, locked_project.id + fallback_name, project.id ) if existing: created = existing @@ -139,20 +120,13 @@ class KnowledgeSyncService: created = await self._create_knowledge_set(fallback_name, metadata) if not created: created = await self._find_knowledge_set_by_name_and_project( - fallback_name, locked_project.id + fallback_name, project.id ) if not created: return None - await self._update_project_config( - locked_project, - { - self.CONFIG_KEY_SET_ID: created.get("id"), - self.CONFIG_KEY_SET_NAME: created.get("name"), - }, - ) - return created.get("id") + return await self._update_knowledge_set_config(project, created) async def _get_knowledge_set(self, set_id: str) -> Optional[Dict[str, Any]]: try: @@ -165,11 +139,14 @@ class KnowledgeSyncService: raise async def _list_knowledge_sets( - self, keyword: Optional[str] + self, + keyword: Optional[str], + page: Optional[int] = None, + size: Optional[int] = None, ) -> list[Dict[str, Any]]: params: Dict[str, Any] = { - "page": 1, - "size": self.KNOWLEDGE_SET_LIST_SIZE, + "page": page if page is not None else 1, + "size": size if size is not None else self.KNOWLEDGE_SET_LIST_SIZE, } if keyword: params["keyword"] = keyword @@ -191,12 +168,29 @@ class KnowledgeSyncService: return [] return [item for item in content if isinstance(item, dict)] + async def _list_all_knowledge_sets( + self, keyword: Optional[str] = None + ) -> list[Dict[str, Any]]: + page = 1 + all_items: list[Dict[str, Any]] = [] + while True: + items = await self._list_knowledge_sets( + keyword, page=page, size=self.KNOWLEDGE_SET_LIST_SIZE + ) + if not items: + break + all_items.extend(items) + if len(items) < self.KNOWLEDGE_SET_LIST_SIZE: + break + page += 1 + return all_items + async def _find_knowledge_set_by_name_and_project( self, name: str, project_id: str ) -> Optional[Dict[str, Any]]: if not name: return None - items = await self._list_knowledge_sets(name) + items = await self._list_all_knowledge_sets(name) if not items: return None for item in items: @@ -278,7 +272,7 @@ class KnowledgeSyncService: async def _cleanup_knowledge_set_for_project(self, project_id: str) -> None: """清理项目关联的知识集及其所有知识条目""" - items = await self._list_knowledge_sets(None) + items = await self._list_all_knowledge_sets() for item in items: if self._metadata_matches_project(item.get("metadata"), project_id): set_id = item.get("id") @@ -303,7 +297,7 @@ class KnowledgeSyncService: self, dataset_id: str, file_id: str ) -> None: """清理文件的知识条目""" - items = await self._list_knowledge_sets(None) + items = await self._list_all_knowledge_sets() for set_item in items: set_id = set_item.get("id") if not set_id: @@ -427,6 +421,45 @@ class KnowledgeSyncService: short_id = project_id.replace("-", "")[:8] return f"{base_name}-annotation-{short_id}" + async def _update_knowledge_set_config( + self, project: LabelingProject, knowledge_set: Dict[str, Any] + ) -> Optional[str]: + result = await self.db.execute( + select(LabelingProject) + .where(LabelingProject.id == project.id) + .with_for_update() + ) + locked_project = result.scalar_one_or_none() + if not locked_project: + logger.warning("标注同步失败:无法锁定项目:project_id=%s", project.id) + return None + + config = ( + locked_project.configuration + if isinstance(locked_project.configuration, dict) + else {} + ) + set_id = config.get(self.CONFIG_KEY_SET_ID) + + if set_id: + logger.info( + "知识集配置已被其他进程更新:set_id=%s project_id=%s", + set_id, + locked_project.id, + ) + return set_id + + config.update( + { + self.CONFIG_KEY_SET_ID: knowledge_set.get("id"), + self.CONFIG_KEY_SET_NAME: knowledge_set.get("name"), + } + ) + locked_project.configuration = config + await self.db.commit() + await self.db.refresh(locked_project) + return knowledge_set.get("id") + async def _update_project_config( self, project: LabelingProject, updates: Dict[str, Any] ) -> None: