feat(auto-annotation): integrate YOLO auto-labeling and enhance data management (#223)

* feat(auto-annotation): initial setup

* chore: remove package-lock.json

* chore: 清理本地测试脚本与 Maven 设置

* chore: change package-lock.json
This commit is contained in:
Kecheng Sha
2026-01-05 14:22:44 +08:00
committed by GitHub
parent ccfb84c034
commit 3f1ad6a872
44 changed files with 8503 additions and 5238 deletions

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""Annotation-related operators (e.g. YOLO detection)."""
__all__ = [
"image_object_detection_bounding_box",
]

View File

@@ -0,0 +1,9 @@
"""Image object detection (YOLOv8) operator package.
This package exposes the ImageObjectDetectionBoundingBox annotator so that
the auto-annotation worker can import it via different module paths.
"""
from .process import ImageObjectDetectionBoundingBox
__all__ = ["ImageObjectDetectionBoundingBox"]

View File

@@ -0,0 +1,3 @@
name: image_object_detection_bounding_box
version: 0.1.0
description: "YOLOv8-based object detection operator for auto annotation"

View File

@@ -0,0 +1,214 @@
#!/user/bin/python
# -- encoding: utf-8 --
"""
Description: 图像目标检测算子
Create: 2025/12/17
"""
import os
import json
import time
from typing import Dict, Any
import cv2
import numpy as np
from loguru import logger
try:
from ultralytics import YOLO
except ImportError:
logger.warning("ultralytics not installed. Please install it using: pip install ultralytics")
YOLO = None
from datamate.core.base_op import Mapper
# COCO 80 类别映射
COCO_CLASS_MAP = {
0: "person", 1: "bicycle", 2: "car", 3: "motorcycle", 4: "airplane",
5: "bus", 6: "train", 7: "truck", 8: "boat", 9: "traffic light",
10: "fire hydrant", 11: "stop sign", 12: "parking meter", 13: "bench",
14: "bird", 15: "cat", 16: "dog", 17: "horse", 18: "sheep", 19: "cow",
20: "elephant", 21: "bear", 22: "zebra", 23: "giraffe", 24: "backpack",
25: "umbrella", 26: "handbag", 27: "tie", 28: "suitcase", 29: "frisbee",
30: "skis", 31: "snowboard", 32: "sports ball", 33: "kite",
34: "baseball bat", 35: "baseball glove", 36: "skateboard",
37: "surfboard", 38: "tennis racket", 39: "bottle",
40: "wine glass", 41: "cup", 42: "fork", 43: "knife", 44: "spoon",
45: "bowl", 46: "banana", 47: "apple", 48: "sandwich", 49: "orange",
50: "broccoli", 51: "carrot", 52: "hot dog", 53: "pizza",
54: "donut", 55: "cake", 56: "chair", 57: "couch",
58: "potted plant", 59: "bed", 60: "dining table", 61: "toilet",
62: "tv", 63: "laptop", 64: "mouse", 65: "remote",
66: "keyboard", 67: "cell phone", 68: "microwave", 69: "oven",
70: "toaster", 71: "sink", 72: "refrigerator", 73: "book",
74: "clock", 75: "vase", 76: "scissors", 77: "teddy bear",
78: "hair drier", 79: "toothbrush"
}
class ImageObjectDetectionBoundingBox(Mapper):
"""图像目标检测算子"""
# 模型映射
MODEL_MAP = {
"n": "yolov8n.pt",
"s": "yolov8s.pt",
"m": "yolov8m.pt",
"l": "yolov8l.pt",
"x": "yolov8x.pt",
}
def __init__(self, *args, **kwargs):
super(ImageObjectDetectionBoundingBox, self).__init__(*args, **kwargs)
# 获取参数
self._model_size = kwargs.get("modelSize", "l")
self._conf_threshold = kwargs.get("confThreshold", 0.7)
self._target_classes = kwargs.get("targetClasses", [])
self._output_dir = kwargs.get("outputDir", None) # 输出目录
# 如果目标类别为空列表,则检测所有类别
if not self._target_classes:
self._target_classes = None
else:
# 确保是整数列表
self._target_classes = [int(cls_id) for cls_id in self._target_classes]
# 获取模型路径
model_filename = self.MODEL_MAP.get(self._model_size, "yolov8l.pt")
current_dir = os.path.dirname(os.path.abspath(__file__))
model_path = os.path.join(current_dir, model_filename)
# 初始化模型
if YOLO is None:
raise ImportError("ultralytics is not installed. Please install it.")
if not os.path.exists(model_path):
logger.warning(f"Model file {model_path} not found. Downloading from ultralytics...")
self.model = YOLO(model_filename) # 自动下载
else:
self.model = YOLO(model_path)
logger.info(f"Loaded YOLOv8 model: {model_filename}, "
f"conf_threshold: {self._conf_threshold}, "
f"target_classes: {self._target_classes}")
@staticmethod
def _get_color_by_class_id(class_id: int):
"""根据 class_id 生成稳定颜色(BGR,OpenCV 用)"""
np.random.seed(class_id)
color = np.random.randint(0, 255, size=3).tolist()
return tuple(color)
def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
"""执行目标检测"""
start = time.time()
# 读取图像文件
image_path = sample.get(self.image_key)
if not image_path or not os.path.exists(image_path):
logger.warning(f"Image file not found: {image_path}")
return sample
# 读取图像
img = cv2.imread(image_path)
if img is None:
logger.warning(f"Failed to read image: {image_path}")
return sample
# 执行目标检测
results = self.model(img, conf=self._conf_threshold)
r = results[0]
# 准备标注数据
h, w = img.shape[:2]
annotations = {
"image": os.path.basename(image_path),
"width": w,
"height": h,
"model_size": self._model_size,
"conf_threshold": self._conf_threshold,
"selected_class_ids": self._target_classes,
"detections": []
}
# 处理检测结果
if r.boxes is not None:
for box in r.boxes:
cls_id = int(box.cls[0])
# 过滤目标类别
if self._target_classes is not None and cls_id not in self._target_classes:
continue
conf = float(box.conf[0])
x1, y1, x2, y2 = map(float, box.xyxy[0])
label = COCO_CLASS_MAP.get(cls_id, f"class_{cls_id}")
# 记录检测结果
annotations["detections"].append({
"label": label,
"class_id": cls_id,
"confidence": round(conf, 4),
"bbox_xyxy": [x1, y1, x2, y2],
"bbox_xywh": [x1, y1, x2 - x1, y2 - y1]
})
# 在图像上绘制
color = self._get_color_by_class_id(cls_id)
cv2.rectangle(
img,
(int(x1), int(y1)),
(int(x2), int(y2)),
color,
2
)
cv2.putText(
img,
f"{label} {conf:.2f}",
(int(x1), max(int(y1) - 5, 10)),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
color,
1
)
# 确定输出目录
if self._output_dir and os.path.exists(self._output_dir):
output_dir = self._output_dir
else:
output_dir = os.path.dirname(image_path)
# 创建输出子目录(可选,用于组织文件)
images_dir = os.path.join(output_dir, "images")
annotations_dir = os.path.join(output_dir, "annotations")
os.makedirs(images_dir, exist_ok=True)
os.makedirs(annotations_dir, exist_ok=True)
# 保持原始文件名(不添加后缀),确保一一对应
base_name = os.path.basename(image_path)
name_without_ext = os.path.splitext(base_name)[0]
# 保存标注图像(保持原始扩展名或使用jpg)
output_filename = base_name
output_path = os.path.join(images_dir, output_filename)
cv2.imwrite(output_path, img)
# 保存标注 JSON(文件名与图像对应)
json_filename = f"{name_without_ext}.json"
json_path = os.path.join(annotations_dir, json_filename)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(annotations, f, indent=2, ensure_ascii=False)
# 更新样本数据
sample["detection_count"] = len(annotations["detections"])
sample["output_image"] = output_path
sample["annotations_file"] = json_path
sample["annotations"] = annotations
logger.info(f"Image: {os.path.basename(image_path)}, "
f"Detections: {len(annotations['detections'])}, "
f"Time: {(time.time() - start):.4f}s")
return sample

View File

@@ -0,0 +1,166 @@
import os
import json
from pathlib import Path
from ultralytics import YOLO
import cv2
import numpy as np
def get_color_by_class_id(class_id: int):
"""根据 class_id 生成稳定颜色(BGR)"""
np.random.seed(class_id)
color = np.random.randint(0, 255, size=3).tolist()
return tuple(color)
def mask_to_polygons(mask: np.ndarray):
"""将二值 mask 转换为 COCO 风格多边形列表"""
contours, _ = cv2.findContours(
mask,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE
)
polygons = []
for contour in contours:
if contour.shape[0] < 3:
continue
polygon = contour.flatten().tolist()
polygons.append(polygon)
return polygons
IMAGE_DIR = "C:/Users/meta/Desktop/Datamate/yolo/Photos"
OUT_IMG_DIR = "outputs_seg/images"
OUT_JSON_DIR = "outputs_seg/annotations"
MODEL_MAP = {
"n": "yolov8n-seg.pt",
"s": "yolov8s-seg.pt",
"m": "yolov8m-seg.pt",
"l": "yolov8l-seg.pt",
"x": "yolov8x-seg.pt",
}
MODEL_KEY = "x"
MODEL_PATH = MODEL_MAP[MODEL_KEY]
CONF_THRES = 0.7
DRAW_BBOX = True
COCO_CLASS_MAP = {
0: "person", 1: "bicycle", 2: "car", 3: "motorcycle", 4: "airplane",
5: "bus", 6: "train", 7: "truck", 8: "boat", 9: "traffic light",
10: "fire hydrant", 11: "stop sign", 12: "parking meter", 13: "bench",
14: "bird", 15: "cat", 16: "dog", 17: "horse", 18: "sheep", 19: "cow",
20: "elephant", 21: "bear", 22: "zebra", 23: "giraffe", 24: "backpack",
25: "umbrella", 26: "handbag", 27: "tie", 28: "suitcase", 29: "frisbee",
30: "skis", 31: "snowboard", 32: "sports ball", 33: "kite",
34: "baseball bat", 35: "baseball glove", 36: "skateboard",
37: "surfboard", 38: "tennis racket", 39: "bottle",
40: "wine glass", 41: "cup", 42: "fork", 43: "knife", 44: "spoon",
45: "bowl", 46: "banana", 47: "apple", 48: "sandwich", 49: "orange",
50: "broccoli", 51: "carrot", 52: "hot dog", 53: "pizza",
54: "donut", 55: "cake", 56: "chair", 57: "couch",
58: "potted plant", 59: "bed", 60: "dining table", 61: "toilet",
62: "tv", 63: "laptop", 64: "mouse", 65: "remote",
66: "keyboard", 67: "cell phone", 68: "microwave", 69: "oven",
70: "toaster", 71: "sink", 72: "refrigerator", 73: "book",
74: "clock", 75: "vase", 76: "scissors", 77: "teddy bear",
78: "hair drier", 79: "toothbrush"
}
TARGET_CLASS_IDS = [0, 2, 5]
os.makedirs(OUT_IMG_DIR, exist_ok=True)
os.makedirs(OUT_JSON_DIR, exist_ok=True)
if TARGET_CLASS_IDS is not None:
for cid in TARGET_CLASS_IDS:
if cid not in COCO_CLASS_MAP:
raise ValueError(f"Invalid class id: {cid}")
model = YOLO(MODEL_PATH)
image_paths = list(Path(IMAGE_DIR).glob("*.*"))
for img_path in image_paths:
img = cv2.imread(str(img_path))
if img is None:
print(f"[WARN] Failed to read {img_path}")
continue
results = model(img, conf=CONF_THRES)
r = results[0]
h, w = img.shape[:2]
annotations = {
"image": img_path.name,
"width": w,
"height": h,
"model_key": MODEL_KEY,
"conf_threshold": CONF_THRES,
"supported_classes": COCO_CLASS_MAP,
"selected_class_ids": TARGET_CLASS_IDS,
"instances": []
}
if r.boxes is not None and r.masks is not None:
for i, box in enumerate(r.boxes):
cls_id = int(box.cls[0])
if TARGET_CLASS_IDS is not None and cls_id not in TARGET_CLASS_IDS:
continue
conf = float(box.conf[0])
x1, y1, x2, y2 = map(float, box.xyxy[0])
label = COCO_CLASS_MAP[cls_id]
mask = r.masks.data[i].cpu().numpy()
mask = (mask > 0.5).astype(np.uint8)
mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
color = get_color_by_class_id(cls_id)
img[mask == 1] = (
img[mask == 1] * 0.5 + np.array(color) * 0.5
).astype(np.uint8)
if True:
cv2.rectangle(
img,
(int(x1), int(y1)),
(int(x2), int(y2)),
color,
2
)
cv2.putText(
img,
f"{label} {conf:.2f}",
(int(x1), max(int(y1) - 5, 10)),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
color,
1
)
polygons = mask_to_polygons(mask)
annotations["instances"].append({
"label": label,
"class_id": cls_id,
"confidence": round(conf, 4),
"bbox_xyxy": [x1, y1, x2, y2],
"segmentation": polygons
})
out_img_path = os.path.join(OUT_IMG_DIR, img_path.name)
out_json_path = os.path.join(OUT_JSON_DIR, img_path.stem + ".json")
cv2.imwrite(out_img_path, img)
with open(out_json_path, "w", encoding="utf-8") as f:
json.dump(annotations, f, indent=2, ensure_ascii=False)
print(f"[OK] {img_path.name}")
print("Segmentation batch finished.")