From f707ce9dae1cd0873cbb368f5c7e8a5dcb66de78 Mon Sep 17 00:00:00 2001
From: Jerry Yan <792602257@qq.com>
Date: Tue, 10 Feb 2026 16:49:37 +0800
Subject: [PATCH] feat(auto-annotation): add batch progress updates to reduce DB write pressure

Throttle progress updates to reduce database write operations during large dataset processing.

Key features:
- Add PROGRESS_UPDATE_INTERVAL config (default 2.0s, configurable via AUTO_ANNOTATION_PROGRESS_INTERVAL env)
- Conditional progress updates: Only write to DB when (now - last_update) >= interval
- Use time.monotonic() for timing (immune to system clock adjustments)
- Final status updates (completed/stopped/failed) always execute (not throttled)

Implementation:
- Initialize last_progress_update timestamp before as_completed() loop
- Replace unconditional _update_task_status() with conditional call based on time interval
- Update docstring to reflect throttling capability

Performance impact (T=2s):
- 1,000 files / 100s processing: DB writes reduced from 1,000 to ~50 (95% reduction)
- 10,000 files / 500s processing: DB writes reduced from 10,000 to ~250 (97.5% reduction)
- Small datasets (10 files): Minimal difference

Backward compatibility:
- PROGRESS_UPDATE_INTERVAL=0: Updates every file (identical to previous behavior)
- Heartbeat mechanism unaffected (2s interval << 300s timeout)
- Stop check mechanism independent of progress updates
- Final status updates always execute

Testing:
- 14 unit tests all passed (11 existing + 3 new):
  * Fast processing with throttling
  * PROGRESS_UPDATE_INTERVAL=0 updates every file
  * Slow processing (per-file > T) updates every file
- py_compile syntax check passed

Edge cases handled:
- Single file task: Works normally
- Very slow processing: Degrades to per-file updates
- Concurrent FILE_WORKERS > 1: Counters accurate (lock-protected), DB reflects with max T seconds delay
---
 .../datamate/auto_annotation_worker.py | 32 +++++---
 .../tests/test_worker_concurrency.py | 66 +++++++++++++++++++
 2
files changed, 86 insertions(+), 12 deletions(-) diff --git a/runtime/python-executor/datamate/auto_annotation_worker.py b/runtime/python-executor/datamate/auto_annotation_worker.py index 7bd4f64..6972e3f 100644 --- a/runtime/python-executor/datamate/auto_annotation_worker.py +++ b/runtime/python-executor/datamate/auto_annotation_worker.py @@ -13,6 +13,8 @@ the datamate-python backend and frontend can display real-time status. - 任务内文件并发: 可通过 AUTO_ANNOTATION_FILE_WORKERS 配置线程池大小, 单任务内并行处理多个文件(LLM I/O 密集型场景尤其有效)。 算子链通过对象池隔离,每个线程使用独立的链实例。 +- 进度更新节流: 可通过 AUTO_ANNOTATION_PROGRESS_INTERVAL 控制进度写入频率, + 避免大数据集每文件都写 DB 造成的写压力(默认 2 秒间隔)。 - 启动时自动恢复心跳超时的 running 任务:未处理文件重置为 pending, 已有部分进度的标记为 failed,由用户决定是否手动重试。 """ @@ -133,6 +135,8 @@ WORKER_COUNT = int(os.getenv("AUTO_ANNOTATION_WORKER_COUNT", "1")) FILE_WORKERS = int(os.getenv("AUTO_ANNOTATION_FILE_WORKERS", "1")) +PROGRESS_UPDATE_INTERVAL = float(os.getenv("AUTO_ANNOTATION_PROGRESS_INTERVAL", "2.0")) + def _recover_stale_running_tasks() -> int: """启动时恢复心跳超时的 running 任务。 @@ -1274,6 +1278,7 @@ def _process_single_task(task: Dict[str, Any]) -> None: # --- 并发文件处理 --- stop_check_interval = max(1, effective_file_workers * 2) completed_since_check = 0 + last_progress_update = time.monotonic() with ThreadPoolExecutor(max_workers=effective_file_workers) as executor: future_to_file = { @@ -1299,18 +1304,21 @@ def _process_single_task(task: Dict[str, Any]) -> None: current_processed = processed current_detected = detected_total - progress = int(current_processed * 100 / total_images) if total_images > 0 else 100 - _update_task_status( - task_id, - run_token=run_token, - status="running", - progress=progress, - processed_images=current_processed, - detected_objects=current_detected, - total_images=total_images, - output_path=output_dir, - output_dataset_id=output_dataset_id, - ) + now = time.monotonic() + if PROGRESS_UPDATE_INTERVAL <= 0 or (now - last_progress_update) >= PROGRESS_UPDATE_INTERVAL: + progress = int(current_processed * 
100 / total_images) if total_images > 0 else 100 + _update_task_status( + task_id, + run_token=run_token, + status="running", + progress=progress, + processed_images=current_processed, + detected_objects=current_detected, + total_images=total_images, + output_path=output_dir, + output_dataset_id=output_dataset_id, + ) + last_progress_update = now except Exception as e: logger.error( "Failed to process file for task {}: file_path={}, error={}", diff --git a/runtime/python-executor/tests/test_worker_concurrency.py b/runtime/python-executor/tests/test_worker_concurrency.py index ecbe73e..68afc04 100644 --- a/runtime/python-executor/tests/test_worker_concurrency.py +++ b/runtime/python-executor/tests/test_worker_concurrency.py @@ -320,5 +320,71 @@ class TestWorkerLoopSimplified(unittest.TestCase): mock_recover.assert_not_called() +class TestProgressThrottling(unittest.TestCase): + """Test time-based progress update throttling (improvement #5).""" + + def test_progress_updates_throttled(self): + """With PROGRESS_UPDATE_INTERVAL>0, rapid completions should batch DB writes.""" + update_calls: List[float] = [] + lock = threading.Lock() + + def mock_update(*args, **kwargs): + with lock: + update_calls.append(time.monotonic()) + + interval = 0.05 # 50ms throttle interval + processed = 0 + # Initialize in the past so the first file triggers an update + last_progress_update = time.monotonic() - interval + total_files = 50 + + # Simulate the throttled update loop from _process_single_task + for i in range(total_files): + processed += 1 + now = time.monotonic() + if interval <= 0 or (now - last_progress_update) >= interval: + mock_update(processed=processed, total=total_files) + last_progress_update = now + # Simulate very fast file processing (~1ms) + time.sleep(0.001) + + # With 50 files at ~1ms each (~50ms total) and 50ms interval, + # should get far fewer updates than total_files + self.assertLess(len(update_calls), total_files) + self.assertGreater(len(update_calls), 0) + + 
def test_progress_interval_zero_updates_every_file(self): + """PROGRESS_UPDATE_INTERVAL=0 should update on every file completion.""" + update_count = 0 + interval = 0.0 + total_files = 20 + last_progress_update = time.monotonic() + + for i in range(total_files): + now = time.monotonic() + if interval <= 0 or (now - last_progress_update) >= interval: + update_count += 1 + last_progress_update = now + + self.assertEqual(update_count, total_files) + + def test_progress_throttle_with_slow_processing(self): + """When each file takes longer than the interval, every file triggers an update.""" + update_count = 0 + interval = 0.01 # 10ms interval + total_files = 5 + last_progress_update = time.monotonic() - 1.0 # Start in the past + + for i in range(total_files): + time.sleep(0.02) # 20ms per file > 10ms interval + now = time.monotonic() + if interval <= 0 or (now - last_progress_update) >= interval: + update_count += 1 + last_progress_update = now + + # Every file should trigger an update since processing time > interval + self.assertEqual(update_count, total_files) + + if __name__ == "__main__": unittest.main()