feat(annotation): 支持通用算子编排的数据标注功能

## 功能概述
将数据标注模块从固定 YOLO 算子改造为支持通用算子编排,实现与数据清洗模块类似的灵活算子组合能力。

## 改动内容

### 第 1 步:数据库改造(DDL)
- 新增 SQL migration 脚本:scripts/db/data-annotation-operator-pipeline-migration.sql
- 修改 t_dm_auto_annotation_tasks 表:
  - 新增字段:task_mode, executor_type, pipeline, output_dataset_id, created_by, stop_requested, started_at, heartbeat_at, run_token
  - 新增索引:idx_status_created, idx_created_by
- 创建 t_dm_annotation_task_operator_instance 表:用于存储算子实例详情

### 第 2 步:API 层改造
- 扩展请求模型(schema/auto.py):
  - 新增 OperatorPipelineStep 模型
  - 支持 pipeline 字段,保留旧 YOLO 字段向后兼容
  - 实现多写法归一(operatorId/operator_id/id, overrides/settingsOverride/settings_override)
- 修改任务创建服务(service/auto.py):
  - 新增 validate_file_ids() 校验方法
  - 新增 _to_pipeline() 兼容映射方法
  - 写入新字段并集成算子实例表
  - 修复 fileIds 去重准确性问题
- 新增 API 路由(interface/auto.py):
  - 新增 /operator-tasks 系列接口
  - 新增 stop API 接口(/auto/{id}/stop 和 /operator-tasks/{id}/stop)
  - 保留旧 /auto 接口向后兼容
- ORM 模型对齐(annotation_management.py):
  - AutoAnnotationTask 新增所有 DDL 字段
  - 新增 AnnotationTaskOperatorInstance 模型
  - 状态定义补充 stopped

### 第 3 步:Runtime 层改造
- 修改 worker 执行逻辑(auto_annotation_worker.py):
  - 实现原子任务抢占机制(run_token)
  - 从硬编码 YOLO 改为通用 pipeline 执行
  - 新增算子解析和实例化能力
  - 支持 stop_requested 检查
  - 保留 legacy_yolo 模式向后兼容
  - 支持多种算子调用方式(execute 和 __call__)

### 第 4 步:灰度发布
- 完善 YOLO 算子元数据(metadata.yml):
  - 补齐 raw_id, language, modal, inputs, outputs, settings 字段
- 注册标注算子(__init__.py):
  - 将 YOLO 算子注册到 OPERATORS 注册表
  - 确保 annotation 包被正确加载
- 新增白名单控制:
  - 支持环境变量 AUTO_ANNOTATION_OPERATOR_WHITELIST
  - 灰度发布时可限制可用算子

## 关键特性

### 向后兼容
- 旧 /auto 接口完全保留
- 旧请求参数自动映射到 pipeline
- legacy_yolo 模式确保旧逻辑正常运行

### 新功能
- 支持通用 pipeline 编排
- 支持多算子组合
- 支持任务停止控制
- 支持白名单灰度发布

### 可靠性
- 原子任务抢占(防止重复执行)
- 完整的错误处理和状态管理
- 详细的审计追踪(算子实例表)

## 部署说明

1. 执行 DDL:mysql < scripts/db/data-annotation-operator-pipeline-migration.sql
2. 配置环境变量:AUTO_ANNOTATION_OPERATOR_WHITELIST=ImageObjectDetectionBoundingBox
3. 重启服务:datamate-runtime 和 datamate-backend-python

## 验证步骤

1. 兼容模式验证:使用旧 /auto 接口创建任务
2. 通用编排验证:使用新 /operator-tasks 接口创建 pipeline 任务
3. 原子 claim 验证:检查 run_token 机制
4. 停止验证:测试 stop API
5. 白名单验证:测试算子白名单拦截

## 相关文件

- DDL: scripts/db/data-annotation-operator-pipeline-migration.sql
- API: runtime/datamate-python/app/module/annotation/
- Worker: runtime/python-executor/datamate/auto_annotation_worker.py
- 算子: runtime/ops/annotation/image_object_detection_bounding_box/
This commit is contained in:
2026-02-07 22:35:33 +08:00
parent 9efc07935f
commit 2f49fc4199
9 changed files with 1606 additions and 480 deletions

View File

@@ -17,22 +17,41 @@ frontend can display real-time status.
- 为了保持简单,目前不处理 "running" 状态的恢复逻辑;容器重启时,
已处于 running 的任务不会被重新拉起,需要后续扩展。
"""
from __future__ import annotations
import json
import os
import sys
import threading
import time
from __future__ import annotations
import importlib
import json
import os
import sys
import threading
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from loguru import logger
from sqlalchemy import text
from datamate.sql_manager.sql_manager import SQLManager
from loguru import logger
from sqlalchemy import text
try:
import datamate.ops # noqa: F401
except Exception as import_ops_err: # pragma: no cover - 兜底日志
logger.warning("Failed to import datamate.ops package for operator registry: {}", import_ops_err)
try:
import ops.annotation # type: ignore # noqa: F401
except Exception as import_annotation_ops_err: # pragma: no cover - 兜底日志
logger.warning(
"Failed to import ops.annotation package for operator registry: {}",
import_annotation_ops_err,
)
try:
from datamate.core.base_op import OPERATORS
except Exception: # pragma: no cover - 兜底
OPERATORS = None # type: ignore
from datamate.sql_manager.sql_manager import SQLManager
# 尝试多种导入路径,适配不同的打包/安装方式
ImageObjectDetectionBoundingBox = None # type: ignore
@@ -97,62 +116,114 @@ if ImageObjectDetectionBoundingBox is None:
POLL_INTERVAL_SECONDS = float(os.getenv("AUTO_ANNOTATION_POLL_INTERVAL", "5"))
DEFAULT_OUTPUT_ROOT = os.getenv(
"AUTO_ANNOTATION_OUTPUT_ROOT", "/dataset"
)
DEFAULT_OUTPUT_ROOT = os.getenv(
"AUTO_ANNOTATION_OUTPUT_ROOT", "/dataset"
)
DEFAULT_OPERATOR_WHITELIST = os.getenv(
"AUTO_ANNOTATION_OPERATOR_WHITELIST",
"ImageObjectDetectionBoundingBox",
)
def _fetch_pending_task() -> Optional[Dict[str, Any]]:
"""从 t_dm_auto_annotation_tasks 中取出一个 pending 任务。"""
sql = text(
"""
SELECT id, name, dataset_id, dataset_name, config, file_ids, status,
total_images, processed_images, detected_objects, output_path
FROM t_dm_auto_annotation_tasks
WHERE status = 'pending' AND deleted_at IS NULL
ORDER BY created_at ASC
LIMIT 1
"""
)
with SQLManager.create_connect() as conn:
result = conn.execute(sql).fetchone()
if not result:
return None
row = dict(result._mapping) # type: ignore[attr-defined]
try:
row["config"] = json.loads(row["config"]) if row.get("config") else {}
except Exception:
row["config"] = {}
try:
raw_ids = row.get("file_ids")
if not raw_ids:
row["file_ids"] = None
elif isinstance(raw_ids, str):
row["file_ids"] = json.loads(raw_ids)
else:
row["file_ids"] = raw_ids
except Exception:
row["file_ids"] = None
return row
def _fetch_pending_task() -> Optional[Dict[str, Any]]:
"""原子 claim 一个 pending 任务并返回任务详情"""
def _parse_json_field(value: Any, default: Any) -> Any:
if value is None:
return default
if isinstance(value, (dict, list)):
return value
if isinstance(value, str):
text_value = value.strip()
if not text_value:
return default
try:
return json.loads(text_value)
except Exception:
return default
return default
run_token = str(uuid.uuid4())
now = datetime.now()
claim_sql = text(
"""
UPDATE t_dm_auto_annotation_tasks
SET status = 'running',
run_token = :run_token,
started_at = COALESCE(started_at, :now),
heartbeat_at = :now,
updated_at = :now,
error_message = NULL
WHERE id = (
SELECT id FROM (
SELECT id
FROM t_dm_auto_annotation_tasks
WHERE status = 'pending'
AND deleted_at IS NULL
AND COALESCE(stop_requested, 0) = 0
ORDER BY created_at ASC
LIMIT 1
) AS pending_task
)
AND status = 'pending'
AND deleted_at IS NULL
AND COALESCE(stop_requested, 0) = 0
"""
)
query_sql = text(
"""
SELECT id, name, dataset_id, dataset_name, created_by,
config, file_ids, pipeline,
task_mode, executor_type,
status, stop_requested, run_token,
total_images, processed_images, detected_objects,
output_path, output_dataset_id
FROM t_dm_auto_annotation_tasks
WHERE run_token = :run_token
LIMIT 1
"""
)
with SQLManager.create_connect() as conn:
claim_result = conn.execute(claim_sql, {"run_token": run_token, "now": now})
if not claim_result or int(getattr(claim_result, "rowcount", 0) or 0) <= 0:
return None
result = conn.execute(query_sql, {"run_token": run_token}).fetchone()
if not result:
return None
row = dict(result._mapping) # type: ignore[attr-defined]
row["config"] = _parse_json_field(row.get("config"), {})
parsed_file_ids = _parse_json_field(row.get("file_ids"), None)
row["file_ids"] = parsed_file_ids if parsed_file_ids else None
parsed_pipeline = _parse_json_field(row.get("pipeline"), None)
row["pipeline"] = parsed_pipeline if parsed_pipeline else None
return row
def _update_task_status(
task_id: str,
*,
status: str,
progress: Optional[int] = None,
processed_images: Optional[int] = None,
detected_objects: Optional[int] = None,
total_images: Optional[int] = None,
output_path: Optional[str] = None,
error_message: Optional[str] = None,
completed: bool = False,
) -> None:
"""更新任务的状态和统计字段。"""
def _update_task_status(
task_id: str,
*,
status: str,
run_token: Optional[str] = None,
progress: Optional[int] = None,
processed_images: Optional[int] = None,
detected_objects: Optional[int] = None,
total_images: Optional[int] = None,
output_path: Optional[str] = None,
output_dataset_id: Optional[str] = None,
error_message: Optional[str] = None,
completed: bool = False,
clear_run_token: bool = False,
) -> None:
"""更新任务的状态和统计字段。"""
fields: List[str] = ["status = :status", "updated_at = :updated_at"]
params: Dict[str, Any] = {
@@ -173,26 +244,321 @@ def _update_task_status(
if total_images is not None:
fields.append("total_images = :total_images")
params["total_images"] = int(total_images)
if output_path is not None:
fields.append("output_path = :output_path")
params["output_path"] = output_path
if error_message is not None:
fields.append("error_message = :error_message")
params["error_message"] = error_message[:2000]
if completed:
fields.append("completed_at = :completed_at")
params["completed_at"] = datetime.now()
sql = text(
f"""
UPDATE t_dm_auto_annotation_tasks
SET {', '.join(fields)}
WHERE id = :task_id
"""
)
with SQLManager.create_connect() as conn:
conn.execute(sql, params)
if output_path is not None:
fields.append("output_path = :output_path")
params["output_path"] = output_path
if output_dataset_id is not None:
fields.append("output_dataset_id = :output_dataset_id")
params["output_dataset_id"] = output_dataset_id
if error_message is not None:
fields.append("error_message = :error_message")
params["error_message"] = error_message[:2000]
if status == "running":
fields.append("heartbeat_at = :heartbeat_at")
params["heartbeat_at"] = datetime.now()
if completed:
fields.append("completed_at = :completed_at")
params["completed_at"] = datetime.now()
if clear_run_token:
fields.append("run_token = NULL")
where_clause = "id = :task_id"
if run_token:
where_clause += " AND run_token = :run_token"
params["run_token"] = run_token
sql = text(
f"""
UPDATE t_dm_auto_annotation_tasks
SET {', '.join(fields)}
WHERE {where_clause}
"""
)
with SQLManager.create_connect() as conn:
result = conn.execute(sql, params)
if int(getattr(result, "rowcount", 0) or 0) <= 0:
logger.warning(
"No rows updated for task status change: task_id={}, status={}, run_token={}",
task_id,
status,
run_token,
)
def _is_stop_requested(task_id: str, run_token: Optional[str] = None) -> bool:
"""检查任务是否请求停止。"""
where_clause = "id = :task_id"
params: Dict[str, Any] = {"task_id": task_id}
if run_token:
where_clause += " AND run_token = :run_token"
params["run_token"] = run_token
sql = text(
f"""
SELECT COALESCE(stop_requested, 0)
FROM t_dm_auto_annotation_tasks
WHERE {where_clause}
LIMIT 1
"""
)
with SQLManager.create_connect() as conn:
row = conn.execute(sql, params).fetchone()
if not row:
# 找不到任务(或 run_token 已失效)时保守停止
return True
return bool(row[0])
def _extract_step_overrides(step: Dict[str, Any]) -> Dict[str, Any]:
"""合并 pipeline 节点中的参数覆盖。"""
overrides: Dict[str, Any] = {}
for key in ("overrides", "settingsOverride", "settings_override"):
value = step.get(key)
if value is None:
continue
if isinstance(value, str):
try:
value = json.loads(value)
except Exception:
continue
if isinstance(value, dict):
overrides.update(value)
return overrides
def _build_legacy_pipeline(config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""将 legacy_yolo 配置映射为单步 pipeline。"""
return [
{
"operatorId": "ImageObjectDetectionBoundingBox",
"overrides": {
"modelSize": config.get("modelSize", "l"),
"confThreshold": float(config.get("confThreshold", 0.7)),
"targetClasses": config.get("targetClasses", []) or [],
},
}
]
def _get_output_dataset_name(
task_id: str,
dataset_id: str,
source_dataset_name: str,
task_name: str,
config: Dict[str, Any],
pipeline_raw: Optional[List[Any]],
) -> str:
"""确定输出数据集名称。"""
output_name = config.get("outputDatasetName")
if output_name:
return str(output_name)
if pipeline_raw:
for step in pipeline_raw:
if not isinstance(step, dict):
continue
overrides = _extract_step_overrides(step)
output_name = overrides.get("outputDatasetName") or overrides.get("output_dataset_name")
if output_name:
return str(output_name)
base_name = source_dataset_name or task_name or f"dataset-{dataset_id[:8]}"
return f"{base_name}_auto_{task_id[:8]}"
def _normalize_pipeline(
task_mode: str,
config: Dict[str, Any],
pipeline_raw: Optional[List[Any]],
output_dir: str,
) -> List[Dict[str, Any]]:
"""标准化 pipeline 结构并注入 outputDir。"""
source_pipeline = pipeline_raw
if task_mode == "legacy_yolo" or not source_pipeline:
source_pipeline = _build_legacy_pipeline(config)
normalized: List[Dict[str, Any]] = []
for step in source_pipeline:
if not isinstance(step, dict):
continue
operator_id: Optional[str] = None
overrides: Dict[str, Any] = {}
# 兼容 [{"OpName": {...}}] 风格
if (
"operatorId" not in step
and "operator_id" not in step
and "id" not in step
and len(step) == 1
):
first_key = next(iter(step.keys()))
first_value = step.get(first_key)
if isinstance(first_key, str):
operator_id = first_key
if isinstance(first_value, dict):
overrides.update(first_value)
operator_id = operator_id or step.get("operatorId") or step.get("operator_id") or step.get("id")
if not operator_id:
continue
overrides.update(_extract_step_overrides(step))
overrides.setdefault("outputDir", output_dir)
normalized.append(
{
"operator_id": str(operator_id),
"overrides": overrides,
}
)
return normalized
def _resolve_operator_class(operator_id: str):
"""根据 operator_id 解析算子类。"""
if operator_id == "ImageObjectDetectionBoundingBox":
if ImageObjectDetectionBoundingBox is None:
raise ImportError("ImageObjectDetectionBoundingBox is not available")
return ImageObjectDetectionBoundingBox
registry_item = OPERATORS.get(operator_id) if OPERATORS is not None else None
if registry_item is None:
try:
from core.base_op import OPERATORS as relative_operators # type: ignore
registry_item = relative_operators.get(operator_id)
except Exception:
registry_item = None
if registry_item is None:
raise ImportError(f"Operator not found in registry: {operator_id}")
if isinstance(registry_item, str):
submodule = importlib.import_module(registry_item)
operator_cls = getattr(submodule, operator_id, None)
if operator_cls is None:
raise ImportError(
f"Operator class {operator_id} not found in module {registry_item}"
)
return operator_cls
return registry_item
def _build_operator_chain(pipeline: List[Dict[str, Any]]) -> List[Tuple[str, Any]]:
"""初始化算子链。"""
chain: List[Tuple[str, Any]] = []
for step in pipeline:
operator_id = step.get("operator_id")
overrides = dict(step.get("overrides") or {})
if not operator_id:
continue
operator_cls = _resolve_operator_class(str(operator_id))
operator = operator_cls(**overrides)
chain.append((str(operator_id), operator))
return chain
def _run_pipeline_sample(sample: Dict[str, Any], chain: List[Tuple[str, Any]]) -> Dict[str, Any]:
"""在单个样本上执行 pipeline。"""
current_sample: Dict[str, Any] = dict(sample)
for operator_id, operator in chain:
if hasattr(operator, "execute") and callable(getattr(operator, "execute")):
result = operator.execute(current_sample)
elif callable(operator):
result = operator(current_sample)
else:
raise RuntimeError(f"Operator {operator_id} is not executable")
if result is None:
continue
if isinstance(result, dict):
current_sample.update(result)
continue
if isinstance(result, list):
# 仅取第一个 dict 结果,兼容部分返回 list 的算子
if result and isinstance(result[0], dict):
current_sample.update(result[0])
continue
logger.debug(
"Operator {} returned unsupported result type: {}",
operator_id,
type(result).__name__,
)
return current_sample
def _count_detections(sample: Dict[str, Any]) -> int:
"""从样本中提取检测数量。"""
annotations = sample.get("annotations")
if isinstance(annotations, dict):
detections = annotations.get("detections")
if isinstance(detections, list):
return len(detections)
detection_count = sample.get("detection_count")
if detection_count is None:
return 0
try:
return max(int(detection_count), 0)
except Exception:
return 0
def _get_operator_whitelist() -> Optional[set[str]]:
"""获取灰度白名单;返回 None 表示放开全部。"""
raw = str(DEFAULT_OPERATOR_WHITELIST or "").strip()
if not raw:
return None
normalized = raw.lower()
if normalized in {"*", "all", "any"}:
return None
allow_set = {
item.strip()
for item in raw.split(",")
if item and item.strip()
}
return allow_set or None
def _validate_pipeline_whitelist(pipeline: List[Dict[str, Any]]) -> None:
"""校验 pipeline 是否命中灰度白名单。"""
allow_set = _get_operator_whitelist()
if allow_set is None:
return
blocked = []
for step in pipeline:
operator_id = str(step.get("operator_id") or "")
if not operator_id:
continue
if operator_id not in allow_set:
blocked.append(operator_id)
if blocked:
raise RuntimeError(
"Operator not in whitelist: " + ", ".join(sorted(set(blocked)))
)
def _load_dataset_files(dataset_id: str) -> List[Tuple[str, str, str]]:
@@ -452,168 +818,231 @@ def _register_output_dataset(
)
def _process_single_task(task: Dict[str, Any]) -> None:
"""执行单个自动标注任务。"""
if ImageObjectDetectionBoundingBox is None:
logger.error(
"YOLO operator not available (import failed earlier), skip auto-annotation task: {}",
task["id"],
)
_update_task_status(
task["id"],
status="failed",
error_message="YOLO operator not available in runtime container",
)
return
task_id = str(task["id"])
dataset_id = str(task["dataset_id"])
task_name = str(task.get("name") or "")
source_dataset_name = str(task.get("dataset_name") or "")
cfg: Dict[str, Any] = task.get("config") or {}
selected_file_ids: Optional[List[str]] = task.get("file_ids") or None
model_size = cfg.get("modelSize", "l")
conf_threshold = float(cfg.get("confThreshold", 0.7))
target_classes = cfg.get("targetClasses", []) or []
output_dataset_name = cfg.get("outputDatasetName")
if not output_dataset_name:
base_name = source_dataset_name or task_name or f"dataset-{dataset_id[:8]}"
output_dataset_name = f"{base_name}_auto_{task_id[:8]}"
logger.info(
"Start processing auto-annotation task: id={}, dataset_id={}, model_size={}, conf_threshold={}, target_classes={}, output_dataset_name={}",
task_id,
dataset_id,
model_size,
conf_threshold,
target_classes,
output_dataset_name,
)
_update_task_status(task_id, status="running", progress=0)
if selected_file_ids:
all_files = _load_files_by_ids(selected_file_ids)
else:
all_files = _load_dataset_files(dataset_id)
files = [(path, name) for _, path, name in all_files]
total_images = len(files)
if total_images == 0:
logger.warning("No files found for dataset {} when running auto-annotation task {}", dataset_id, task_id)
_update_task_status(
task_id,
status="completed",
progress=100,
total_images=0,
processed_images=0,
detected_objects=0,
completed=True,
output_path=None,
)
return
output_dataset_id, output_dir = _create_output_dataset(
source_dataset_id=dataset_id,
source_dataset_name=source_dataset_name,
output_dataset_name=output_dataset_name,
)
output_dir = _ensure_output_dir(output_dir)
try:
detector = ImageObjectDetectionBoundingBox(
modelSize=model_size,
confThreshold=conf_threshold,
targetClasses=target_classes,
outputDir=output_dir,
)
except Exception as e:
logger.error("Failed to init YOLO detector for task {}: {}", task_id, e)
_update_task_status(
task_id,
status="failed",
total_images=total_images,
processed_images=0,
detected_objects=0,
error_message=f"Init YOLO detector failed: {e}",
)
return
processed = 0
detected_total = 0
for file_path, file_name in files:
try:
sample = {
"image": file_path,
"filename": file_name,
}
result = detector.execute(sample)
annotations = (result or {}).get("annotations", {})
detections = annotations.get("detections", [])
detected_total += len(detections)
processed += 1
progress = int(processed * 100 / total_images) if total_images > 0 else 100
_update_task_status(
task_id,
status="running",
progress=progress,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
)
except Exception as e:
logger.error(
"Failed to process image for task {}: file_path={}, error={}",
task_id,
file_path,
e,
)
continue
_update_task_status(
task_id,
status="completed",
progress=100,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
completed=True,
)
logger.info(
"Completed auto-annotation task: id={}, total_images={}, processed={}, detected_objects={}, output_path={}",
task_id,
total_images,
processed,
detected_total,
output_dir,
)
if output_dataset_name and output_dataset_id:
try:
_register_output_dataset(
task_id=task_id,
output_dataset_id=output_dataset_id,
output_dir=output_dir,
output_dataset_name=output_dataset_name,
total_images=total_images,
)
except Exception as e: # pragma: no cover - 防御性日志
logger.error(
"Failed to register auto-annotation output as dataset for task {}: {}",
task_id,
e,
)
def _process_single_task(task: Dict[str, Any]) -> None:
"""执行单个自动标注任务。"""
task_id = str(task["id"])
dataset_id = str(task["dataset_id"])
task_name = str(task.get("name") or "")
source_dataset_name = str(task.get("dataset_name") or "")
run_token = str(task.get("run_token") or "")
task_mode = str(task.get("task_mode") or "legacy_yolo")
executor_type = str(task.get("executor_type") or "annotation_local")
cfg: Dict[str, Any] = task.get("config") or {}
pipeline_raw = task.get("pipeline")
selected_file_ids: Optional[List[str]] = task.get("file_ids") or None
output_dataset_name = _get_output_dataset_name(
task_id=task_id,
dataset_id=dataset_id,
source_dataset_name=source_dataset_name,
task_name=task_name,
config=cfg,
pipeline_raw=pipeline_raw if isinstance(pipeline_raw, list) else None,
)
logger.info(
"Start processing auto-annotation task: id={}, dataset_id={}, task_mode={}, executor_type={}, output_dataset_name={}",
task_id,
dataset_id,
task_mode,
executor_type,
output_dataset_name,
)
if _is_stop_requested(task_id, run_token):
logger.info("Task stop requested before processing started: {}", task_id)
_update_task_status(
task_id,
run_token=run_token,
status="stopped",
completed=True,
clear_run_token=True,
error_message="Task stopped before start",
)
return
_update_task_status(task_id, run_token=run_token, status="running", progress=0)
if selected_file_ids:
all_files = _load_files_by_ids(selected_file_ids)
else:
all_files = _load_dataset_files(dataset_id)
files = [(path, name) for _, path, name in all_files]
total_images = len(files)
if total_images == 0:
logger.warning("No files found for dataset {} when running auto-annotation task {}", dataset_id, task_id)
_update_task_status(
task_id,
run_token=run_token,
status="completed",
progress=100,
total_images=0,
processed_images=0,
detected_objects=0,
completed=True,
output_path=None,
clear_run_token=True,
)
return
output_dataset_id, output_dir = _create_output_dataset(
source_dataset_id=dataset_id,
source_dataset_name=source_dataset_name,
output_dataset_name=output_dataset_name,
)
output_dir = _ensure_output_dir(output_dir)
_update_task_status(
task_id,
run_token=run_token,
status="running",
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
)
try:
normalized_pipeline = _normalize_pipeline(
task_mode=task_mode,
config=cfg,
pipeline_raw=pipeline_raw if isinstance(pipeline_raw, list) else None,
output_dir=output_dir,
)
if not normalized_pipeline:
raise RuntimeError("Pipeline is empty after normalization")
_validate_pipeline_whitelist(normalized_pipeline)
chain = _build_operator_chain(normalized_pipeline)
if not chain:
raise RuntimeError("No valid operator instances initialized")
except Exception as e:
logger.error("Failed to init operator pipeline for task {}: {}", task_id, e)
_update_task_status(
task_id,
run_token=run_token,
status="failed",
total_images=total_images,
processed_images=0,
detected_objects=0,
error_message=f"Init pipeline failed: {e}",
clear_run_token=True,
)
return
processed = 0
detected_total = 0
try:
for file_path, file_name in files:
if _is_stop_requested(task_id, run_token):
logger.info("Task stop requested during processing: {}", task_id)
_update_task_status(
task_id,
run_token=run_token,
status="stopped",
progress=int(processed * 100 / total_images) if total_images > 0 else 0,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
completed=True,
clear_run_token=True,
error_message="Task stopped by request",
)
return
try:
sample = {
"image": file_path,
"filename": file_name,
}
result = _run_pipeline_sample(sample, chain)
detected_total += _count_detections(result)
processed += 1
progress = int(processed * 100 / total_images) if total_images > 0 else 100
_update_task_status(
task_id,
run_token=run_token,
status="running",
progress=progress,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
)
except Exception as e:
logger.error(
"Failed to process image for task {}: file_path={}, error={}",
task_id,
file_path,
e,
)
continue
_update_task_status(
task_id,
run_token=run_token,
status="completed",
progress=100,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
completed=True,
clear_run_token=True,
)
logger.info(
"Completed auto-annotation task: id={}, total_images={}, processed={}, detected_objects={}, output_path={}",
task_id,
total_images,
processed,
detected_total,
output_dir,
)
if output_dataset_name and output_dataset_id:
try:
_register_output_dataset(
task_id=task_id,
output_dataset_id=output_dataset_id,
output_dir=output_dir,
output_dataset_name=output_dataset_name,
total_images=total_images,
)
except Exception as e: # pragma: no cover - 防御性日志
logger.error(
"Failed to register auto-annotation output as dataset for task {}: {}",
task_id,
e,
)
except Exception as e:
logger.error("Task execution failed for task {}: {}", task_id, e)
_update_task_status(
task_id,
run_token=run_token,
status="failed",
progress=int(processed * 100 / total_images) if total_images > 0 else 0,
processed_images=processed,
detected_objects=detected_total,
total_images=total_images,
output_path=output_dir,
output_dataset_id=output_dataset_id,
error_message=f"Execute pipeline failed: {e}",
clear_run_token=True,
)
def _worker_loop() -> None: