From 761f7f6a5199cef3c29be7979bceffdea40fe7a4 Mon Sep 17 00:00:00 2001 From: hhhhsc701 <56435672+hhhhsc701@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:28:30 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20optimize=20PDF=20parsing=20by=20implemen?= =?UTF-8?q?ting=20concurrent=20processing=20with=20=E2=80=A6=20(#177)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: optimize PDF parsing by implementing concurrent processing with ThreadPoolExecutor * Refactor to async processing for file extraction Refactor the file processing to use asyncio for improved performance and concurrency. --- .../ops/formatter/mineru_formatter/process.py | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/runtime/ops/formatter/mineru_formatter/process.py b/runtime/ops/formatter/mineru_formatter/process.py index df30c2a..fcb7508 100644 --- a/runtime/ops/formatter/mineru_formatter/process.py +++ b/runtime/ops/formatter/mineru_formatter/process.py @@ -5,15 +5,15 @@ Description: MinerU PDF文本抽取 Create: 2025/10/29 17:24 """ +import asyncio import os import shutil import time from typing import Dict, Any -from datamate.common.utils.rest_client import http_request from datamate.core.base_op import Mapper from loguru import logger -from mineru.cli.common import do_parse, read_fn +from mineru.cli.common import aio_do_parse, read_fn from mineru.cli.fast_api import get_infer_result from pypdf import PdfReader @@ -30,33 +30,38 @@ class MineruFormatter(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() filename = sample[self.filename_key] - filename_without_ext = os.path.splitext(filename)[0] if not filename.lower().endswith((".png", ".jpeg", ".jpg", ".webp", ".gif", ".pdf")): return sample try: - filepath = sample[self.filepath_key] - parse_dir = os.path.join(self.output_dir, filename_without_ext, "vlm") - pdf_bytes = read_fn(filepath) - total_page = len(PdfReader(filepath).pages) - content = "" - for page in range(0, total_page, 10): - do_parse( - output_dir=self.output_dir, - pdf_file_names=[filename_without_ext], - pdf_bytes_list=[pdf_bytes], - p_lang_list=["ch"], - backend=self.backend, - server_url=self.server_url, - start_page_id=page, - end_page_id=min(page + 9, total_page - 1), - ) - if os.path.exists(parse_dir): - content += get_infer_result(".md", filename_without_ext, parse_dir) - shutil.rmtree(parse_dir) - sample[self.text_key] = content + sample[self.text_key] = asyncio.run(self.async_process_file(sample)) logger.info( f"fileName: {filename}, method: MineruFormatter costs {(time.time() - start):6f} s") except Exception as e: logger.exception(f"fileName: {filename}, method: MineruFormatter causes error: {e}") raise return sample + + async def async_process_file(self, sample): + filename = sample[self.filename_key] + filename_without_ext = os.path.splitext(filename)[0] + filepath = sample[self.filepath_key] + parse_dir = os.path.join(self.output_dir, filename_without_ext, "vlm") + pdf_bytes = read_fn(filepath) + total_page = len(PdfReader(filepath).pages) + content = "" + for page in range(0, total_page, 10): + logger.info(f"fileName: {filename}, total_page: {total_page}, page: {page}.") + await aio_do_parse( + output_dir=self.output_dir, + pdf_file_names=[filename_without_ext], + pdf_bytes_list=[pdf_bytes], + p_lang_list=["ch"], + backend=self.backend, + server_url=self.server_url, + start_page_id=page, + end_page_id=min(page + 9, total_page - 1), + ) + if os.path.exists(parse_dir): + content += get_infer_result(".md", filename_without_ext, parse_dir) + shutil.rmtree(parse_dir) + return content