You've already forked DataMate
新增 AnnotationMigrator 迁移算法,在 TEXT 类型数据集的文件版本更新时, 可选通过 difflib 位置偏移映射和文字二次匹配将旧版本标注迁移到新版本上。 前端版本切换对话框增加"保留标注"复选框(仅 TEXT 类型显示),后端 API 增加 preserveAnnotations 参数,完全向后兼容。 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
216 lines
7.1 KiB
Python
216 lines
7.1 KiB
Python
"""
|
|
标注迁移器
|
|
|
|
在文件版本更新时,将旧版本的标注结果迁移到新版本文本上。
|
|
仅适用于 TEXT 类型数据集(标注含有 start/end 字符位置和 text 文本片段)。
|
|
|
|
迁移算法:
|
|
1. 对没有 value.start/value.end 的标注项(如 choices),直接保留不变
|
|
2. 对有位置信息的标注项:
|
|
a. 用 SequenceMatcher 计算旧位置 -> 新位置的偏移映射
|
|
b. 验证映射后的文本是否匹配
|
|
c. 若不匹配,全文搜索最近的匹配位置
|
|
d. 若仍找不到,记入失败列表
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import difflib
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
|
|
@dataclass
class MigrationResult:
    """Outcome of migrating one list of annotation result items."""

    # Items carried over to the new text: either passed through untouched or
    # copied with updated ``value.start`` / ``value.end`` offsets.
    migrated: List[Dict[str, Any]] = field(default_factory=list)
    # Original items for which no position could be found in the new text.
    failed: List[Dict[str, Any]] = field(default_factory=list)
    # Number of input items that were examined.
    total: int = 0
    # Length of ``migrated`` as reported by the producer of this result.
    migrated_count: int = 0
    # Length of ``failed`` as reported by the producer of this result.
    failed_count: int = 0
|
|
|
|
|
|
class AnnotationMigrator:
    """Carry span annotations from an old text version over to a new one.

    Items without a numeric ``value.start`` / ``value.end`` pair are passed
    through untouched.  Items with a span are relocated in two stages:

    1. translate the old offsets through a ``difflib.SequenceMatcher``
       alignment and accept the result when the new slice equals the
       annotated text;
    2. otherwise search the new text for the annotated text and take the
       occurrence closest to the translated (or original) start offset.

    Items that survive neither stage are reported as failed.
    """

    @staticmethod
    def migrate_annotation_results(
        old_text: str,
        new_text: str,
        results: List[Dict[str, Any]],
    ) -> MigrationResult:
        """Migrate every position-bearing item in an annotation result list.

        Args:
            old_text: Text of the previous file version.
            new_text: Text of the new file version.
            results: Annotation result array (Label Studio style items whose
                ``value`` may carry ``start``, ``end`` and ``text``).

        Returns:
            A ``MigrationResult`` holding the migrated items (untouched
            pass-throughs plus relocated copies), the items that could not
            be relocated, and the matching counters.  Input items are never
            mutated; relocated items are copies.
        """
        if not results:
            return MigrationResult()

        offset_map = AnnotationMigrator._build_offset_map(old_text, new_text)

        migrated: List[Dict[str, Any]] = []
        failed: List[Dict[str, Any]] = []

        for item in results:
            value = item.get("value") if isinstance(item, dict) else None
            if not isinstance(value, dict):
                # No value structure at all: nothing to relocate.
                migrated.append(item)
                continue

            raw_start = value.get("start")
            raw_end = value.get("end")
            if not isinstance(raw_start, (int, float)) or not isinstance(
                raw_end, (int, float)
            ):
                # Missing (None) or non-numeric offsets, e.g. choice-style
                # results: keep the item as it is.
                migrated.append(item)
                continue

            try:
                old_start = int(raw_start)
                old_end = int(raw_end)
            except (ValueError, OverflowError):
                # NaN / infinite floats cannot be offsets; treat them like
                # any other non-numeric position instead of aborting the
                # whole migration.
                migrated.append(item)
                continue

            target_text = AnnotationMigrator._expected_span_text(
                old_text, value, old_start, old_end
            )
            new_start = AnnotationMigrator._locate_span(
                new_text, target_text, offset_map, old_start, old_end
            )
            if new_start is None:
                failed.append(item)
                continue

            new_item = _deep_copy_item(item)
            # Both relocation stages only succeed when the new span holds
            # exactly ``target_text``, so the end follows from its length.
            new_item["value"] = {
                **value,
                "start": new_start,
                "end": new_start + len(target_text),
            }
            migrated.append(new_item)

        return MigrationResult(
            migrated=migrated,
            failed=failed,
            total=len(results),
            migrated_count=len(migrated),
            failed_count=len(failed),
        )

    @staticmethod
    def _expected_span_text(
        old_text: str, value: Dict[str, Any], old_start: int, old_end: int
    ) -> str:
        """Return the text the relocated span has to contain.

        A string stored in ``value["text"]`` is used as is.  When that field
        is missing or not a string (for example a list), the annotated text
        is read from ``old_text[old_start:old_end]`` instead, provided the
        old span lies inside ``old_text``.  Otherwise an empty string is
        returned, which only ever matches a zero-length span.
        """
        stored = value.get("text")
        if isinstance(stored, str):
            return stored
        if 0 <= old_start <= old_end <= len(old_text):
            return old_text[old_start:old_end]
        return ""

    @staticmethod
    def _locate_span(
        new_text: str,
        target_text: str,
        offset_map: Callable[[int], Optional[int]],
        old_start: int,
        old_end: int,
    ) -> Optional[int]:
        """Find the start offset of ``target_text`` in ``new_text``.

        Returns the new start offset, or ``None`` when the span cannot be
        placed.  The matching end offset is always
        ``start + len(target_text)``.
        """
        new_start = offset_map(old_start)
        new_end = offset_map(old_end)

        if (
            new_start is not None
            and new_end is not None
            and 0 <= new_start <= new_end <= len(new_text)
            and new_text[new_start:new_end] == target_text
        ):
            # Offset translation landed exactly on the annotated text.
            return new_start

        if target_text:
            # Translation failed or the text moved: fall back to a search,
            # preferring the occurrence nearest to where the span should be.
            hint_pos = new_start if new_start is not None else old_start
            return AnnotationMigrator._find_nearest_occurrence(
                new_text, target_text, hint_pos
            )
        return None

    @staticmethod
    def _build_offset_map(
        old_text: str, new_text: str
    ) -> Callable[[int], Optional[int]]:
        """Build a function translating old-text offsets to new-text offsets.

        The alignment comes from ``difflib.SequenceMatcher`` (with
        ``autojunk`` disabled).  An offset inside a matching block maps
        directly; an offset outside every block is extrapolated from the
        block whose start or end is nearest.  The returned function yields
        ``None`` when there are no matching blocks or when the extrapolated
        offset falls outside ``0..len(new_text)``.
        """
        matcher = difflib.SequenceMatcher(None, old_text, new_text, autojunk=False)
        # Each block (a, b, size) states old_text[a:a+size] == new_text[b:b+size].
        # Blocks of size zero carry no alignment information and are dropped.
        blocks = [
            (a, b, size) for a, b, size in matcher.get_matching_blocks() if size > 0
        ]
        new_length = len(new_text)

        def map_position(old_pos: int) -> Optional[int]:
            for a, b, size in blocks:
                if a <= old_pos < a + size:
                    return b + (old_pos - a)

            if not blocks:
                return None

            # Not inside any block: shift by the offset of the block with the
            # closest edge.  min() keeps the earliest block on ties.
            a, b, _ = min(
                blocks,
                key=lambda blk: min(
                    abs(old_pos - blk[0]), abs(old_pos - (blk[0] + blk[2]))
                ),
            )
            new_pos = b + (old_pos - a)
            return new_pos if 0 <= new_pos <= new_length else None

        return map_position

    @staticmethod
    def _find_nearest_occurrence(
        text: str, target: str, hint_pos: int
    ) -> Optional[int]:
        """Return the start of the occurrence of ``target`` nearest ``hint_pos``.

        Overlapping occurrences are considered.  On equal distance the
        earlier occurrence wins.  Returns ``None`` when ``target`` is empty,
        is not a string, or does not occur in ``text``.
        """
        if not isinstance(target, str) or not target:
            return None

        best: Optional[int] = None
        idx = text.find(target)
        while idx >= 0:
            if best is None or abs(idx - hint_pos) < abs(best - hint_pos):
                best = idx
            # Advance by one character so overlapping matches are seen too.
            idx = text.find(target, idx + 1)
        return best
|
|
|
|
|
|
def _deep_copy_item(item: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""浅拷贝标注项,深拷贝 value 字段"""
|
|
new_item = dict(item)
|
|
if "value" in new_item and isinstance(new_item["value"], dict):
|
|
new_item["value"] = dict(new_item["value"])
|
|
return new_item
|