"""Annotation migrator.

When a file is updated to a new version, migrate the annotation results made
on the old version onto the new version's text. Only applicable to TEXT
datasets (annotations that carry ``start``/``end`` character offsets and a
``text`` snippet).

Migration algorithm:
1. Items without ``value.start``/``value.end`` (e.g. choices) are kept as-is.
2. Items with position information:
   a. Use ``SequenceMatcher`` to compute an old-offset -> new-offset mapping.
   b. Verify that the text at the mapped span matches the annotated text.
   c. If it does not, search the whole text for the nearest occurrence.
   d. If it still cannot be found, record the item in the failure list.
"""

from __future__ import annotations

import bisect
import copy
import difflib
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class MigrationResult:
    """Outcome of migrating a list of annotation result items."""

    # Items kept unchanged or successfully relocated onto the new text.
    migrated: List[Dict[str, Any]] = field(default_factory=list)
    # Positional items that could not be located in the new text.
    failed: List[Dict[str, Any]] = field(default_factory=list)
    # Number of input items, and the sizes of the two lists above.
    total: int = 0
    migrated_count: int = 0
    failed_count: int = 0


class AnnotationMigrator:
    """Core algorithm for migrating annotations between two text versions."""

    @staticmethod
    def migrate_annotation_results(
        old_text: str,
        new_text: str,
        results: List[Dict[str, Any]],
    ) -> MigrationResult:
        """Migrate the positional items of an annotation result list.

        Args:
            old_text: Text of the old version.
            new_text: Text of the new version.
            results: Label Studio annotation ``result`` array.

        Returns:
            A ``MigrationResult`` holding the migrated and the failed items.
            Items without usable numeric ``start``/``end`` are passed through
            as the same objects; relocated items are copies, so the input
            items are never mutated.
        """
        if not results:
            return MigrationResult()

        offset_map = AnnotationMigrator._build_offset_map(old_text, new_text)
        new_len = len(new_text)

        migrated: List[Dict[str, Any]] = []
        failed: List[Dict[str, Any]] = []

        for item in results:
            value = item.get("value") if isinstance(item, dict) else None
            if not isinstance(value, dict):
                # No value structure: keep the item untouched.
                migrated.append(item)
                continue

            old_start = value.get("start")
            old_end = value.get("end")
            if old_start is None or old_end is None:
                # No position information (e.g. choices): keep untouched.
                migrated.append(item)
                continue
            if not isinstance(old_start, (int, float)) or not isinstance(
                old_end, (int, float)
            ):
                # Non-numeric positions are not character offsets: keep.
                migrated.append(item)
                continue

            try:
                old_start = int(old_start)
                old_end = int(old_end)
            except (ValueError, OverflowError):
                # NaN / infinity cannot be converted to an offset; report the
                # item as failed instead of aborting the whole migration.
                failed.append(item)
                continue

            target_text = value.get("text", "")

            # First attempt: translate both offsets through the diff mapping.
            new_start = offset_map(old_start)
            new_end = offset_map(old_end)

            span = None
            if (
                new_start is not None
                and new_end is not None
                and 0 <= new_start <= new_end <= new_len
                and new_text[new_start:new_end] == target_text
            ):
                span = (new_start, new_end)

            # Second attempt: search the whole new text for the snippet and
            # take the occurrence closest to where we expected it.
            if span is None and isinstance(target_text, str) and target_text:
                hint_pos = new_start if new_start is not None else old_start
                found_pos = AnnotationMigrator._find_nearest_occurrence(
                    new_text, target_text, hint_pos
                )
                if found_pos is not None:
                    span = (found_pos, found_pos + len(target_text))

            if span is None:
                # Could not be migrated.
                failed.append(item)
                continue

            new_item = _deep_copy_item(item)
            new_item["value"]["start"], new_item["value"]["end"] = span
            migrated.append(new_item)

        return MigrationResult(
            migrated=migrated,
            failed=failed,
            total=len(results),
            migrated_count=len(migrated),
            failed_count=len(failed),
        )

    @staticmethod
    def _build_offset_map(
        old_text: str, new_text: str
    ) -> Callable[[int], Optional[int]]:
        """Build an old-offset -> new-offset mapping function.

        The mapping is derived from ``difflib.SequenceMatcher`` matching
        blocks. Each block ``(a, b, size)`` means
        ``old_text[a:a + size] == new_text[b:b + size]``, so an old offset
        ``i`` inside a block maps to ``b + (i - a)``. An offset outside every
        block is extrapolated from the nearest block using that block's shift.

        Returns:
            A function mapping an old offset to a new offset, or to ``None``
            when there is no matching block at all or the extrapolated offset
            falls outside ``[0, len(new_text)]``.
        """
        matcher = difflib.SequenceMatcher(
            None, old_text, new_text, autojunk=False
        )
        # Drop the zero-length sentinel block that terminates the list.
        blocks = [
            (a, b, size)
            for a, b, size in matcher.get_matching_blocks()
            if size > 0
        ]
        # Matching blocks are sorted by ``a`` and do not overlap, which lets
        # us binary-search for the block at or before a given offset.
        block_starts = [a for a, _, _ in blocks]
        new_len = len(new_text)

        def map_position(old_pos: int) -> Optional[int]:
            if not blocks:
                return None

            idx = bisect.bisect_right(block_starts, old_pos) - 1
            prev_block = blocks[idx] if idx >= 0 else None
            if prev_block is not None:
                a, b, size = prev_block
                if old_pos < a + size:
                    # Offset lies inside a matching block: exact mapping.
                    return b + (old_pos - a)

            # Offset lies in a gap. Only the neighbouring blocks can be the
            # nearest ones; on a tie the earlier block wins.
            next_block = blocks[idx + 1] if idx + 1 < len(blocks) else None
            if prev_block is None:
                nearest = next_block
            elif next_block is None:
                nearest = prev_block
            else:
                dist_prev = old_pos - (prev_block[0] + prev_block[2])
                dist_next = next_block[0] - old_pos
                nearest = prev_block if dist_prev <= dist_next else next_block

            a, b, _ = nearest
            new_pos = b + (old_pos - a)
            if 0 <= new_pos <= new_len:
                return new_pos
            return None

        return map_position

    @staticmethod
    def _find_nearest_occurrence(
        text: str, target: str, hint_pos: int
    ) -> Optional[int]:
        """Find the occurrence of ``target`` in ``text`` nearest to a hint.

        Overlapping occurrences are considered. When two occurrences are
        equally distant from ``hint_pos`` the earlier one is returned.

        Returns:
            The start index of the nearest occurrence, or ``None`` when
            ``target`` is empty or does not occur in ``text``.
        """
        if not target:
            return None

        best_pos: Optional[int] = None
        best_distance = 0
        idx = text.find(target)
        while idx >= 0:
            distance = abs(idx - hint_pos)
            if best_pos is None or distance < best_distance:
                best_pos = idx
                best_distance = distance
            if idx >= hint_pos:
                # Occurrences are visited in increasing order, so every later
                # one is farther from the hint than this one.
                break
            idx = text.find(target, idx + 1)
        return best_pos


def _deep_copy_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of an annotation item whose ``value`` is deep-copied.

    The item itself is copied shallowly; its ``value`` dict (including nested
    containers such as label lists) is deep-copied so that editing the copy
    never leaks into the original annotation.
    """
    new_item = dict(item)
    if isinstance(new_item.get("value"), dict):
        new_item["value"] = copy.deepcopy(new_item["value"])
    return new_item