diff --git a/runtime/datamate-python/.env.example b/runtime/datamate-python/.env.example index df9a6a7..bac5120 100644 --- a/runtime/datamate-python/.env.example +++ b/runtime/datamate-python/.env.example @@ -1,94 +1,19 @@ -# ==================================== -# Label Studio Adapter Configuration -# ==================================== - -# ========================= -# 应用程序配置 -# ========================= -APP_NAME="Label Studio Adapter" -APP_VERSION="1.0.0" -APP_DESCRIPTION="Adapter for integrating Data Management System with Label Studio" -DEBUG=true - -# ========================= -# 服务器配置 -# ========================= +# Dev settings HOST=0.0.0.0 PORT=18000 -# ========================= -# 日志配置 -# ========================= -LOG_LEVEL=INFO +DEBUG=true +LOG_LEVEL=DEBUG +LOG_FILE_DIR=./logs -# ========================= -# Label Studio 服务配置 -# ========================= -# Label Studio 服务地址(根据部署方式调整) -# Docker 环境:http://label-studio:8080 -# 本地开发:http://127.0.0.1:8000 -LABEL_STUDIO_BASE_URL=http://label-studio:8080 - -# Label Studio 用户名和密码(用于自动创建用户) -LABEL_STUDIO_USERNAME=admin@example.com -LABEL_STUDIO_PASSWORD=password - -# Label Studio API 认证 Token(Legacy Token,推荐使用) -# 从 Label Studio UI 的 Account & Settings > Access Token 获取 -LABEL_STUDIO_USER_TOKEN=your-label-studio-token-here - -# Label Studio 本地文件存储基础路径(容器内路径,用于 Docker 部署时的权限检查) -LABEL_STUDIO_LOCAL_BASE=/label-studio/local_files - -# Label Studio 本地文件服务路径前缀(任务数据中的文件路径前缀) -LABEL_STUDIO_FILE_PATH_PREFIX=/data/local-files/?d= - -# Label Studio 容器中的本地存储路径(用于配置 Local Storage) -LABEL_STUDIO_LOCAL_STORAGE_DATASET_BASE_PATH=/label-studio/local_files/dataset -LABEL_STUDIO_LOCAL_STORAGE_UPLOAD_BASE_PATH=/label-studio/local_files/upload - -# Label Studio 任务列表分页大小 -LS_TASK_PAGE_SIZE=1000 - -# ========================= -# Data Management 服务配置 -# ========================= -# DM 存储文件夹前缀(通常与 Label Studio 的 local-files 文件夹映射一致) -DM_FILE_PATH_PREFIX=/ - -# ========================= -# Adapter 数据库配置 (MySQL) -# ========================= -# 优先级1:如果配置了 MySQL,将优先使用 MySQL 数据库 -MYSQL_HOST=adapter-db +# DataBase +MYSQL_HOST=localhost MYSQL_PORT=3306 -MYSQL_USER=label_studio_user -MYSQL_PASSWORD=user_password -MYSQL_DATABASE=label_studio_adapter +MYSQL_USER=root +MYSQL_PASSWORD=password +MYSQL_DATABASE=datamate -# ========================= -# CORS 配置 -# ========================= -# 允许的来源(生产环境建议配置具体域名) -ALLOWED_ORIGINS=["*"] +# Label Studio settings +LABEL_STUDIO_BASE_URL=http://localhost:8080 -# 允许的 HTTP 方法 -ALLOWED_METHODS=["*"] - -# 允许的请求头 -ALLOWED_HEADERS=["*"] - -# ========================= -# Docker Compose 配置 -# ========================= -# Docker Compose 项目名称前缀 -COMPOSE_PROJECT_NAME=ls-adapter - -# ========================= -# 同步配置(未来扩展) -# ========================= -# 批量同步任务的批次大小 -SYNC_BATCH_SIZE=100 - -# 同步失败时的最大重试次数 -MAX_RETRIES=3 \ No newline at end of file +LABEL_STUDIO_USER_TOKEN="demo_dev_token" diff --git a/runtime/datamate-python/.gitignore b/runtime/datamate-python/.gitignore index 4670a93..c51b89c 100644 --- a/runtime/datamate-python/.gitignore +++ b/runtime/datamate-python/.gitignore @@ -3,4 +3,6 @@ .dev.env # logs -logs/ \ No newline at end of file +logs/ + +doc/ \ No newline at end of file diff --git a/runtime/datamate-python/app/core/config.py b/runtime/datamate-python/app/core/config.py index 9b8da57..b0ec44d 100644 --- a/runtime/datamate-python/app/core/config.py +++ b/runtime/datamate-python/app/core/config.py @@ -1,8 +1,6 @@ from pydantic_settings import BaseSettings from pydantic import model_validator -from typing import Optional, List -import os -from pathlib import Path +from typing import Optional class Settings(BaseSettings): """应用程序配置""" @@ -10,39 +8,34 @@ class Settings(BaseSettings): class Config: env_file = ".env" case_sensitive = False - extra = 'ignore' # 允许额外字段(如 Shell 脚本专用的环境变量) + extra = 'ignore' - # ========================= - # Adapter 服务配置 - # ========================= - app_name: str = "Label Studio Adapter" + # Service + app_name: str = "DataMate Python Backend" app_version: str = "1.0.0" app_description: str = "Adapter for integrating Data Management System with Label Studio" - # 日志配置 - log_level: str = "INFO" - debug: bool = True - log_file_dir: str = "/var/log/datamate" - - # 服务器配置 host: str = "0.0.0.0" - port: int = 8000 - - # CORS配置 + port: int = 18000 + + # CORS # allowed_origins: List[str] = ["*"] # allowed_methods: List[str] = ["*"] # allowed_headers: List[str] = ["*"] - # MySQL数据库配置 (优先级1) + # Log + log_level: str = "INFO" + debug: bool = True + log_file_dir: str = "/var/log/datamate" + + # Database mysql_host: str = "datamate-database" mysql_port: int = 3306 mysql_user: str = "root" mysql_password: str = "password" mysql_database: str = "datamate" - # 直接数据库URL配置(如果提供,将覆盖上述配置) - # 初始值为空字符串,在 model_validator 中会被设置为完整的 URL - database_url: str = "" + database_url: str = "" # Will be overridden by build_database_url() if not provided @model_validator(mode='after') def build_database_url(self): @@ -55,22 +48,18 @@ class Settings(BaseSettings): return self - # ========================= - # Label Studio 服务配置 - # ========================= + # Label Studio label_studio_base_url: str = "http://label-studio:8000" - label_studio_username: Optional[str] = "admin@demo.com" # Label Studio 用户名(用于登录) - label_studio_password: Optional[str] = "demoadmin" # Label Studio 密码(用于登录) + label_studio_username: Optional[str] = "admin@demo.com" + label_studio_password: Optional[str] = "demoadmin" label_studio_user_token: Optional[str] = "abc123abc123" # Legacy Token - label_studio_local_storage_dataset_base_path: str = "/label-studio/local" # Label Studio容器中的本地存储基础路径 - label_studio_file_path_prefix: str = "/data/local-files/?d=" # Label Studio本地文件服务路径前缀 + label_studio_local_document_root: str = "/label-studio/local" # Label Studio local file storage path + label_studio_file_path_prefix: str = "/data/local-files/?d=" # Label Studio local file serving URL prefix ls_task_page_size: int = 1000 - # ========================= - # Data Management 服务配置 - # ========================= + # DataMate dm_file_path_prefix: str = "/dataset" # DM存储文件夹前缀 # 全局设置实例 diff --git a/runtime/datamate-python/app/db/models/dataset_management.py b/runtime/datamate-python/app/db/models/dataset_management.py index 90a0f58..f0ed159 100644 --- a/runtime/datamate-python/app/db/models/dataset_management.py +++ b/runtime/datamate-python/app/db/models/dataset_management.py @@ -64,6 +64,7 @@ class DatasetFiles(Base): file_size = Column(BigInteger, default=0, comment="文件大小(字节)") check_sum = Column(String(64), nullable=True, comment="文件校验和") tags = Column(JSON, nullable=True, comment="文件标签信息") + tags_updated_at = Column(TIMESTAMP, nullable=True, comment="标签最后更新时间") dataset_filemetadata = Column("metadata", JSON, nullable=True, comment="文件元数据") status = Column(String(50), default='ACTIVE', comment="文件状态:ACTIVE/DELETED/PROCESSING") upload_time = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="上传时间") diff --git a/runtime/datamate-python/app/main.py b/runtime/datamate-python/app/main.py index bebad11..ca96c21 100644 --- a/runtime/datamate-python/app/main.py +++ b/runtime/datamate-python/app/main.py @@ -45,7 +45,7 @@ async def lifespan(app: FastAPI): yield # @shutdown - logger.info("DataMate Python Backend shutting down ...") + logger.info("DataMate Python Backend shutting down ...\n\n") # 创建FastAPI应用 app = FastAPI( @@ -69,11 +69,7 @@ app = FastAPI( app.include_router(router) # 输出注册的路由(每行一个) -logger.debug("Registered routes:") -for route in app.routes: - route_path = getattr(route, "path", None) - if route_path: - logger.debug(f" {route_path}") +logger.debug(f"Registered routes refer to http://localhost:{settings.port}/redoc") # 注册全局异常处理器 app.add_exception_handler(StarletteHTTPException, starlette_http_exception_handler) # type: ignore @@ -102,7 +98,7 @@ async def root(): data={ "message": f"{settings.app_name} is running", "version": settings.app_version, - "docs_url": "/docs", + "docs_url": "/redoc", "label_studio_url": settings.label_studio_base_url } ) diff --git a/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py b/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py index 1bbb2d4..19d31a8 100644 --- a/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py +++ b/runtime/datamate-python/app/module/annotation/client/labelstudio/client.py @@ -380,6 +380,148 @@ class Client: logger.error(f"Error while deleting project {project_id}: {e}") return False + async def get_task_annotations( + self, + task_id: int + ) -> Optional[List[Dict[str, Any]]]: + """获取任务的标注结果 + + Args: + task_id: 任务ID + + Returns: + 标注结果列表,每个标注包含完整的annotation信息 + """ + try: + logger.debug(f"Fetching annotations for task: {task_id}") + + response = await self.client.get(f"/api/tasks/{task_id}/annotations") + response.raise_for_status() + + annotations = response.json() + logger.debug(f"Fetched {len(annotations)} annotations for task {task_id}") + + return annotations + + except httpx.HTTPStatusError as e: + logger.error(f"Get task annotations failed HTTP {e.response.status_code}: {e.response.text}") + return None + except Exception as e: + logger.error(f"Error while getting task annotations: {e}") + return None + + async def create_annotation( + self, + task_id: int, + result: List[Dict[str, Any]], + completed_by: Optional[int] = None + ) -> Optional[Dict[str, Any]]: + """为任务创建新的标注 + + Args: + task_id: 任务ID + result: 标注结果列表 + completed_by: 完成标注的用户ID(可选) + + Returns: + 创建的标注信息,失败返回None + """ + try: + logger.debug(f"Creating annotation for task: {task_id}") + + annotation_data = { + "result": result, + "task": task_id + } + + if completed_by: + annotation_data["completed_by"] = completed_by + + response = await self.client.post( + f"/api/tasks/{task_id}/annotations", + json=annotation_data + ) + response.raise_for_status() + + annotation = response.json() + logger.debug(f"Created annotation {annotation.get('id')} for task {task_id}") + + return annotation + + except httpx.HTTPStatusError as e: + logger.error(f"Create annotation failed HTTP {e.response.status_code}: {e.response.text}") + return None + except Exception as e: + logger.error(f"Error while creating annotation: {e}") + return None + + async def update_annotation( + self, + annotation_id: int, + result: List[Dict[str, Any]] + ) -> Optional[Dict[str, Any]]: + """更新已存在的标注 + + Args: + annotation_id: 标注ID + result: 新的标注结果列表 + + Returns: + 更新后的标注信息,失败返回None + """ + try: + logger.debug(f"Updating annotation: {annotation_id}") + + annotation_data = { + "result": result + } + + response = await self.client.patch( + f"/api/annotations/{annotation_id}", + json=annotation_data + ) + response.raise_for_status() + + annotation = response.json() + logger.debug(f"Updated annotation {annotation_id}") + + return annotation + + except httpx.HTTPStatusError as e: + logger.error(f"Update annotation failed HTTP {e.response.status_code}: {e.response.text}") + return None + except Exception as e: + logger.error(f"Error while updating annotation: {e}") + return None + + async def delete_annotation( + self, + annotation_id: int + ) -> bool: + """删除标注 + + Args: + annotation_id: 标注ID + + Returns: + 成功返回True,失败返回False + """ + try: + logger.debug(f"Deleting annotation: {annotation_id}") + + response = await self.client.delete(f"/api/annotations/{annotation_id}") + response.raise_for_status() + + logger.debug(f"Deleted annotation {annotation_id}") + return True + + except httpx.HTTPStatusError as e: + logger.error(f"Delete annotation failed HTTP {e.response.status_code}: {e.response.text}") + return False + except Exception as e: + logger.error(f"Error while deleting annotation: {e}") + return False + async def create_local_storage( self, project_id: int, diff --git a/runtime/datamate-python/app/module/annotation/interface/project.py b/runtime/datamate-python/app/module/annotation/interface/project.py index cf04497..e9c5c32 100644 --- a/runtime/datamate-python/app/module/annotation/interface/project.py +++ b/runtime/datamate-python/app/module/annotation/interface/project.py @@ -80,7 +80,7 @@ async def create_mapping( project_id = project_data["id"] # 配置本地存储:dataset/ - local_storage_path = f"{settings.label_studio_local_storage_dataset_base_path}/{request.dataset_id}" + local_storage_path = f"{settings.label_studio_local_document_root}/{request.dataset_id}" storage_result = await ls_client.create_local_storage( project_id=project_id, path=local_storage_path, diff --git a/runtime/datamate-python/app/module/annotation/interface/task.py b/runtime/datamate-python/app/module/annotation/interface/task.py index 2f472f5..4158a91 100644 --- a/runtime/datamate-python/app/module/annotation/interface/task.py +++ b/runtime/datamate-python/app/module/annotation/interface/task.py @@ -15,6 +15,8 @@ from ..service.mapping import DatasetMappingService from ..schema import ( SyncDatasetRequest, SyncDatasetResponse, + SyncAnnotationsRequest, + SyncAnnotationsResponse, ) @@ -30,10 +32,24 @@ async def sync_dataset_content( db: AsyncSession = Depends(get_db) ): """ - 同步数据集内容 + 同步数据集内容(包括文件和标注) - 根据指定的mapping ID,同步DM程序数据集中的内容到Label Studio数据集中, - 在数据库中记录更新时间,返回更新状态 + 根据指定的mapping ID,同步DM程序数据集中的内容到Label Studio数据集中。 + 默认同时同步文件和标注数据。 + + Args: + request: 同步请求,包含: + - id: 映射ID(mapping UUID) + - batchSize: 批处理大小 + - filePriority: 文件同步优先级 + - labelPriority: 标签同步优先级 + - syncAnnotations: 是否同步标注(默认True) + - annotationDirection: 标注同步方向(默认bidirectional) + - overwrite: 是否允许覆盖DataMate中的标注(默认True) + - overwriteLabelingProject: 是否允许覆盖Label Studio中的标注(默认True) + + Returns: + 同步结果 """ try: ls_client = LabelStudioClient(base_url=settings.label_studio_base_url, @@ -42,9 +58,9 @@ async def sync_dataset_content( mapping_service = DatasetMappingService(db) sync_service = SyncService(dm_client, ls_client, mapping_service) - logger.info(f"Sync dataset content request: mapping_id={request.id}") + logger.debug(f"Sync dataset content request: mapping_id={request.id}, sync_annotations={request.sync_annotations}") - # request.id 合法性校验 + # request.id validation mapping = await mapping_service.get_mapping_by_uuid(request.id) if not mapping: raise HTTPException( @@ -52,9 +68,34 @@ async def sync_dataset_content( detail=f"Mapping not found: {request.id}" ) - # 执行同步(使用映射中的源数据集UUID) + # Sync dataset files result = await sync_service.sync_dataset_files(request.id, request.batch_size) + # Sync annotations if requested + if request.sync_annotations: + logger.info(f"Syncing annotations: direction={request.annotation_direction}") + + # 根据方向执行标注同步 + if request.annotation_direction == "ls_to_dm": + await sync_service.sync_annotations_from_ls_to_dm( + mapping, + request.batch_size, + request.overwrite + ) + elif request.annotation_direction == "dm_to_ls": + await sync_service.sync_annotations_from_dm_to_ls( + mapping, + request.batch_size, + request.overwrite_labeling_project + ) + elif request.annotation_direction == "bidirectional": + await sync_service.sync_annotations_bidirectional( + mapping, + request.batch_size, + request.overwrite, + request.overwrite_labeling_project + ) + logger.info(f"Sync completed: {result.synced_files}/{result.total_files} files") return StandardResponse( @@ -73,4 +114,148 @@ async def sync_dataset_content( raise HTTPException(status_code=404, detail=str(e)) except Exception as e: logger.error(f"Error syncing dataset content: {e}") - raise HTTPException(status_code=500, detail="Internal server error") \ No newline at end of file + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/annotation/sync", response_model=StandardResponse[SyncAnnotationsResponse]) +async def sync_annotations( + request: SyncAnnotationsRequest, + db: AsyncSession = Depends(get_db) +): + """ + 仅同步标注结果(支持双向同步) + + 根据指定的mapping ID和同步方向,在DM数据集和Label Studio之间同步标注结果。 + 标注结果存储在数据集文件表的tags字段中,使用简化格式。 + + 同步策略: + - 默认为双向同步,基于时间戳自动解决冲突 + - overwrite: 控制是否允许用Label Studio的标注覆盖DataMate(基于时间戳比较) + - overwriteLabelingProject: 控制是否允许用DataMate的标注覆盖Label Studio(基于时间戳比较) + - 如果Label Studio标注的updated_at更新,且overwrite=True,则覆盖DataMate + - 如果DataMate标注的updated_at更新,且overwriteLabelingProject=True,则覆盖Label Studio + + Args: + request: 同步请求,包含: + - id: 映射ID(mapping UUID) + - batchSize: 批处理大小 + - direction: 同步方向 (ls_to_dm/dm_to_ls/bidirectional) + - overwrite: 是否允许覆盖DataMate中的标注(默认True) + - overwriteLabelingProject: 是否允许覆盖Label Studio中的标注(默认True) + + Returns: + 同步结果,包含同步统计信息和冲突解决情况 + """ + try: + ls_client = LabelStudioClient(base_url=settings.label_studio_base_url, + token=settings.label_studio_user_token) + dm_client = DatasetManagementService(db) + mapping_service = DatasetMappingService(db) + sync_service = SyncService(dm_client, ls_client, mapping_service) + + logger.info(f"Sync annotations request: mapping_id={request.id}, direction={request.direction}, overwrite={request.overwrite}, overwrite_ls={request.overwrite_labeling_project}") + + # 验证映射是否存在 + mapping = await mapping_service.get_mapping_by_uuid(request.id) + if not mapping: + raise HTTPException( + status_code=404, + detail=f"Mapping not found: {request.id}" + ) + + # 根据方向执行同步 + if request.direction == "ls_to_dm": + result = await sync_service.sync_annotations_from_ls_to_dm( + mapping, + request.batch_size, + request.overwrite + ) + elif request.direction == "dm_to_ls": + result = await sync_service.sync_annotations_from_dm_to_ls( + mapping, + request.batch_size, + request.overwrite_labeling_project + ) + elif request.direction == "bidirectional": + result = await sync_service.sync_annotations_bidirectional( + mapping, + request.batch_size, + request.overwrite, + request.overwrite_labeling_project + ) + else: + raise HTTPException( + status_code=400, + detail=f"Invalid direction: {request.direction}" + ) + + logger.info(f"Annotation sync completed: synced_to_dm={result.synced_to_dm}, synced_to_ls={result.synced_to_ls}, conflicts_resolved={result.conflicts_resolved}") + + return StandardResponse( + code=200, + message="success", + data=result + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error syncing annotations: {e}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@router.get("/check-ls-connection") +async def check_label_studio_connection(): + """ + 检查Label Studio连接状态 + + 用于诊断Label Studio连接问题,返回连接状态和配置信息 + """ + try: + ls_client = LabelStudioClient( + base_url=settings.label_studio_base_url, + token=settings.label_studio_user_token + ) + + # 尝试获取项目列表来测试连接 + try: + response = await ls_client.client.get("/api/projects") + response.raise_for_status() + projects = response.json() + + token_display = settings.label_studio_user_token[:10] + "..." if settings.label_studio_user_token else "None" + + return StandardResponse( + code=200, + message="success", + data={ + "status": "connected", + "base_url": settings.label_studio_base_url, + "token": token_display, + "projects_count": len(projects.get("results", [])) if isinstance(projects, dict) else len(projects), + "message": "Successfully connected to Label Studio" + } + ) + except Exception as e: + token_display = settings.label_studio_user_token[:10] + "..." if settings.label_studio_user_token else "None" + + return StandardResponse( + code=500, + message="error", + data={ + "status": "disconnected", + "base_url": settings.label_studio_base_url, + "token": token_display, + "error": str(e), + "message": f"Failed to connect to Label Studio: {str(e)}", + "troubleshooting": [ + "1. Check if Label Studio is running: docker ps | grep label-studio", + "2. Verify LABEL_STUDIO_BASE_URL in .env file", + "3. Verify LABEL_STUDIO_USER_TOKEN is valid", + "4. Check network connectivity between services" + ] + } + ) + except Exception as e: + logger.error(f"Error checking Label Studio connection: {e}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/runtime/datamate-python/app/module/annotation/schema/__init__.py b/runtime/datamate-python/app/module/annotation/schema/__init__.py index b7dd02d..03791fa 100644 --- a/runtime/datamate-python/app/module/annotation/schema/__init__.py +++ b/runtime/datamate-python/app/module/annotation/schema/__init__.py @@ -11,6 +11,8 @@ from .mapping import ( from .sync import ( SyncDatasetRequest, SyncDatasetResponse, + SyncAnnotationsRequest, + SyncAnnotationsResponse, ) __all__ = [ @@ -21,5 +23,7 @@ __all__ = [ "DatasetMappingResponse", "SyncDatasetRequest", "SyncDatasetResponse", + "SyncAnnotationsRequest", + "SyncAnnotationsResponse", "DeleteDatasetResponse", ] \ No newline at end of file diff --git a/runtime/datamate-python/app/module/annotation/schema/sync.py b/runtime/datamate-python/app/module/annotation/schema/sync.py index 6604c8c..4babf20 100644 --- a/runtime/datamate-python/app/module/annotation/schema/sync.py +++ b/runtime/datamate-python/app/module/annotation/schema/sync.py @@ -1,4 +1,7 @@ -from pydantic import Field +from typing import Literal, List, Dict, Any, Optional +from datetime import datetime + +from pydantic import Field, ConfigDict from app.module.shared.schema import BaseResponseModel from app.module.shared.schema import StandardResponse @@ -6,8 +9,27 @@ from app.module.shared.schema import StandardResponse class SyncDatasetRequest(BaseResponseModel): """同步数据集请求模型""" + model_config = ConfigDict(populate_by_name=True) + id: str = Field(..., description="映射ID(mapping UUID)") - batch_size: int = Field(50, ge=1, le=100, description="批处理大小") + batch_size: int = Field(50, ge=1, le=100, description="批处理大小", alias="batchSize") + file_priority: Literal[0, 1] = Field(0, description="0 数据集为主,1 标注平台为主", alias="filePriority") + label_priority: Literal[0, 1] = Field(0, description="0 数据集为主,1 标注平台为主", alias="labelPriority") + sync_annotations: bool = Field(True, description="是否同步标注数据", alias="syncAnnotations") + annotation_direction: Literal["ls_to_dm", "dm_to_ls", "bidirectional"] = Field( + "bidirectional", + description="标注同步方向: ls_to_dm(Label Studio到数据集), dm_to_ls(数据集到Label Studio), bidirectional(双向)", + alias="annotationDirection" + ) + overwrite: bool = Field( + True, + description="是否覆盖DataMate中的标注(基于时间戳比较)" + ) + overwrite_labeling_project: bool = Field( + True, + description="是否覆盖Label Studio中的标注(基于时间戳比较)", + alias="overwriteLabelingProject" + ) class SyncDatasetResponse(BaseResponseModel): """同步数据集响应模型""" @@ -18,4 +40,53 @@ class SyncDatasetResponse(BaseResponseModel): message: str = Field(..., description="响应消息") class SyncDatasetResponseStd(StandardResponse[SyncDatasetResponse]): + pass + + +class SyncAnnotationsRequest(BaseResponseModel): + """同步标注请求模型 + + 使用camelCase作为API接口字段名(通过alias),但Python代码内部使用snake_case。 + Pydantic会自动处理两种格式的转换。 + """ + model_config = ConfigDict(populate_by_name=True) + + id: str = Field(..., description="映射ID(mapping UUID)") + batch_size: int = Field(50, ge=1, le=100, description="批处理大小", alias="batchSize") + direction: Literal["ls_to_dm", "dm_to_ls", "bidirectional"] = Field( + "bidirectional", + description="同步方向: ls_to_dm(Label Studio到数据集), dm_to_ls(数据集到Label Studio), bidirectional(双向)" + ) + overwrite: bool = Field( + True, + description="是否覆盖DataMate中的标注(基于时间戳比较)。True时,如果Label Studio的标注更新时间更新,则覆盖DataMate的标注" + ) + overwrite_labeling_project: bool = Field( + True, + description="是否覆盖Label Studio中的标注(基于时间戳比较)。True时,如果DataMate的标注更新时间更新,则覆盖Label Studio的标注", + alias="overwriteLabelingProject" + ) + + +class TagInfo(BaseResponseModel): + """标注信息结构(不包含时间戳,时间戳存储在文件级别的tags_updated_at字段)""" + from_name: str = Field(..., description="标注工具名称") + to_name: str = Field(..., description="目标对象名称") + type: str = Field(..., description="标注类型") + values: Dict[str, Any] = Field(..., description="标注值") + + +class SyncAnnotationsResponse(BaseResponseModel): + """同步标注响应模型""" + id: str = Field(..., description="映射UUID") + status: str = Field(..., description="同步状态: success/partial/error") + synced_to_dm: int = Field(0, description="同步到数据集的标注数量") + synced_to_ls: int = Field(0, description="同步到Label Studio的标注数量") + skipped: int = Field(0, description="跳过的标注数量") + failed: int = Field(0, description="失败的标注数量") + conflicts_resolved: int = Field(0, description="解决的冲突数量") + message: str = Field(..., description="响应消息") + + +class SyncAnnotationsResponseStd(StandardResponse[SyncAnnotationsResponse]): pass \ No newline at end of file diff --git a/runtime/datamate-python/app/module/annotation/service/sync.py b/runtime/datamate-python/app/module/annotation/service/sync.py index dc9e8cc..4a78487 100644 --- a/runtime/datamate-python/app/module/annotation/service/sync.py +++ b/runtime/datamate-python/app/module/annotation/service/sync.py @@ -1,12 +1,18 @@ -from typing import Optional, List, Dict, Any, Tuple +from typing import Optional, List, Dict, Any, Tuple, Set from app.module.dataset import DatasetManagementService +from sqlalchemy import update, select +from app.db.models import DatasetFiles from app.core.logging import get_logger from app.core.config import settings from app.exception import NoDatasetInfoFoundError from ..client import LabelStudioClient -from ..schema import SyncDatasetResponse +from ..schema import ( + SyncDatasetResponse, + DatasetMappingResponse, + SyncAnnotationsResponse +) from ..service.mapping import DatasetMappingService logger = get_logger(__name__) @@ -24,20 +30,74 @@ class SyncService: self.ls_client = ls_client self.mapping_service = mapping_service - def determine_data_type(self, file_type: str) -> str: + def _determine_data_type(self, file_type: str) -> str: """根据文件类型确定数据类型""" file_type_lower = file_type.lower() - if any(ext in file_type_lower for ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp']): - return 'image' - elif any(ext in file_type_lower for ext in ['mp3', 'wav', 'flac', 'aac', 'ogg']): - return 'audio' - elif any(ext in file_type_lower for ext in ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm']): - return 'video' - elif any(ext in file_type_lower for ext in ['txt', 'doc', 'docx', 'pdf']): - return 'text' - else: - return 'image' # 默认为图像类型 + type_mapping = { + 'image': ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp'], + 'audio': ['mp3', 'wav', 'flac', 'aac', 'ogg'], + 'video': ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm'], + 'text': ['txt', 'doc', 'docx', 'pdf'], + 'wsi': ['svs', 'tiff', 'ndpi', 'mrxs', 'sdpc'], + 'ct': ['dcm', 'dicom', 'nii', 'nii.gz'] + } + + for data_type, extensions in type_mapping.items(): + if any(ext in file_type_lower for ext in extensions): + return data_type + + return 'image' # 默认为图像类型 + + def _build_task_data(self, file_info: Any, dataset_id: str) -> dict: + """构建Label Studio任务数据""" + data_type = self._determine_data_type(file_info.fileType) + + # 替换文件路径前缀 + file_path = file_info.filePath.removeprefix(settings.dm_file_path_prefix) + file_path = settings.label_studio_file_path_prefix + file_path + + return { + "data": { + f"{data_type}": file_path, + "file_path": file_info.filePath, + "file_id": file_info.id, + "original_name": file_info.originalName, + "dataset_id": dataset_id, + } + } + + async def _create_tasks_with_fallback( + self, + project_id: str, + tasks: List[dict] + ) -> int: + """批量创建任务,失败时回退到单个创建""" + if not tasks: + return 0 + + # 尝试批量创建 + batch_result = await self.ls_client.create_tasks_batch(project_id, tasks) + + if batch_result: + logger.debug(f"Successfully created {len(tasks)} tasks in batch") + return len(tasks) + + # 批量失败,回退到单个创建 + logger.warning(f"Batch creation failed, falling back to single creation") + created_count = 0 + + for task_data in tasks: + task_result = await self.ls_client.create_task( + project_id, + task_data["data"], + task_data.get("meta") + ) + if task_result: + created_count += 1 + + logger.debug(f"Successfully created {created_count}/{len(tasks)} tasks individually") + return created_count async def get_existing_dm_file_mapping(self, project_id: str) -> Dict[str, int]: """ @@ -50,55 +110,126 @@ class SyncService: file_id到task_id的映射字典 """ try: - logger.info(f"Fetching existing task mappings for project {project_id} (page_size={settings.ls_task_page_size})") - dm_file_to_task_mapping = {} - - # 使用Label Studio客户端封装的方法获取所有任务 page_size = getattr(settings, 'ls_task_page_size', 1000) - - # 调用封装好的方法获取所有任务,page=None表示获取全部 result = await self.ls_client.get_project_tasks( project_id=project_id, - page=None, # 不指定page,获取所有任务 + page=None, page_size=page_size ) - logger.info(f"Fetched tasks result: {result}") - if not result: logger.warning(f"Failed to fetch tasks for project {project_id}") return {} - logger.info(f"Successfully fetched tasks for project {project_id}") - all_tasks = result.get("tasks", []) + logger.info(f"Successfully fetched {len(all_tasks)} tasks") - # 遍历所有任务,构建映射 - for task in all_tasks: - # logger.debug(task) - try: - file_id = task.get('data', {}).get('file_id') - task_id = task.get('id') - - dm_file_to_task_mapping[str(file_id)] = task_id - except Exception as e: - logger.error(f"Error processing task {task.get('id')}: {e}") - continue - logger.debug(dm_file_to_task_mapping) + # 使用字典推导式构建映射 + dm_file_to_task_mapping = { + str(task.get('data', {}).get('file_id')): task.get('id') + for task in all_tasks + if task.get('data', {}).get('file_id') is not None + } + logger.info(f"Found {len(dm_file_to_task_mapping)} existing task mappings") return dm_file_to_task_mapping except Exception as e: logger.error(f"Error while fetching existing tasks: {e}") - return {} # 发生错误时返回空字典,会同步所有文件 + return {} + + async def _fetch_dm_files_paginated( + self, + dataset_id: str, + batch_size: int, + existing_file_ids: Set[str], + project_id: str + ) -> Tuple[Set[str], int]: + """ + 分页获取DM文件并创建新任务 + + Returns: + (当前文件ID集合, 创建的任务数) + """ + current_file_ids = set() + total_created = 0 + page = 0 + + while True: + files_response = await self.dm_client.get_dataset_files( + dataset_id, + page=page, + size=batch_size, + ) + + if not files_response or not files_response.content: + logger.info(f"No more files on page {page + 1}") + break + + logger.info(f"Processing page {page + 1}, {len(files_response.content)} files") + + # 筛选新文件并构建任务数据 + new_tasks = [] + for file_info in files_response.content: + file_id = str(file_info.id) + current_file_ids.add(file_id) + + if file_id not in existing_file_ids: + task_data = self._build_task_data(file_info, dataset_id) + new_tasks.append(task_data) + + logger.info(f"Page {page + 1}: {len(new_tasks)} new files, {len(files_response.content) - len(new_tasks)} existing") + + # 批量创建任务 + if new_tasks: + created = await self._create_tasks_with_fallback(project_id, new_tasks) + total_created += created + + # 检查是否还有更多页面 + if page >= files_response.totalPages - 1: + break + page += 1 + + return current_file_ids, total_created + + async def _delete_orphaned_tasks( + self, + existing_dm_file_mapping: Dict[str, int], + current_file_ids: Set[str] + ) -> int: + """删除在DM中不存在的Label Studio任务""" + # 使用集合操作找出需要删除的文件ID + deleted_file_ids = set(existing_dm_file_mapping.keys()) - current_file_ids + + if not deleted_file_ids: + logger.info("No tasks to delete") + return 0 + + tasks_to_delete = [existing_dm_file_mapping[fid] for fid in deleted_file_ids] + logger.info(f"Deleting {len(tasks_to_delete)} orphaned tasks") + + delete_result = await self.ls_client.delete_tasks_batch(tasks_to_delete) + deleted_count = delete_result.get("successful", 0) + + logger.info(f"Successfully deleted {deleted_count} tasks") + return deleted_count async def sync_dataset_files( self, mapping_id: str, batch_size: int = 50 ) -> SyncDatasetResponse: - """同步数据集文件到Label Studio""" - logger.info(f"Start syncing dataset by mapping: {mapping_id}") + """ + 同步数据集文件到Label Studio (Legacy endpoint - 委托给sync_files) + + Args: + mapping_id: 映射ID + batch_size: 批处理大小 + + Returns: + 同步结果响应 + """ + logger.info(f"Start syncing dataset files by mapping: {mapping_id}") # 获取映射关系 mapping = await self.mapping_service.get_mapping_by_uuid(mapping_id) @@ -113,125 +244,17 @@ class SyncService: ) try: - # 获取数据集信息 - dataset_info = await self.dm_client.get_dataset(mapping.dataset_id) - if not dataset_info: - raise NoDatasetInfoFoundError(mapping.dataset_id) + # 委托给sync_files执行实际同步 + result = await self.sync_files(mapping, batch_size) - synced_files = 0 - deleted_tasks = 0 - total_files = dataset_info.fileCount - page = 0 - - logger.info(f"Total files in dataset: {total_files}") - - # 获取Label Studio中已存在的DM文件ID到任务ID的映射 - existing_dm_file_mapping = await self.get_existing_dm_file_mapping(mapping.labeling_project_id) - existing_file_ids = set(existing_dm_file_mapping.keys()) - logger.info(f"{len(existing_file_ids)} tasks already exist in Label Studio") - - # 收集DM中当前存在的所有文件ID - current_file_ids = set() - while True: - files_response = await self.dm_client.get_dataset_files( - mapping.dataset_id, - page=page, - size=batch_size, - ) - - if not files_response or not files_response.content: - logger.info(f"No more files on page {page + 1}") - break - - logger.info(f"Processing page {page + 1}, total {len(files_response.content)} files") - - # 筛选出新文件并批量创建任务 - tasks = [] - new_files_count = 0 - existing_files_count = 0 - - for file_info in files_response.content: - # 记录当前DM中存在的文件ID - current_file_ids.add(str(file_info.id)) - - # 检查文件是否已存在 - if str(file_info.id) in existing_file_ids: - existing_files_count += 1 - logger.debug(f"Skip existing file: {file_info.originalName} (ID: {file_info.id})") - continue - - new_files_count += 1 - - data_type = self.determine_data_type(file_info.fileType) - - # 替换文件路径前缀:只替换开头的前缀,不影响路径中间可能出现的相同字符串 - file_path = file_info.filePath.removeprefix(settings.dm_file_path_prefix) - file_path = settings.label_studio_file_path_prefix + file_path - - # 构造任务数据 - task_data = { - "data": { - f"{data_type}": file_path, - "file_path": file_info.filePath, - "file_id": file_info.id, - "original_name": file_info.originalName, - "dataset_id": mapping.dataset_id, - } - } - tasks.append(task_data) - - logger.info(f"Page {page + 1}: new files {new_files_count}, existing files {existing_files_count}") - - # 批量创建Label Studio任务 - if tasks: - batch_result = await self.ls_client.create_tasks_batch( - mapping.labeling_project_id, - tasks - ) - - if batch_result: - synced_files += len(tasks) - logger.info(f"Successfully synced {len(tasks)} files") - else: - logger.warning(f"Batch task creation failed, fallback to single creation") - # 如果批量创建失败,尝试单个创建 - for task_data in tasks: - task_result = await self.ls_client.create_task( - mapping.labeling_project_id, - task_data["data"], - task_data.get("meta") - ) - if task_result: - synced_files += 1 - - # 检查是否还有更多页面 - if page >= files_response.totalPages - 1: - break - page += 1 - - # 清理在DM中不存在但在Label Studio中存在的任务 - tasks_to_delete = [] - for file_id, task_id in existing_dm_file_mapping.items(): - if file_id not in current_file_ids: - tasks_to_delete.append(task_id) - logger.debug(f"Mark task for deletion: {task_id} (DM file ID: {file_id})") - - if tasks_to_delete: - logger.info(f"Deleting {len(tasks_to_delete)} tasks not present in DM") - delete_result = await self.ls_client.delete_tasks_batch(tasks_to_delete) - deleted_tasks = delete_result.get("successful", 0) - logger.info(f"Successfully deleted {deleted_tasks} tasks") - else: - logger.info("No tasks to delete") - - logger.info(f"Sync completed: total_files={total_files}, created={synced_files}, deleted={deleted_tasks}") + logger.info(f"Sync completed: created={result['created']}, deleted={result['deleted']}, total={result['total']}") return SyncDatasetResponse( id=mapping.id, status="success", - synced_files=synced_files, - total_files=total_files, - message=f"Sync completed: created {synced_files} files, deleted {deleted_tasks} tasks" + synced_files=result["created"], + total_files=result["total"], + message=f"Sync completed: created {result['created']} files, deleted {result['deleted']} tasks" ) except Exception as e: @@ -243,6 +266,706 @@ class SyncService: total_files=0, message=f"Sync failed: {str(e)}" ) + + async def sync_dataset( + self, + mapping_id: str, + batch_size: int = 50, + file_priority: int = 0, + annotation_priority: int = 0 + ) -> SyncDatasetResponse: + """ + 同步数据集文件和标注 + + Args: + mapping_id: 映射ID + batch_size: 批处理大小 + file_priority: 文件同步优先级 (0: dataset优先, 1: annotation优先) + annotation_priority: 标注同步优先级 (0: dataset优先, 1: annotation优先) + + Returns: + 同步结果响应 + """ + logger.info(f"Start syncing dataset by mapping: {mapping_id}") + + # 检查映射是否存在 + mapping = await self.mapping_service.get_mapping_by_uuid(mapping_id) + if not mapping: + logger.error(f"Dataset mapping not found: {mapping_id}") + return SyncDatasetResponse( + id="", + status="error", + synced_files=0, + total_files=0, + message=f"Dataset mapping not found: {mapping_id}" + ) + + try: + # 同步文件 + file_result = await self.sync_files(mapping, batch_size) + + # TODO: 同步标注 + # annotation_result = await self.sync_annotations(mapping, batch_size, annotation_priority) + + logger.info(f"Sync completed: created={file_result['created']}, deleted={file_result['deleted']}, total={file_result['total']}") + + return SyncDatasetResponse( + id=mapping.id, + status="success", + synced_files=file_result["created"], + total_files=file_result["total"], + message=f"Sync completed: created {file_result['created']} files, deleted {file_result['deleted']} tasks" + ) + + except Exception as e: + logger.error(f"Error while syncing dataset: {e}") + return SyncDatasetResponse( + id=mapping.id, + status="error", + synced_files=0, + total_files=0, + message=f"Sync failed: {str(e)}" + ) + + async def sync_files( + self, + mapping: DatasetMappingResponse, + batch_size: int + ) -> Dict[str, int]: + """ + 同步DM和Label Studio之间的文件 + + Args: + mapping: 数据集映射信息 + batch_size: 批处理大小 + + Returns: + 同步统计信息: {"created": int, "deleted": int, "total": int} + """ + logger.info(f"Syncing files for dataset {mapping.dataset_id} to project {mapping.labeling_project_id}") + + # 获取DM数据集信息 + dataset_info = await self.dm_client.get_dataset(mapping.dataset_id) + if not dataset_info: + raise NoDatasetInfoFoundError(mapping.dataset_id) + + total_files = dataset_info.fileCount + logger.info(f"Total files in DM dataset: {total_files}") + + # 获取Label Studio中已存在的文件映射 + existing_dm_file_mapping = await self.get_existing_dm_file_mapping(mapping.labeling_project_id) + existing_file_ids = set(existing_dm_file_mapping.keys()) + logger.info(f"{len(existing_file_ids)} tasks already exist in Label Studio") + + # 分页获取DM文件并创建新任务 + current_file_ids, created_count = await self._fetch_dm_files_paginated( + mapping.dataset_id, + batch_size, + existing_file_ids, + mapping.labeling_project_id + ) + + # 删除孤立任务 + deleted_count = await self._delete_orphaned_tasks( + existing_dm_file_mapping, + current_file_ids + ) + + logger.info(f"File sync completed: total={total_files}, created={created_count}, deleted={deleted_count}") + + return { + "created": created_count, + "deleted": deleted_count, + "total": total_files + } + + async def sync_annotations( + self, + mapping: DatasetMappingResponse, + batch_size: int, + priority: int + ) -> Dict[str, int]: + """ + 同步DM和Label Studio之间的标注 + + Args: + mapping: 数据集映射信息 + batch_size: 批处理大小 + priority: 标注同步优先级 (0: dataset优先, 1: annotation优先) + + Returns: + 同步统计信息: {"synced_to_dm": int, "synced_to_ls": int} + """ + logger.info(f"Syncing annotations for dataset {mapping.dataset_id} (priority={priority})") + + # TODO: 实现标注同步逻辑 + # 1. 从DM获取标注结果 + # 2. 从Label Studio获取标注结果 + # 3. 根据优先级合并结果 + # 4. 将差异写入DM和LS + + logger.info("Annotation sync not yet implemented") + return { + "synced_to_dm": 0, + "synced_to_ls": 0 + } + + def _simplify_annotation_result(self, annotation: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], str]: + """ + 将Label Studio标注结果简化为指定格式 + + Args: + annotation: Label Studio原始标注数据 + + Returns: + Tuple of (简化后的标注结果列表, 标注更新时间ISO字符串) + """ + simplified = [] + + # 获取result字段(包含实际的标注数据) + results = annotation.get("result", []) + + # 获取标注的更新时间,优先使用updated_at,否则使用created_at + updated_at = annotation.get("updated_at") or annotation.get("created_at", "") + + for result_item in results: + simplified_item = { + "from_name": result_item.get("from_name", ""), + "to_name": result_item.get("to_name", ""), + "type": result_item.get("type", ""), + "values": result_item.get("value", {}) + } + simplified.append(simplified_item) + + return simplified, updated_at + + def _compare_timestamps(self, ts1: str, ts2: str) -> int: + """ + 比较两个ISO格式时间戳 + + Args: + ts1: 第一个时间戳 + ts2: 第二个时间戳 + + Returns: + 1 如果 ts1 > ts2 + -1 如果 ts1 < ts2 + 0 如果相等或无法比较 + """ + try: + from dateutil import parser + from datetime import timezone + + dt1 = parser.parse(ts1) + dt2 = parser.parse(ts2) + + # Convert both to UTC timezone-aware if needed + if dt1.tzinfo is None: + dt1 = dt1.replace(tzinfo=timezone.utc) + if dt2.tzinfo is None: + dt2 = dt2.replace(tzinfo=timezone.utc) + + if dt1 > dt2: + return 1 + elif dt1 < dt2: + return -1 + else: + return 0 + except Exception as e: + logger.warning(f"Failed to compare timestamps {ts1} and {ts2}: {e}") + return 0 + + def _should_overwrite_dm(self, ls_updated_at: str, dm_tags_updated_at: Optional[str], overwrite: bool) -> bool: + """ + 判断是否应该用Label Studio的标注覆盖DataMate的标注 + + Args: + ls_updated_at: Label Studio标注的更新时间 + dm_tags_updated_at: DataMate中标注的更新时间(从tags_updated_at字段) + overwrite: 是否允许覆盖 + + Returns: + True 如果应该覆盖,False 如果不应该覆盖 + """ + # 如果不允许覆盖,直接返回False + if not overwrite: + return False + + # 如果DataMate没有标注时间戳,允许覆盖 + if not dm_tags_updated_at: + return True + + # 如果Label Studio的标注更新,允许覆盖 + return self._compare_timestamps(ls_updated_at, dm_tags_updated_at) > 0 + + def _should_overwrite_ls(self, dm_tags_updated_at: Optional[str], ls_updated_at: str, overwrite_ls: bool) -> bool: + """ + 判断是否应该用DataMate的标注覆盖Label Studio的标注 + + Args: + dm_tags_updated_at: DataMate中标注的更新时间(从tags_updated_at字段) + ls_updated_at: Label Studio标注的更新时间 + overwrite_ls: 是否允许覆盖Label Studio + + Returns: + True 如果应该覆盖,False 如果不应该覆盖 + """ + # 如果不允许覆盖,直接返回False + if not overwrite_ls: + return False + + # 如果DataMate没有标注时间戳,不应该覆盖Label Studio + if not dm_tags_updated_at: + return False + + # 如果Label Studio没有标注,应该覆盖 + if not ls_updated_at: + return True + + # 如果DataMate的标注更新,允许覆盖 + return self._compare_timestamps(dm_tags_updated_at, ls_updated_at) > 0 + + async def sync_annotations_from_ls_to_dm( + self, + mapping: DatasetMappingResponse, + batch_size: int = 50, + overwrite: bool = True + ) -> SyncAnnotationsResponse: + """ + 从Label Studio同步标注到数据集 + + Args: + mapping: 数据集映射信息 + batch_size: 批处理大小 + overwrite: 是否允许覆盖DataMate中的标注(基于时间戳比较) + + Returns: + 同步结果响应 + """ + logger.info(f"Syncing annotations from LS to DM: dataset={mapping.dataset_id}, project={mapping.labeling_project_id}") + + synced_count = 0 + skipped_count = 0 + failed_count = 0 + conflicts_resolved = 0 + + try: + # 获取Label Studio中的所有任务 + ls_tasks_result = await self.ls_client.get_project_tasks( + mapping.labeling_project_id, + page=None + ) + + if not ls_tasks_result: + token_display = settings.label_studio_user_token[:10] + "..." if settings.label_studio_user_token else "None" + error_msg = f"Failed to fetch tasks from Label Studio project {mapping.labeling_project_id}. Please check:\n" \ + f"1. Label Studio is running at {settings.label_studio_base_url}\n" \ + f"2. Project ID {mapping.labeling_project_id} exists\n" \ + f"3. API token is valid: {token_display}" + logger.error(error_msg) + return SyncAnnotationsResponse( + id=mapping.id, + status="error", + synced_to_dm=0, + synced_to_ls=0, + skipped=0, + failed=0, + conflicts_resolved=0, + message=f"Failed to connect to Label Studio at {settings.label_studio_base_url}" + ) + + all_tasks = ls_tasks_result.get("tasks", []) + logger.info(f"Found {len(all_tasks)} tasks in Label Studio project") + + if len(all_tasks) == 0: + logger.warning(f"No tasks found in Label Studio project {mapping.labeling_project_id}") + return SyncAnnotationsResponse( + id=mapping.id, + status="success", + synced_to_dm=0, + synced_to_ls=0, + skipped=0, + failed=0, + conflicts_resolved=0, + message="No tasks found in Label Studio project" + ) + + # 批量处理任务 + for i in range(0, len(all_tasks), batch_size): + batch_tasks = all_tasks[i:i + batch_size] + logger.info(f"Processing batch {i // batch_size + 1}, {len(batch_tasks)} tasks") + + for task in batch_tasks: + task_id = task.get("id") + file_id = task.get("data", {}).get("file_id") + + if not file_id: + logger.warning(f"Task {task_id} has no file_id, skipping") + skipped_count += 1 + continue + + # 获取任务的标注结果 + annotations = await self.ls_client.get_task_annotations(task_id) + + if not annotations: + logger.debug(f"No annotations for task {task_id}, skipping") + skipped_count += 1 + continue + + # 简化标注结果(取最新的标注) + latest_annotation = max(annotations, key=lambda a: a.get("updated_at") or a.get("created_at", "")) + simplified_annotations, ls_updated_at = self._simplify_annotation_result(latest_annotation) + + if not simplified_annotations: + logger.debug(f"Task {task_id} has no valid annotation results") + skipped_count += 1 + continue + + # 更新数据库中的tags字段 + try: + # 检查文件是否存在以及是否已有标注 + result = await self.dm_client.db.execute( + select(DatasetFiles).where( + DatasetFiles.id == file_id, + DatasetFiles.dataset_id == mapping.dataset_id + ) + ) + file_record = result.scalar_one_or_none() + + if not file_record: + logger.warning(f"File {file_id} not found in dataset {mapping.dataset_id}") + failed_count += 1 + continue + + # 检查是否应该覆盖DataMate的标注(使用文件级别的tags_updated_at) + dm_tags_updated_at: Optional[str] = None + if file_record.tags_updated_at: # type: ignore + dm_tags_updated_at = file_record.tags_updated_at.isoformat() # type: ignore + + if not self._should_overwrite_dm(ls_updated_at, dm_tags_updated_at, overwrite): + logger.debug(f"File {file_id}: DataMate has newer or equal annotations, skipping (overwrite={overwrite})") + skipped_count += 1 + continue + + # 如果存在冲突(两边都有标注且时间戳不同),记录为冲突解决 + if file_record.tags and ls_updated_at: # type: ignore + conflicts_resolved += 1 + logger.debug(f"File {file_id}: Resolved conflict, Label Studio annotation is newer") + + # 更新tags字段和tags_updated_at + from datetime import datetime + tags_updated_datetime = datetime.fromisoformat(ls_updated_at.replace('Z', '+00:00')) + + await self.dm_client.db.execute( + update(DatasetFiles) + .where(DatasetFiles.id == file_id) + .values( + tags=simplified_annotations, + tags_updated_at=tags_updated_datetime + ) + ) + await self.dm_client.db.commit() + + synced_count += 1 + logger.debug(f"Synced annotations for file {file_id}: {len(simplified_annotations)} results") + + except Exception as e: + logger.error(f"Failed to update annotations for file {file_id}: {e}") + failed_count += 1 + await self.dm_client.db.rollback() + + logger.info(f"Annotation sync completed: synced={synced_count}, skipped={skipped_count}, failed={failed_count}, conflicts_resolved={conflicts_resolved}") + + status = "success" if failed_count == 0 else ("partial" if synced_count > 0 else "error") + + return SyncAnnotationsResponse( + id=mapping.id, + status=status, + synced_to_dm=synced_count, + synced_to_ls=0, + skipped=skipped_count, + failed=failed_count, + conflicts_resolved=conflicts_resolved, + message=f"Synced {synced_count} annotations from Label Studio to dataset. Skipped: {skipped_count}, Failed: {failed_count}, Conflicts resolved: {conflicts_resolved}" + ) + + except Exception as e: + logger.error(f"Error while syncing annotations from LS to DM: {e}") + return SyncAnnotationsResponse( + id=mapping.id, + status="error", + synced_to_dm=synced_count, + synced_to_ls=0, + skipped=skipped_count, + failed=failed_count, + conflicts_resolved=conflicts_resolved, + message=f"Sync failed: {str(e)}" + ) + + async def sync_annotations_from_dm_to_ls( + self, + mapping: DatasetMappingResponse, + batch_size: int = 50, + overwrite_ls: bool = True + ) -> SyncAnnotationsResponse: + """ + 从DataMate数据集同步标注到Label Studio + + Args: + mapping: 数据集映射信息 + batch_size: 批处理大小 + overwrite_ls: 是否允许覆盖Label Studio中的标注(基于时间戳比较) + + Returns: + 同步结果响应 + """ + logger.info(f"Syncing annotations from DM to LS: dataset={mapping.dataset_id}, project={mapping.labeling_project_id}") + + synced_count = 0 + skipped_count = 0 + failed_count = 0 + conflicts_resolved = 0 + + try: + # 获取Label Studio中的文件ID到任务ID的映射 + dm_file_to_task_mapping = await self.get_existing_dm_file_mapping(mapping.labeling_project_id) + + if not dm_file_to_task_mapping: + logger.warning(f"No task mapping found for project {mapping.labeling_project_id}") + return SyncAnnotationsResponse( + id=mapping.id, + status="error", + synced_to_dm=0, + synced_to_ls=0, + skipped=0, + failed=0, + conflicts_resolved=0, + message="No tasks found in Label Studio project" + ) + + logger.info(f"Found {len(dm_file_to_task_mapping)} task mappings") + + # 分页获取DataMate中的文件 + page = 0 + processed_count = 0 + + while True: + files_response = await self.dm_client.get_dataset_files( + mapping.dataset_id, + page=page, + size=batch_size, + ) + + if not files_response or not files_response.content: + logger.info(f"No more files on page {page + 1}") + break + + logger.info(f"Processing page {page + 1}, {len(files_response.content)} files") + + for file_info in files_response.content: + file_id = str(file_info.id) + processed_count += 1 + + # 检查该文件是否在Label Studio中有对应的任务 + task_id = dm_file_to_task_mapping.get(file_id) + if not task_id: + logger.debug(f"File {file_id} has no corresponding task in Label Studio, skipping") + skipped_count += 1 + continue + + # 获取DataMate中的标注 + dm_tags: List[Dict[str, Any]] = file_info.tags if file_info.tags else [] # type: ignore + + if not dm_tags: + logger.debug(f"File {file_id} has no annotations in DataMate, skipping") + skipped_count += 1 + continue + + # 获取DataMate中标注的更新时间 + dm_tags_updated_at: Optional[str] = None + if file_info.tags_updated_at: # type: ignore + dm_tags_updated_at = file_info.tags_updated_at.isoformat() # type: ignore + + try: + # 获取Label Studio中该任务的现有标注 + ls_annotations = await self.ls_client.get_task_annotations(task_id) + + # 获取Label Studio标注的更新时间 + ls_updated_at = "" + if ls_annotations: + latest_ls_annotation = max( + ls_annotations, + key=lambda a: a.get("updated_at") or a.get("created_at", "") + ) + ls_updated_at = latest_ls_annotation.get("updated_at") or latest_ls_annotation.get("created_at", "") + + # 检查是否应该覆盖Label Studio的标注 + if not self._should_overwrite_ls(dm_tags_updated_at, ls_updated_at, overwrite_ls): + logger.debug(f"Task {task_id}: Label Studio has newer or equal annotations, skipping (overwrite_ls={overwrite_ls})") + skipped_count += 1 + continue + + # 如果存在冲突,记录为冲突解决 + if ls_annotations and dm_tags: + conflicts_resolved += 1 + logger.debug(f"Task {task_id}: Resolved conflict, DataMate annotation is newer") + + # 将DataMate的标注转换为Label Studio格式 + ls_result = [] + for tag in dm_tags: + ls_result_item = { + "from_name": tag.get("from_name", ""), + "to_name": tag.get("to_name", ""), + "type": tag.get("type", ""), + "value": tag.get("values", {}) + } + ls_result.append(ls_result_item) + + # 如果Label Studio已有标注,更新它;否则创建新标注 + if ls_annotations: + # 更新最新的标注 + latest_annotation_id = latest_ls_annotation.get("id") + if not latest_annotation_id: + logger.error(f"Task {task_id} has no annotation ID") + failed_count += 1 + continue + + update_result = await self.ls_client.update_annotation( + int(latest_annotation_id), + ls_result + ) + if update_result: + synced_count += 1 + logger.debug(f"Updated annotation for task {task_id}") + else: + failed_count += 1 + logger.error(f"Failed to update annotation for task {task_id}") + else: + # 创建新标注 + create_result = await self.ls_client.create_annotation( + task_id, + ls_result + ) + if create_result: + synced_count += 1 + logger.debug(f"Created annotation for task {task_id}") + else: + failed_count += 1 + logger.error(f"Failed to create annotation for task {task_id}") + + except Exception as e: + logger.error(f"Failed to sync annotations for file {file_id} (task {task_id}): {e}") + failed_count += 1 + + # 检查是否还有更多页面 + if page >= files_response.totalPages - 1: + break + page += 1 + + logger.info(f"Annotation sync completed: synced={synced_count}, skipped={skipped_count}, failed={failed_count}, conflicts_resolved={conflicts_resolved}") + + status = "success" if failed_count == 0 else ("partial" if synced_count > 0 else "error") + + return SyncAnnotationsResponse( + id=mapping.id, + status=status, + synced_to_dm=0, + synced_to_ls=synced_count, + skipped=skipped_count, + failed=failed_count, + conflicts_resolved=conflicts_resolved, + message=f"Synced {synced_count} annotations from DataMate to Label Studio. Skipped: {skipped_count}, Failed: {failed_count}, Conflicts resolved: {conflicts_resolved}" + ) + + except Exception as e: + logger.error(f"Error while syncing annotations from DM to LS: {e}") + return SyncAnnotationsResponse( + id=mapping.id, + status="error", + synced_to_dm=0, + synced_to_ls=synced_count, + skipped=skipped_count, + failed=failed_count, + conflicts_resolved=conflicts_resolved, + message=f"Sync failed: {str(e)}" + ) + + async def sync_annotations_bidirectional( + self, + mapping: DatasetMappingResponse, + batch_size: int = 50, + overwrite: bool = True, + overwrite_ls: bool = True + ) -> SyncAnnotationsResponse: + """ + 双向同步标注结果 + + Args: + mapping: 数据集映射信息 + batch_size: 批处理大小 + overwrite: 是否允许覆盖DataMate中的标注 + overwrite_ls: 是否允许覆盖Label Studio中的标注 + + Returns: + 同步结果响应 + """ + logger.info(f"Bidirectional annotation sync: dataset={mapping.dataset_id}, project={mapping.labeling_project_id}") + + try: + # 先从Label Studio同步到DataMate + ls_to_dm_result = await self.sync_annotations_from_ls_to_dm( + mapping, + batch_size, + overwrite + ) + + # 再从DataMate同步到Label Studio + dm_to_ls_result = await self.sync_annotations_from_dm_to_ls( + mapping, + batch_size, + overwrite_ls + ) + + # 合并结果 + total_synced_to_dm = ls_to_dm_result.synced_to_dm + total_synced_to_ls = dm_to_ls_result.synced_to_ls + total_skipped = ls_to_dm_result.skipped + dm_to_ls_result.skipped + total_failed = ls_to_dm_result.failed + dm_to_ls_result.failed + total_conflicts = ls_to_dm_result.conflicts_resolved + dm_to_ls_result.conflicts_resolved + + # 判断状态 + if ls_to_dm_result.status == "error" and dm_to_ls_result.status == "error": + status = "error" + elif total_failed > 0: + status = "partial" + else: + status = "success" + + logger.info(f"Bidirectional sync completed: to_dm={total_synced_to_dm}, to_ls={total_synced_to_ls}, skipped={total_skipped}, failed={total_failed}, conflicts={total_conflicts}") + + return SyncAnnotationsResponse( + id=mapping.id, + status=status, + synced_to_dm=total_synced_to_dm, + synced_to_ls=total_synced_to_ls, + skipped=total_skipped, + failed=total_failed, + conflicts_resolved=total_conflicts, + message=f"Bidirectional sync completed: {total_synced_to_dm} to DataMate, {total_synced_to_ls} to Label Studio. Skipped: {total_skipped}, Failed: {total_failed}, Conflicts resolved: {total_conflicts}" + ) + + except Exception as e: + logger.error(f"Error during bidirectional sync: {e}") + return SyncAnnotationsResponse( + id=mapping.id, + status="error", + synced_to_dm=0, + synced_to_ls=0, + skipped=0, + failed=0, + conflicts_resolved=0, + message=f"Bidirectional sync failed: {str(e)}" + ) async def get_sync_status( self, diff --git a/runtime/datamate-python/app/module/dataset/schema/dataset_file.py b/runtime/datamate-python/app/module/dataset/schema/dataset_file.py index 95c457f..661a176 100644 --- a/runtime/datamate-python/app/module/dataset/schema/dataset_file.py +++ b/runtime/datamate-python/app/module/dataset/schema/dataset_file.py @@ -15,6 +15,8 @@ class DatasetFileResponse(BaseModel): description: Optional[str] = Field(None, description="文件描述") uploadedBy: Optional[str] = Field(None, description="上传者") lastAccessTime: Optional[datetime] = Field(None, description="最后访问时间") + tags: Optional[List[Dict[str, Any]]] = Field(None, description="文件标签/标注信息") + tags_updated_at: Optional[datetime] = Field(None, description="标签最后更新时间", alias="tagsUpdatedAt") class PagedDatasetFileResponse(BaseModel): """DM服务分页文件响应模型""" diff --git a/runtime/datamate-python/app/module/dataset/service/service.py b/runtime/datamate-python/app/module/dataset/service/service.py index 6acc38e..c861883 100644 --- a/runtime/datamate-python/app/module/dataset/service/service.py +++ b/runtime/datamate-python/app/module/dataset/service/service.py @@ -108,7 +108,9 @@ class Service: uploadedAt=f.upload_time, # type: ignore description=None, uploadedBy=None, - lastAccessTime=f.last_access_time # type: ignore + lastAccessTime=f.last_access_time, # type: ignore + tags=f.tags, # type: ignore + tags_updated_at=f.tags_updated_at # type: ignore ) for f in files ] diff --git a/runtime/datamate-python/requirements.txt b/runtime/datamate-python/requirements.txt index 16aea99..2aec878 100644 Binary files a/runtime/datamate-python/requirements.txt and b/runtime/datamate-python/requirements.txt differ diff --git a/runtime/datamate-python/uvicorn_start.sh b/runtime/datamate-python/uvicorn_start.sh index fc8783c..8a60af5 100755 --- a/runtime/datamate-python/uvicorn_start.sh +++ b/runtime/datamate-python/uvicorn_start.sh @@ -1,3 +1,6 @@ +export LOG_LEVEL=DEBUG +export DEBUG=true + uvicorn app.main:app \ --host 0.0.0.0 \ --port 18000 \ diff --git a/scripts/db/data-management-init.sql b/scripts/db/data-management-init.sql index cf2cff7..a5676c6 100644 --- a/scripts/db/data-management-init.sql +++ b/scripts/db/data-management-init.sql @@ -55,6 +55,7 @@ CREATE TABLE IF NOT EXISTS t_dm_dataset_files ( file_size BIGINT DEFAULT 0 COMMENT '文件大小(字节)', check_sum VARCHAR(64) COMMENT '文件校验和', tags JSON COMMENT '文件标签信息', + tags_updated_at TIMESTAMP NULL COMMENT '标签最后更新时间', metadata JSON COMMENT '文件元数据', status VARCHAR(50) DEFAULT 'ACTIVE' COMMENT '文件状态:ACTIVE/DELETED/PROCESSING', upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '上传时间',