feat(auto-annotation): unify annotation results with Label Studio format

Automatically convert auto-annotation outputs to Label Studio format and write them to the t_dm_annotation_results table, enabling seamless editing in the annotation editor.

New file:
- runtime/python-executor/datamate/annotation_result_converter.py
  * 4 converters for different annotation types:
    - convert_text_classification → choices type
    - convert_ner → labels (span) type
    - convert_relation_extraction → labels + relation type
    - convert_object_detection → rectanglelabels type
  * convert_annotation() dispatcher (auto-detects task_type)
  * generate_label_config_xml() for dynamic XML generation
  * Pipeline introspection utilities
  * Label Studio ID generation logic

Modified file:
- runtime/python-executor/datamate/auto_annotation_worker.py
  * Preserve file_id through processing loop (line 918)
  * Collect file_results as (file_id, annotations) pairs
  * New _create_labeling_project_with_annotations() function:
    - Creates labeling project linked to source dataset
    - Snapshots all files
    - Converts results to Label Studio format
    - Writes to t_dm_annotation_results in single transaction
  * label_config XML stored in t_dm_labeling_projects.configuration

Key features:
- Supports 4 annotation types: text classification, NER, relation extraction, object detection
- Deterministic region IDs for entity references in relation extraction
- Pixel to percentage conversion for object detection
- XML escaping handled by xml.etree.ElementTree
- Partial results preserved on task stop

Users can now view and edit auto-annotation results seamlessly in the annotation editor.
This commit is contained in:
2026-02-10 16:06:40 +08:00
parent 49f99527cc
commit dc490f03be
2 changed files with 667 additions and 16 deletions

View File

@@ -857,6 +857,139 @@ def _register_output_dataset(
)
def _create_labeling_project_with_annotations(
    task_id: str,
    dataset_id: str,
    dataset_name: str,
    task_name: str,
    dataset_type: str,
    normalized_pipeline: List[Dict[str, Any]],
    file_results: List[Tuple[str, Dict[str, Any]]],
    all_file_ids: List[str],
) -> None:
    """Create a labeling project and store auto-annotation results in it.

    The task type is inferred from ``normalized_pipeline``; when it cannot be
    inferred a warning is logged and nothing is written. Otherwise a row is
    inserted into ``t_dm_labeling_projects``, one row per entry of
    ``all_file_ids`` into ``t_dm_labeling_project_files``, and one row per
    convertible result into ``t_dm_annotation_results``. All inserts are
    issued on a single connection obtained from ``SQLManager.create_connect()``.

    Args:
        task_id: Auto-annotation task id; stored in the project configuration
            and used (first 8 characters) in the description and name fallback.
        dataset_id: Source dataset the labeling project is linked to.
        dataset_name: Used in the project name when ``task_name`` is empty.
        task_name: Preferred source for the project name.
        dataset_type: Currently unused by this function; kept so the call
            signature stays stable.
        normalized_pipeline: Pipeline definition used to infer the task type
            and to extract operator parameters for the label config.
        file_results: ``(file_id, annotations)`` pairs produced by the task.
        all_file_ids: Ids of every file to snapshot into the project.
    """
    # Local import, as in the original code: the converter module is only
    # needed when a labeling project is actually created.
    from datamate.annotation_result_converter import (
        convert_annotation,
        extract_operator_params,
        generate_label_config_xml,
        infer_task_type_from_pipeline,
    )

    task_type = infer_task_type_from_pipeline(normalized_pipeline)
    if not task_type:
        logger.warning(
            "Cannot infer task_type from pipeline for task {}, skipping labeling project creation",
            task_id,
        )
        return

    operator_params = extract_operator_params(normalized_pipeline)

    # Object detection: gather the unique label set from the actual detections
    # so the generated label config can list them.
    if task_type == "object_detection":
        all_labels: set = set()
        for _, ann in file_results:
            # ``or []`` also covers a key that is present with a None value;
            # ``.get(key, [])`` alone would hand None to the loop and raise
            # TypeError, aborting the whole project creation.
            for det in ann.get("detections") or []:
                if isinstance(det, dict):
                    all_labels.add(str(det.get("label", "unknown")))
        operator_params["_detected_labels"] = sorted(all_labels)

    label_config = generate_label_config_xml(task_type, operator_params)

    project_id = str(uuid.uuid4())
    # 8-digit, zero-padded numeric identifier derived from a random UUID.
    labeling_project_id = str(uuid.uuid4().int % 10**8).zfill(8)
    # Name falls back task_name -> dataset_name -> task id prefix and is
    # truncated to 100 characters.
    project_name = f"自动标注 - {task_name or dataset_name or task_id[:8]}"[:100]

    # A single timestamp is shared by every row written below.
    now = datetime.now()
    configuration = json.dumps(
        {
            "label_config": label_config,
            "description": f"由自动标注任务 {task_id[:8]} 自动创建",
            "auto_annotation_task_id": task_id,
        },
        ensure_ascii=False,
    )

    insert_project_sql = text(
        """
        INSERT INTO t_dm_labeling_projects
        (id, dataset_id, name, labeling_project_id, template_id, configuration, created_at, updated_at)
        VALUES
        (:id, :dataset_id, :name, :labeling_project_id, NULL, :configuration, :now, :now)
        """
    )
    insert_snapshot_sql = text(
        """
        INSERT INTO t_dm_labeling_project_files (id, project_id, file_id, created_at)
        VALUES (:id, :project_id, :file_id, :now)
        """
    )
    insert_annotation_sql = text(
        """
        INSERT INTO t_dm_annotation_results
        (id, project_id, file_id, annotation, annotation_status, file_version, created_at, updated_at)
        VALUES
        (:id, :project_id, :file_id, :annotation, :annotation_status, :file_version, :now, :now)
        """
    )

    # NOTE(review): commit/rollback behaviour is whatever the context manager
    # returned by SQLManager.create_connect() provides - confirm it commits
    # on a clean exit.
    with SQLManager.create_connect() as conn:
        # 1. Create the labeling project.
        conn.execute(
            insert_project_sql,
            {
                "id": project_id,
                "dataset_id": dataset_id,
                "name": project_name,
                "labeling_project_id": labeling_project_id,
                "configuration": configuration,
                "now": now,
            },
        )

        # 2. Snapshot every file of the dataset into the project.
        for file_id in all_file_ids:
            conn.execute(
                insert_snapshot_sql,
                {
                    "id": str(uuid.uuid4()),
                    "project_id": project_id,
                    "file_id": file_id,
                    "now": now,
                },
            )

        # 3. Convert each result and store it; results the converter rejects
        #    (returns None) are skipped and not counted.
        converted_count = 0
        for file_id, annotation in file_results:
            ls_annotation = convert_annotation(annotation, file_id, project_id)
            if ls_annotation is None:
                continue
            conn.execute(
                insert_annotation_sql,
                {
                    "id": str(uuid.uuid4()),
                    "project_id": project_id,
                    "file_id": file_id,
                    "annotation": json.dumps(ls_annotation, ensure_ascii=False),
                    "annotation_status": "ANNOTATED",
                    "file_version": 1,
                    "now": now,
                },
            )
            converted_count += 1

    logger.info(
        "Created labeling project {} ({}) with {} annotations for auto-annotation task {}",
        project_id,
        project_name,
        converted_count,
        task_id,
    )
def _process_single_task(task: Dict[str, Any]) -> None:
"""执行单个自动标注任务。"""
@@ -915,7 +1048,7 @@ def _process_single_task(task: Dict[str, Any]) -> None:
else:
all_files = _load_dataset_files(dataset_id)
files = [(path, name) for _, path, name in all_files]
files = all_files # [(file_id, file_path, file_name)]
total_images = len(files)
if total_images == 0:
@@ -983,10 +1116,11 @@ def _process_single_task(task: Dict[str, Any]) -> None:
processed = 0
detected_total = 0
file_results: List[Tuple[str, Dict[str, Any]]] = [] # (file_id, annotations)
try:
for file_path, file_name in files:
for file_id, file_path, file_name in files:
if _is_stop_requested(task_id, run_token):
logger.info("Task stop requested during processing: {}", task_id)
_update_task_status(
@@ -1003,7 +1137,7 @@ def _process_single_task(task: Dict[str, Any]) -> None:
clear_run_token=True,
error_message="Task stopped by request",
)
return
break
try:
sample_key = _get_sample_key(dataset_type)
@@ -1016,6 +1150,10 @@ def _process_single_task(task: Dict[str, Any]) -> None:
detected_total += _count_detections(result)
processed += 1
ann = result.get("annotations")
if isinstance(ann, dict):
file_results.append((file_id, ann))
progress = int(processed * 100 / total_images) if total_images > 0 else 100
_update_task_status(
@@ -1038,19 +1176,21 @@ def _process_single_task(task: Dict[str, Any]) -> None:
)
continue
_update_task_status(
task_id,
run_token=run_token,
status="completed",
progress=100,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
completed=True,
clear_run_token=True,
)
else:
# Loop completed without break (not stopped)
_update_task_status(
task_id,
run_token=run_token,
status="completed",
progress=100,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
completed=True,
clear_run_token=True,
)
logger.info(
"Completed auto-annotation task: id={}, total_images={}, processed={}, detected_objects={}, output_path={}",
@@ -1077,6 +1217,26 @@ def _process_single_task(task: Dict[str, Any]) -> None:
task_id,
e,
)
# 将自动标注结果转换为 Label Studio 格式并写入标注项目
if file_results:
try:
_create_labeling_project_with_annotations(
task_id=task_id,
dataset_id=dataset_id,
dataset_name=source_dataset_name,
task_name=task_name,
dataset_type=dataset_type,
normalized_pipeline=normalized_pipeline,
file_results=file_results,
all_file_ids=[fid for fid, _, _ in all_files],
)
except Exception as e: # pragma: no cover - 防御性日志
logger.error(
"Failed to create labeling project for auto-annotation task {}: {}",
task_id,
e,
)
except Exception as e:
logger.error("Task execution failed for task {}: {}", task_id, e)
_update_task_status(