feat(annotation): add JSONL file support and text key resolution

- Add a JSONL file extension constant
- Implement the primary text key resolution method _resolve_primary_text_key
- Add the JSONL record parsing method _parse_jsonl_records (a standalone sketch of this flow follows the commit message)
- Extend the task data builder to accept a primary text key parameter
- Implement the primary text value resolution method _resolve_primary_text_value
- Update the project info retrieval logic to handle multi-line JSONL records
- Update the segmentation logic to split multi-line JSONL or over-length text
- Adjust the annotation handling so segmented annotations are returned correctly
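A minimal standalone sketch of the parsing flow described above, assuming TEXT_DATA_KEY is "text"; the helper names mirror the new service methods but are simplified illustrations, not the actual implementation:

    import json
    from typing import Any, Dict, List, Optional, Tuple

    TEXT_DATA_KEY = "text"  # assumed default key; the service derives the key from the label config

    def try_parse_json_payload(line: str) -> Optional[Dict[str, Any]]:
        # Only a JSON object counts as a structured payload; anything else falls back to raw text.
        try:
            parsed = json.loads(line)
        except (ValueError, TypeError):
            return None
        return parsed if isinstance(parsed, dict) else None

    def parse_jsonl_records(text_content: str) -> List[Tuple[Optional[Dict[str, Any]], str]]:
        # One record per non-empty line: (parsed payload or None, raw line).
        return [(try_parse_json_payload(line), line)
                for line in text_content.splitlines() if line.strip()]

    def resolve_primary_text_value(payload: Optional[Dict[str, Any]],
                                   raw_text: str,
                                   primary_text_key: Optional[str] = None) -> str:
        # Prefer the configured key, then the default text key, then the raw line.
        key = primary_text_key or TEXT_DATA_KEY
        if payload:
            value = payload.get(key)
            if isinstance(value, str) and value.strip():
                return value
        return raw_text

    sample = '{"text": "first record"}\n{"prompt": "no text key here"}\nplain line'
    for payload, raw in parse_jsonl_records(sample):
        print(resolve_primary_text_value(payload, raw, primary_text_key="text"))
    # -> first record
    # -> {"prompt": "no text key here"}
    # -> plain line
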
2026-01-23 22:04:34 +08:00
parent 8e06a36460
commit 0e30e658e9
3 changed files with 118 additions and 31 deletions


@@ -48,6 +48,7 @@ FILE_ID_CAMEL_KEY = "fileId"
FILE_NAME_CAMEL_KEY = "fileName"
SEGMENT_INDEX_KEY = "segment_index"
SEGMENT_INDEX_CAMEL_KEY = "segmentIndex"
JSONL_EXTENSION = ".jsonl"
TEXTUAL_OBJECT_CATEGORIES = {"text", "document"}
OBJECT_NAME_HEADER_PREFIX = "dm_object_header_"
@@ -117,6 +118,17 @@ class AnnotationEditorService:
label_config = self._decorate_label_config_for_editor(label_config)
return label_config
@classmethod
def _resolve_primary_text_key(cls, label_config: Optional[str]) -> Optional[str]:
if not label_config:
return None
keys = cls._extract_textual_value_keys(label_config)
if not keys:
return None
if TEXT_DATA_KEY in keys:
return TEXT_DATA_KEY
return keys[0]
@staticmethod
def _try_parse_json_payload(text_content: str) -> Optional[Dict[str, Any]]:
if not text_content:
@@ -132,6 +144,15 @@ class AnnotationEditorService:
return None
return parsed if isinstance(parsed, dict) else None
@classmethod
def _parse_jsonl_records(cls, text_content: str) -> List[Tuple[Optional[Dict[str, Any]], str]]:
lines = [line for line in text_content.splitlines() if line.strip()]
records: List[Tuple[Optional[Dict[str, Any]], str]] = []
for line in lines:
payload = cls._try_parse_json_payload(line)
records.append((payload, line))
return records
@staticmethod
def _is_textual_object_tag(object_tag: str) -> bool:
config = LabelStudioTagConfig.get_object_config(object_tag) or {}
@@ -252,10 +273,11 @@ class AnnotationEditorService:
file_record: DatasetFiles,
dataset_id: str,
file_id: str,
primary_text_key: Optional[str],
) -> Dict[str, Any]:
data: Dict[str, Any] = dict(parsed_payload or {})
if self._needs_placeholder(data.get(TEXT_DATA_KEY)):
data[TEXT_DATA_KEY] = display_text
text_key = primary_text_key or TEXT_DATA_KEY
data[text_key] = display_text
file_name = str(getattr(file_record, "file_name", ""))
data[FILE_ID_KEY] = file_id
@@ -268,6 +290,23 @@ class AnnotationEditorService:
self._apply_text_placeholders(data, label_config)
return data
@classmethod
def _resolve_primary_text_value(
cls,
parsed_payload: Optional[Dict[str, Any]],
raw_text: str,
primary_text_key: Optional[str],
) -> str:
if parsed_payload and primary_text_key:
value = parsed_payload.get(primary_text_key)
if isinstance(value, str) and value.strip():
return value
if parsed_payload and not primary_text_key:
value = parsed_payload.get(TEXT_DATA_KEY)
if isinstance(value, str) and value.strip():
return value
return raw_text
async def get_project_info(self, project_id: str) -> EditorProjectInfo:
project = await self._get_project_or_404(project_id)
@@ -372,7 +411,25 @@ class AnnotationEditorService:
text_content = await self._fetch_text_content_via_download_api(project.dataset_id, file_id)
assert isinstance(text_content, str)
label_config = await self._resolve_project_label_config(project)
parsed_payload = self._try_parse_json_payload(text_content)
primary_text_key = self._resolve_primary_text_key(label_config)
file_name = str(getattr(file_record, "file_name", "")).lower()
records: List[Tuple[Optional[Dict[str, Any]], str]] = []
if file_name.endswith(JSONL_EXTENSION):
records = self._parse_jsonl_records(text_content)
else:
parsed_payload = self._try_parse_json_payload(text_content)
if parsed_payload:
records = [(parsed_payload, text_content)]
if not records:
records = [(None, text_content)]
record_texts = [
self._resolve_primary_text_value(payload, raw_text, primary_text_key)
for payload, raw_text in records
]
if not record_texts:
record_texts = [text_content]
# Fetch the existing annotation
ann_result = await self.db.execute(
@@ -385,47 +442,73 @@ class AnnotationEditorService:
ls_task_id = self._make_ls_task_id(project_id, file_id)
# Decide whether segmentation is needed
needs_segmentation = len(text_content) > self.SEGMENT_THRESHOLD
# Decide whether segmentation is needed (multiple JSONL lines, or the primary text exceeds the threshold)
needs_segmentation = len(records) > 1 or any(
len(text or "") > self.SEGMENT_THRESHOLD for text in record_texts
)
segments: Optional[List[SegmentInfo]] = None
current_segment_index = 0
display_text = text_content
display_text = record_texts[0] if record_texts else text_content
selected_payload = records[0][0] if records else None
segment_annotations: Dict[str, Any] = {}
if ann and ann.annotation and ann.annotation.get("segmented"):
segment_annotations = ann.annotation.get("segments", {})
if needs_segmentation:
splitter = AnnotationTextSplitter(max_chars=self.SEGMENT_THRESHOLD)
raw_segments = splitter.split(text_content)
current_segment_index = segment_index if segment_index is not None else 0
segment_contexts: List[Tuple[Optional[Dict[str, Any]], str, str, int, int]] = []
segments = []
segment_cursor = 0
# Validate the segment index
if current_segment_index < 0 or current_segment_index >= len(raw_segments):
for record_index, ((payload, raw_text), record_text) in enumerate(zip(records, record_texts)):
normalized_text = record_text or ""
if len(normalized_text) > self.SEGMENT_THRESHOLD:
raw_segments = splitter.split(normalized_text)
for chunk_index, seg in enumerate(raw_segments):
segments.append(SegmentInfo(
idx=segment_cursor,
text=seg["text"],
start=seg["start"],
end=seg["end"],
hasAnnotation=str(segment_cursor) in segment_annotations,
lineIndex=record_index,
chunkIndex=chunk_index,
))
segment_contexts.append((payload, raw_text, seg["text"], record_index, chunk_index))
segment_cursor += 1
else:
segments.append(SegmentInfo(
idx=segment_cursor,
text=normalized_text,
start=0,
end=len(normalized_text),
hasAnnotation=str(segment_cursor) in segment_annotations,
lineIndex=record_index,
chunkIndex=0,
))
segment_contexts.append((payload, raw_text, normalized_text, record_index, 0))
segment_cursor += 1
if not segments:
segments = [SegmentInfo(idx=0, text="", start=0, end=0, hasAnnotation=False, lineIndex=0, chunkIndex=0)]
segment_contexts = [(None, "", "", 0, 0)]
current_segment_index = segment_index if segment_index is not None else 0
if current_segment_index < 0 or current_segment_index >= len(segments):
current_segment_index = 0
# Mark whether each segment already has an annotation
segment_annotations: Dict[str, Any] = {}
if ann and ann.annotation and ann.annotation.get("segmented"):
segment_annotations = ann.annotation.get("segments", {})
segments = []
for seg in raw_segments:
segments.append(SegmentInfo(
idx=seg["idx"],
text=seg["text"],
start=seg["start"],
end=seg["end"],
hasAnnotation=str(seg["idx"]) in segment_annotations,
))
# The current segment's text is used for task.data.text
display_text = raw_segments[current_segment_index]["text"]
selected_payload, _, display_text, _, _ = segment_contexts[current_segment_index]
# Build the task object
task_data = self._build_task_data(
display_text=display_text,
parsed_payload=parsed_payload,
parsed_payload=selected_payload,
label_config=label_config,
file_record=file_record,
dataset_id=project.dataset_id,
file_id=file_id,
primary_text_key=primary_text_key,
)
if needs_segmentation:
task_data[SEGMENT_INDEX_KEY] = current_segment_index
@@ -453,7 +536,7 @@ class AnnotationEditorService:
"updated_at": seg_ann.get("updated_at", datetime.utcnow().isoformat() + "Z"),
}
task["annotations"] = [stored]
elif not needs_segmentation:
elif not needs_segmentation and not (ann.annotation or {}).get("segmented"):
# Non-segmented mode: return the stored annotation object as-is
stored = dict(ann.annotation or {})
stored["task"] = ls_task_id