feat(annotation): 增强标注编辑器的文本数据处理功能

- 添加 JSON 和 XML 解析支持用于处理标注配置
- 实现文本占位符填充机制优化用户体验
- 集成 Label Studio 标签配置(LabelStudioTagConfig)管理功能
- 添加文本对象类型检测和分类处理
- 实现标注配置装饰逻辑:在编辑器中为文本类对象自动插入显示其 name 的 Header
This commit is contained in:
2026-01-22 16:22:32 +08:00
parent d996040b7f
commit c638182c72

View File

@@ -14,12 +14,15 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import hashlib
import json
import xml.etree.ElementTree as ET
from fastapi import HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.db.models import AnnotationResult, Dataset, DatasetFiles, LabelingProject
from app.module.annotation.config import LabelStudioTagConfig
from app.module.annotation.schema.editor import (
EditorProjectInfo,
EditorTaskListItem,
@@ -36,6 +39,13 @@ from app.module.annotation.service.text_fetcher import fetch_text_content_via_do
logger = get_logger(__name__)
TEXT_DATA_KEY = "text"
DATASET_ID_KEY = "dataset_id"
FILE_ID_KEY = "file_id"
FILE_NAME_KEY = "file_name"
TEXTUAL_OBJECT_CATEGORIES = {"text", "document"}
OBJECT_NAME_HEADER_PREFIX = "dm_object_header_"
class AnnotationEditorService:
"""Label Studio Editor 集成服务(TEXT POC 版)"""
@@ -92,6 +102,163 @@ class AnnotationEditorService:
template = await self.template_service.get_template(self.db, template_id)
return getattr(template, "label_config", None) if template else None
async def _resolve_project_label_config(self, project: LabelingProject) -> Optional[str]:
    """Return the label config that the editor should use for ``project``.

    The ``label_config`` entry of ``project.configuration`` wins when the
    configuration is a non-empty dict holding a truthy value; otherwise the
    config is looked up via ``_get_label_config`` with the project's
    ``template_id``. A truthy result is passed through
    ``_decorate_label_config_for_editor`` before being returned; a falsy
    result is returned as-is.
    """
    configuration = project.configuration
    project_level = (
        configuration.get("label_config")
        if configuration and isinstance(configuration, dict)
        else None
    )
    # The template lookup is only awaited when the project has no own config.
    resolved = project_level or await self._get_label_config(project.template_id)
    if not resolved:
        return resolved
    return self._decorate_label_config_for_editor(resolved)
@staticmethod
def _try_parse_json_payload(text_content: str) -> Optional[Dict[str, Any]]:
if not text_content:
return None
stripped = text_content.strip()
if not stripped:
return None
if stripped[0] not in ("{", "["):
return None
try:
parsed = json.loads(stripped)
except Exception:
return None
return parsed if isinstance(parsed, dict) else None
@staticmethod
def _is_textual_object_tag(object_tag: str) -> bool:
    """Tell whether ``object_tag`` belongs to a textual object category.

    Looks the tag up through ``LabelStudioTagConfig.get_object_config`` and
    checks its ``category`` entry against ``TEXTUAL_OBJECT_CATEGORIES``. A
    missing/empty config is treated as an empty mapping.
    """
    tag_config = LabelStudioTagConfig.get_object_config(object_tag)
    return (tag_config or {}).get("category") in TEXTUAL_OBJECT_CATEGORIES
@classmethod
def _extract_textual_value_keys(cls, label_config: str) -> List[str]:
    """Collect the data keys referenced by textual object tags.

    Every element of the parsed ``label_config`` whose tag is a known object
    type and is textual contributes the part of its ``value`` attribute that
    follows a leading ``$`` (stripped of surrounding whitespace). Keys are
    returned without duplicates, in first-seen document order.

    Returns an empty list (after logging a warning) when ``label_config``
    cannot be parsed as XML.
    """
    try:
        root = ET.fromstring(label_config)
    except Exception as exc:
        logger.warning("解析 label_config 失败,已跳过占位填充:%s", exc)
        return []

    object_types = LabelStudioTagConfig.get_object_types()

    def referenced_keys():
        # Yield each non-empty "$key" reference found on a textual object tag.
        for node in root.iter():
            tag = node.tag
            if tag in object_types and cls._is_textual_object_tag(tag):
                reference = node.attrib.get("value", "")
                if reference.startswith("$"):
                    stripped_key = reference[1:].strip()
                    if stripped_key:
                        yield stripped_key

    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(referenced_keys()))
@staticmethod
def _needs_placeholder(value: Any) -> bool:
if value is None:
return True
if isinstance(value, str) and not value.strip():
return True
return False
def _apply_text_placeholders(self, data: Dict[str, Any], label_config: Optional[str]) -> None:
if not label_config:
return
for key in self._extract_textual_value_keys(label_config):
if self._needs_placeholder(data.get(key)):
data[key] = key
@staticmethod
def _header_already_present(header: ET.Element, name: str) -> bool:
value = header.attrib.get("value", "")
if value == name:
return True
header_text = (header.text or "").strip()
return header_text == name
def _decorate_label_config_for_editor(self, label_config: str) -> str:
    """Insert a ``Header`` showing the object name before textual objects.

    For every element that is a known, textual object tag and carries a
    ``name`` attribute, a ``<Header name=... value=<object name>/>`` sibling
    is inserted directly in front of it, unless the sibling immediately
    before it is already a ``Header`` displaying that name. Generated header
    names start with ``OBJECT_NAME_HEADER_PREFIX`` and are kept unique
    against every ``name`` already used in the config.

    Returns the config re-serialized by ElementTree, or the input unchanged
    (after logging a warning) when it cannot be parsed as XML.
    """
    try:
        root = ET.fromstring(label_config)
    except Exception as exc:
        logger.warning("解析 label_config 失败,已跳过 name 展示增强:%s", exc)
        return label_config

    object_types = LabelStudioTagConfig.get_object_types()
    taken_names = {
        node.attrib["name"] for node in root.iter() if node.attrib.get("name")
    }

    def allocate_header_name(base: str) -> str:
        # Try the bare prefixed name first, then "_1", "_2", ... until free.
        stem = f"{OBJECT_NAME_HEADER_PREFIX}{base}"
        resolved = stem
        suffix = 0
        while resolved in taken_names:
            suffix += 1
            resolved = f"{stem}_{suffix}"
        taken_names.add(resolved)
        return resolved

    for parent in root.iter():
        # Work from a snapshot of the original children; ``inserted`` tracks
        # how far earlier insertions have shifted positions in ``parent``.
        siblings = list(parent)
        inserted = 0
        for position, child in enumerate(siblings):
            tag = child.tag
            if tag not in object_types or not self._is_textual_object_tag(tag):
                continue
            obj_name = child.attrib.get("name")
            if not obj_name:
                continue
            if position > 0:
                previous = siblings[position - 1]
                if previous.tag == "Header" and self._header_already_present(previous, obj_name):
                    continue
            header = ET.Element("Header")
            header.set("name", allocate_header_name(obj_name))
            header.set("value", obj_name)
            parent.insert(position + inserted, header)
            inserted += 1

    return ET.tostring(root, encoding="unicode")
def _build_task_data(
    self,
    display_text: str,
    parsed_payload: Optional[Dict[str, Any]],
    label_config: Optional[str],
    file_record: DatasetFiles,
    dataset_id: str,
    file_id: str,
) -> Dict[str, Any]:
    """Assemble the ``data`` dict of an editor task.

    Starts from a shallow copy of ``parsed_payload`` (or an empty dict),
    stores ``display_text`` under ``TEXT_DATA_KEY`` when that entry is
    missing or blank, adds the file id, dataset id and file name only where
    the payload does not already define those keys, and finally fills
    placeholders for textual values referenced by ``label_config``.
    """
    task_data: Dict[str, Any] = dict(parsed_payload) if parsed_payload else {}

    if self._needs_placeholder(task_data.get(TEXT_DATA_KEY)):
        task_data[TEXT_DATA_KEY] = display_text

    # Payload-provided values take precedence over these fallbacks.
    fallbacks = (
        (FILE_ID_KEY, file_id),
        (DATASET_ID_KEY, dataset_id),
        (FILE_NAME_KEY, getattr(file_record, "file_name", "")),
    )
    for key, fallback in fallbacks:
        task_data.setdefault(key, fallback)

    self._apply_text_placeholders(task_data, label_config)
    return task_data
async def get_project_info(self, project_id: str) -> EditorProjectInfo:
project = await self._get_project_or_404(project_id)
@@ -102,11 +269,7 @@ class AnnotationEditorService:
unsupported_reason = f"当前仅支持 TEXT,项目数据类型为: {dataset_type or 'UNKNOWN'}"
# 优先使用项目配置中的label_config(用户编辑版本),其次使用模板默认配置
label_config = None
if project.configuration and isinstance(project.configuration, dict):
label_config = project.configuration.get("label_config")
if not label_config:
label_config = await self._get_label_config(project.template_id)
label_config = await self._resolve_project_label_config(project)
return EditorProjectInfo(
projectId=project.id,
@@ -198,6 +361,9 @@ class AnnotationEditorService:
raise HTTPException(status_code=404, detail=f"文件不存在或不属于该项目: {file_id}")
text_content = await self._fetch_text_content_via_download_api(project.dataset_id, file_id)
assert isinstance(text_content, str)
label_config = await self._resolve_project_label_config(project)
parsed_payload = self._try_parse_json_payload(text_content)
# 获取现有标注
ann_result = await self.db.execute(
@@ -244,14 +410,17 @@ class AnnotationEditorService:
display_text = raw_segments[current_segment_index]["text"]
# 构造 task 对象
task_data = self._build_task_data(
display_text=display_text,
parsed_payload=parsed_payload,
label_config=label_config,
file_record=file_record,
dataset_id=project.dataset_id,
file_id=file_id,
)
task: Dict[str, Any] = {
"id": ls_task_id,
"data": {
"text": display_text,
"file_id": file_id,
"dataset_id": project.dataset_id,
"file_name": getattr(file_record, "file_name", ""),
},
"data": task_data,
"annotations": [],
}