feat: File and Annotation 2-way sync implementation (#63)

* feat: Refactor configuration and sync logic for improved dataset handling and logging

* feat: Enhance annotation synchronization and dataset file management

- Added a new field `tags_updated_at` to the `DatasetFiles` model for tracking the last update time of tags.
- Implemented new asynchronous methods in the Label Studio client for fetching, creating, updating, and deleting task annotations.
- Introduced bidirectional synchronization for annotations between DataMate and Label Studio, allowing for flexible data management.
- Updated sync service to handle annotation conflicts based on timestamps, ensuring data integrity during synchronization.
- Enhanced dataset file response model to include tags and their update timestamps.
- Modified database initialization script to create a new column for `tags_updated_at` in the dataset files table.
- Updated requirements to ensure compatibility with the latest dependencies.
This commit is contained in:
Jason Wang
2025-11-07 15:03:07 +08:00
committed by GitHub
parent d136bad38c
commit 78f50ea520
16 changed files with 1336 additions and 290 deletions

View File

@@ -380,6 +380,148 @@ class Client:
logger.error(f"Error while deleting project {project_id}: {e}")
return False
async def get_task_annotations(
    self,
    task_id: int
) -> Optional[List[Dict[str, Any]]]:
    """Fetch the annotations recorded for a task.

    Args:
        task_id: ID of the task whose annotations are requested.

    Returns:
        The decoded JSON body of ``GET /api/tasks/{task_id}/annotations``,
        or ``None`` when the request fails for any reason (the failure is
        logged, not raised).
    """
    fetched: Optional[List[Dict[str, Any]]] = None
    try:
        logger.debug(f"Fetching annotations for task: {task_id}")
        resp = await self.client.get(f"/api/tasks/{task_id}/annotations")
        resp.raise_for_status()

        payload = resp.json()
        logger.debug(f"Fetched {len(payload)} annotations for task {task_id}")
        # Only publish the payload once every step above has succeeded.
        fetched = payload
    except httpx.HTTPStatusError as http_err:
        logger.error(f"Get task annotations failed HTTP {http_err.response.status_code}: {http_err.response.text}")
    except Exception as err:
        logger.error(f"Error while getting task annotations: {err}")
    return fetched
async def create_annotation(
    self,
    task_id: int,
    result: List[Dict[str, Any]],
    completed_by: Optional[int] = None
) -> Optional[Dict[str, Any]]:
    """Create a new annotation on a task.

    Args:
        task_id: ID of the task to annotate.
        result: List of annotation result items to store.
        completed_by: Optional ID of the user credited with the annotation.
            The key is omitted from the request only when this is ``None``.

    Returns:
        The decoded JSON body describing the created annotation, or ``None``
        when the request fails (the failure is logged, not raised).
    """
    try:
        logger.debug(f"Creating annotation for task: {task_id}")

        annotation_data: Dict[str, Any] = {
            "result": result,
            "task": task_id
        }

        # Compare against None explicitly: a truthiness test would silently
        # drop a caller-supplied ID of 0 instead of forwarding it.
        if completed_by is not None:
            annotation_data["completed_by"] = completed_by

        response = await self.client.post(
            f"/api/tasks/{task_id}/annotations",
            json=annotation_data
        )
        response.raise_for_status()

        annotation = response.json()
        logger.debug(f"Created annotation {annotation.get('id')} for task {task_id}")
        return annotation

    except httpx.HTTPStatusError as e:
        logger.error(f"Create annotation failed HTTP {e.response.status_code}: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Error while creating annotation: {e}")
        return None
async def update_annotation(
    self,
    annotation_id: int,
    result: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Replace the result of an existing annotation.

    Args:
        annotation_id: ID of the annotation to update.
        result: New list of annotation result items.

    Returns:
        The decoded JSON body of the PATCH response, or ``None`` when the
        request fails (the failure is logged, not raised).
    """
    updated: Optional[Dict[str, Any]] = None
    try:
        logger.debug(f"Updating annotation: {annotation_id}")
        resp = await self.client.patch(
            f"/api/annotations/{annotation_id}",
            json={"result": result},
        )
        resp.raise_for_status()

        body = resp.json()
        logger.debug(f"Updated annotation {annotation_id}")
        # Only publish the body once every step above has succeeded.
        updated = body
    except httpx.HTTPStatusError as http_err:
        logger.error(f"Update annotation failed HTTP {http_err.response.status_code}: {http_err.response.text}")
    except Exception as err:
        logger.error(f"Error while updating annotation: {err}")
    return updated
async def delete_annotation(
    self,
    annotation_id: int
) -> bool:
    """Delete an annotation.

    Args:
        annotation_id: ID of the annotation to delete.

    Returns:
        ``True`` when the DELETE request succeeds, ``False`` when it fails
        (the failure is logged, not raised).
    """
    deleted = False
    try:
        logger.debug(f"Deleting annotation: {annotation_id}")
        resp = await self.client.delete(f"/api/annotations/{annotation_id}")
        resp.raise_for_status()
        logger.debug(f"Deleted annotation {annotation_id}")
        deleted = True
    except httpx.HTTPStatusError as http_err:
        logger.error(f"Delete annotation failed HTTP {http_err.response.status_code}: {http_err.response.text}")
    except Exception as err:
        logger.error(f"Error while deleting annotation: {err}")
    return deleted
async def create_local_storage(
self,
project_id: int,

View File

@@ -80,7 +80,7 @@ async def create_mapping(
project_id = project_data["id"]
# 配置本地存储:dataset/<id>
local_storage_path = f"{settings.label_studio_local_storage_dataset_base_path}/{request.dataset_id}"
local_storage_path = f"{settings.label_studio_local_document_root}/{request.dataset_id}"
storage_result = await ls_client.create_local_storage(
project_id=project_id,
path=local_storage_path,

View File

@@ -15,6 +15,8 @@ from ..service.mapping import DatasetMappingService
from ..schema import (
SyncDatasetRequest,
SyncDatasetResponse,
SyncAnnotationsRequest,
SyncAnnotationsResponse,
)
@@ -30,10 +32,24 @@ async def sync_dataset_content(
db: AsyncSession = Depends(get_db)
):
"""
同步数据集内容
同步数据集内容(包括文件和标注)
根据指定的mapping ID,同步DM程序数据集中的内容到Label Studio数据集中
在数据库中记录更新时间,返回更新状态
根据指定的mapping ID,同步DM程序数据集中的内容到Label Studio数据集中
默认同时同步文件和标注数据。
Args:
request: 同步请求,包含:
- id: 映射ID(mapping UUID)
- batchSize: 批处理大小
- filePriority: 文件同步优先级
- labelPriority: 标签同步优先级
- syncAnnotations: 是否同步标注(默认True)
- annotationDirection: 标注同步方向(默认bidirectional)
- overwrite: 是否允许覆盖DataMate中的标注(默认True)
- overwriteLabelingProject: 是否允许覆盖Label Studio中的标注(默认True)
Returns:
同步结果
"""
try:
ls_client = LabelStudioClient(base_url=settings.label_studio_base_url,
@@ -42,9 +58,9 @@ async def sync_dataset_content(
mapping_service = DatasetMappingService(db)
sync_service = SyncService(dm_client, ls_client, mapping_service)
logger.info(f"Sync dataset content request: mapping_id={request.id}")
logger.debug(f"Sync dataset content request: mapping_id={request.id}, sync_annotations={request.sync_annotations}")
# request.id 合法性校验
# request.id validation
mapping = await mapping_service.get_mapping_by_uuid(request.id)
if not mapping:
raise HTTPException(
@@ -52,9 +68,34 @@ async def sync_dataset_content(
detail=f"Mapping not found: {request.id}"
)
# 执行同步(使用映射中的源数据集UUID)
# Sync dataset files
result = await sync_service.sync_dataset_files(request.id, request.batch_size)
# Sync annotations if requested
if request.sync_annotations:
logger.info(f"Syncing annotations: direction={request.annotation_direction}")
# 根据方向执行标注同步
if request.annotation_direction == "ls_to_dm":
await sync_service.sync_annotations_from_ls_to_dm(
mapping,
request.batch_size,
request.overwrite
)
elif request.annotation_direction == "dm_to_ls":
await sync_service.sync_annotations_from_dm_to_ls(
mapping,
request.batch_size,
request.overwrite_labeling_project
)
elif request.annotation_direction == "bidirectional":
await sync_service.sync_annotations_bidirectional(
mapping,
request.batch_size,
request.overwrite,
request.overwrite_labeling_project
)
logger.info(f"Sync completed: {result.synced_files}/{result.total_files} files")
return StandardResponse(
@@ -73,4 +114,148 @@ async def sync_dataset_content(
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error syncing dataset content: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/annotation/sync", response_model=StandardResponse[SyncAnnotationsResponse])
async def sync_annotations(
    request: SyncAnnotationsRequest,
    db: AsyncSession = Depends(get_db)
):
    """Synchronize annotation results only (no file sync).

    Looks up the mapping named by ``request.id`` and runs the sync-service
    routine selected by ``request.direction``.

    Args:
        request: Sync request carrying:
            - id: mapping UUID
            - batchSize: batch size
            - direction: ls_to_dm / dm_to_ls / bidirectional
            - overwrite: passed to the ls_to_dm and bidirectional routines
              (allow Label Studio annotations to overwrite DataMate ones)
            - overwriteLabelingProject: passed to the dm_to_ls and
              bidirectional routines (allow DataMate annotations to
              overwrite Label Studio ones)
        db: Database session injected by FastAPI.

    Returns:
        A ``StandardResponse`` wrapping the result object returned by the
        sync service.

    Raises:
        HTTPException: 404 when the mapping does not exist, 400 for an
            unrecognised direction, 500 for any other failure.
    """
    try:
        ls_client = LabelStudioClient(base_url=settings.label_studio_base_url,
                                      token=settings.label_studio_user_token)
        dm_client = DatasetManagementService(db)
        mapping_service = DatasetMappingService(db)
        sync_service = SyncService(dm_client, ls_client, mapping_service)

        logger.info(f"Sync annotations request: mapping_id={request.id}, direction={request.direction}, overwrite={request.overwrite}, overwrite_ls={request.overwrite_labeling_project}")

        # Verify that the mapping exists before doing any work.
        mapping = await mapping_service.get_mapping_by_uuid(request.id)
        if not mapping:
            raise HTTPException(
                status_code=404,
                detail=f"Mapping not found: {request.id}"
            )

        # Dispatch on the requested direction.
        if request.direction == "ls_to_dm":
            result = await sync_service.sync_annotations_from_ls_to_dm(
                mapping,
                request.batch_size,
                request.overwrite
            )
        elif request.direction == "dm_to_ls":
            result = await sync_service.sync_annotations_from_dm_to_ls(
                mapping,
                request.batch_size,
                request.overwrite_labeling_project
            )
        elif request.direction == "bidirectional":
            result = await sync_service.sync_annotations_bidirectional(
                mapping,
                request.batch_size,
                request.overwrite,
                request.overwrite_labeling_project
            )
        else:
            # Defensive fallback for any direction value not handled above.
            raise HTTPException(
                status_code=400,
                detail=f"Invalid direction: {request.direction}"
            )

        logger.info(f"Annotation sync completed: synced_to_dm={result.synced_to_dm}, synced_to_ls={result.synced_to_ls}, conflicts_resolved={result.conflicts_resolved}")

        return StandardResponse(
            code=200,
            message="success",
            data=result
        )

    except HTTPException:
        # Preserve deliberate HTTP errors (404/400) raised above.
        raise
    except Exception as e:
        # Log the details server-side but return a generic message so that
        # internal exception text is not exposed to API clients.
        logger.error(f"Error syncing annotations: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/check-ls-connection")
async def check_label_studio_connection():
    """Check connectivity to Label Studio.

    Diagnostic endpoint: issues ``GET /api/projects`` against the configured
    Label Studio instance and reports the outcome together with the
    configured base URL and a truncated token.

    Returns:
        A ``StandardResponse`` with ``code=200`` and status ``connected`` on
        success, or ``code=500`` and status ``disconnected`` (plus
        troubleshooting hints) when the probe request fails.

    Raises:
        HTTPException: 500 when setup or response construction itself fails.
    """
    try:
        ls_client = LabelStudioClient(
            base_url=settings.label_studio_base_url,
            token=settings.label_studio_user_token
        )

        # Computed once and shared by both the success and failure payloads.
        # NOTE(review): this returns the first 10 characters of the API token
        # to the caller - confirm this endpoint is not publicly reachable, or
        # mask the token further.
        token_display = settings.label_studio_user_token[:10] + "..." if settings.label_studio_user_token else "None"

        # Probe the connection by listing projects.
        try:
            response = await ls_client.client.get("/api/projects")
            response.raise_for_status()
            projects = response.json()

            if isinstance(projects, dict):
                # Paginated payload: "results" holds only the current page,
                # so prefer the reported total when it is present.
                page_items = projects.get("results", [])
                total = projects.get("count")
                projects_count = total if isinstance(total, int) else len(page_items)
            else:
                projects_count = len(projects)

            return StandardResponse(
                code=200,
                message="success",
                data={
                    "status": "connected",
                    "base_url": settings.label_studio_base_url,
                    "token": token_display,
                    "projects_count": projects_count,
                    "message": "Successfully connected to Label Studio"
                }
            )
        except Exception as e:
            return StandardResponse(
                code=500,
                message="error",
                data={
                    "status": "disconnected",
                    "base_url": settings.label_studio_base_url,
                    "token": token_display,
                    "error": str(e),
                    "message": f"Failed to connect to Label Studio: {str(e)}",
                    "troubleshooting": [
                        "1. Check if Label Studio is running: docker ps | grep label-studio",
                        "2. Verify LABEL_STUDIO_BASE_URL in .env file",
                        "3. Verify LABEL_STUDIO_USER_TOKEN is valid",
                        "4. Check network connectivity between services"
                    ]
                }
            )
    except Exception as e:
        logger.error(f"Error checking Label Studio connection: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -11,6 +11,8 @@ from .mapping import (
from .sync import (
SyncDatasetRequest,
SyncDatasetResponse,
SyncAnnotationsRequest,
SyncAnnotationsResponse,
)
__all__ = [
@@ -21,5 +23,7 @@ __all__ = [
"DatasetMappingResponse",
"SyncDatasetRequest",
"SyncDatasetResponse",
"SyncAnnotationsRequest",
"SyncAnnotationsResponse",
"DeleteDatasetResponse",
]

View File

@@ -1,4 +1,7 @@
from pydantic import Field
from typing import Literal, List, Dict, Any, Optional
from datetime import datetime
from pydantic import Field, ConfigDict
from app.module.shared.schema import BaseResponseModel
from app.module.shared.schema import StandardResponse
@@ -6,8 +9,27 @@ from app.module.shared.schema import StandardResponse
class SyncDatasetRequest(BaseResponseModel):
    """Request model for synchronizing dataset content.

    Several fields declare a camelCase alias; ``populate_by_name=True``
    allows them to be populated by either the alias or the snake_case
    field name.
    """
    model_config = ConfigDict(populate_by_name=True)

    # Mapping UUID identifying the dataset <-> labeling-project mapping.
    id: str = Field(..., description="映射ID(mapping UUID)")
    # Batch size, constrained to 1..100. Defined exactly once: an earlier
    # alias-less duplicate of this field was dead code and has been removed.
    batch_size: int = Field(50, ge=1, le=100, description="批处理大小", alias="batchSize")
    # Priority flags: 0 = dataset wins, 1 = labeling platform wins.
    file_priority: Literal[0, 1] = Field(0, description="0 数据集为主,1 标注平台为主", alias="filePriority")
    label_priority: Literal[0, 1] = Field(0, description="0 数据集为主,1 标注平台为主", alias="labelPriority")
    # Whether annotation data is synchronized in addition to files.
    sync_annotations: bool = Field(True, description="是否同步标注数据", alias="syncAnnotations")
    # Direction used for the annotation sync.
    annotation_direction: Literal["ls_to_dm", "dm_to_ls", "bidirectional"] = Field(
        "bidirectional",
        description="标注同步方向: ls_to_dm(Label Studio到数据集), dm_to_ls(数据集到Label Studio), bidirectional(双向)",
        alias="annotationDirection"
    )
    # Allow annotations stored in DataMate to be overwritten.
    overwrite: bool = Field(
        True,
        description="是否覆盖DataMate中的标注(基于时间戳比较)"
    )
    # Allow annotations stored in Label Studio to be overwritten.
    overwrite_labeling_project: bool = Field(
        True,
        description="是否覆盖Label Studio中的标注(基于时间戳比较)",
        alias="overwriteLabelingProject"
    )
class SyncDatasetResponse(BaseResponseModel):
"""同步数据集响应模型"""
@@ -18,4 +40,53 @@ class SyncDatasetResponse(BaseResponseModel):
message: str = Field(..., description="响应消息")
class SyncDatasetResponseStd(StandardResponse[SyncDatasetResponse]):
    """``StandardResponse`` specialised to carry a ``SyncDatasetResponse``."""
class SyncAnnotationsRequest(BaseResponseModel):
    """Request model for an annotation-only synchronization.

    The API uses camelCase names (declared as aliases) while Python code
    uses snake_case; ``populate_by_name=True`` accepts either form.
    """
    model_config = ConfigDict(populate_by_name=True)

    # Mapping UUID identifying the dataset <-> labeling-project mapping.
    id: str = Field(default=..., description="映射ID(mapping UUID)")

    # Batch size, constrained to 1..100.
    batch_size: int = Field(
        default=50,
        ge=1,
        le=100,
        description="批处理大小",
        alias="batchSize",
    )

    # Which way annotations flow.
    direction: Literal["ls_to_dm", "dm_to_ls", "bidirectional"] = Field(
        default="bidirectional",
        description="同步方向: ls_to_dm(Label Studio到数据集), dm_to_ls(数据集到Label Studio), bidirectional(双向)",
    )

    # Allow annotations stored in DataMate to be overwritten.
    overwrite: bool = Field(
        default=True,
        description="是否覆盖DataMate中的标注(基于时间戳比较)。True时,如果Label Studio的标注更新时间更新,则覆盖DataMate的标注",
    )

    # Allow annotations stored in Label Studio to be overwritten.
    overwrite_labeling_project: bool = Field(
        default=True,
        description="是否覆盖Label Studio中的标注(基于时间戳比较)。True时,如果DataMate的标注更新时间更新,则覆盖Label Studio的标注",
        alias="overwriteLabelingProject",
    )
class TagInfo(BaseResponseModel):
    """A single annotation tag.

    Carries no timestamp of its own; per the original note, the timestamp is
    kept at file level in the ``tags_updated_at`` field.
    """
    # Name of the annotation control that produced the tag.
    from_name: str = Field(default=..., description="标注工具名称")
    # Name of the object the tag applies to.
    to_name: str = Field(default=..., description="目标对象名称")
    # Annotation type.
    type: str = Field(default=..., description="标注类型")
    # Annotation values.
    values: Dict[str, Any] = Field(default=..., description="标注值")
class SyncAnnotationsResponse(BaseResponseModel):
    """Response model summarising an annotation synchronization run."""
    # Mapping UUID the run was executed for.
    id: str = Field(default=..., description="映射UUID")
    # Overall outcome: success / partial / error.
    status: str = Field(default=..., description="同步状态: success/partial/error")

    # Per-outcome counters; each defaults to 0.
    synced_to_dm: int = Field(default=0, description="同步到数据集的标注数量")
    synced_to_ls: int = Field(default=0, description="同步到Label Studio的标注数量")
    skipped: int = Field(default=0, description="跳过的标注数量")
    failed: int = Field(default=0, description="失败的标注数量")
    conflicts_resolved: int = Field(default=0, description="解决的冲突数量")

    # Human-readable response message.
    message: str = Field(default=..., description="响应消息")
class SyncAnnotationsResponseStd(StandardResponse[SyncAnnotationsResponse]):
    """``StandardResponse`` specialised to carry a ``SyncAnnotationsResponse``."""

File diff suppressed because it is too large Load Diff