From ab4523b55669201f49544c5db9b2f7c480d2d866 Mon Sep 17 00:00:00 2001 From: hhhhsc701 <56435672+hhhhsc701@users.noreply.github.com> Date: Fri, 19 Dec 2025 11:54:08 +0800 Subject: [PATCH] add export type settings and enhance metadata structure (#181) * fix(session): enhance database connection settings with pool pre-ping and recycle options * feat(metadata): add export type settings and enhance metadata structure * fix(base_op): improve sample handling by introducing target_type key and consolidating text/data retrieval logic * feat(metadata): add export type settings and enhance metadata structure * feat(metadata): add export type settings and enhance metadata structure --- runtime/datamate-python/app/db/session.py | 4 ++- .../ops/examples/test_operator/metadata.yml | 6 ++--- .../formatter/mineru_formatter/metadata.yml | 12 +++++++++ .../ops/formatter/mineru_formatter/process.py | 26 ++++++++++++++++++- .../python-executor/datamate/core/base_op.py | 18 +++++++++---- scripts/images/runtime/Dockerfile | 2 +- 6 files changed, 57 insertions(+), 11 deletions(-) diff --git a/runtime/datamate-python/app/db/session.py b/runtime/datamate-python/app/db/session.py index c70b196..bd19b1e 100644 --- a/runtime/datamate-python/app/db/session.py +++ b/runtime/datamate-python/app/db/session.py @@ -10,7 +10,9 @@ logger = get_logger(__name__) engine = create_async_engine( settings.database_url, echo=False, # 关闭SQL调试日志以减少输出 - future=True + future=True, + pool_pre_ping=True, + pool_recycle=3600 ) # 创建会话工厂 diff --git a/runtime/ops/examples/test_operator/metadata.yml b/runtime/ops/examples/test_operator/metadata.yml index d4f80fd..088485f 100644 --- a/runtime/ops/examples/test_operator/metadata.yml +++ b/runtime/ops/examples/test_operator/metadata.yml @@ -4,9 +4,9 @@ language: 'python' vendor: 'huawei' raw_id: 'TestMapper' version: '1.0.0' -modal: 'text' -inputs: 'text' -outputs: 'text' +modal: 'text' # text/image/audio/video/multimodal +inputs: 'text' # text/image/audio/video/multimodal +outputs: 'text' # text/image/audio/video/multimodal settings: sliderTest: name: '滑窗测试' diff --git a/runtime/ops/formatter/mineru_formatter/metadata.yml b/runtime/ops/formatter/mineru_formatter/metadata.yml index ac3043b..81af9ec 100644 --- a/runtime/ops/formatter/mineru_formatter/metadata.yml +++ b/runtime/ops/formatter/mineru_formatter/metadata.yml @@ -14,3 +14,15 @@ effect: after: '' inputs: 'text' outputs: 'text' +settings: + exportType: + name: '导出类型' + description: '指定清洗结果文件类型。若指定为md且后续存在其他清洗算子,可能导致文件格式错乱。' + type: 'select' + defaultVal: 'markdown' + required: false + options: + - label: 'markdown' + value: 'md' + - label: 'txt' + value: 'txt' diff --git a/runtime/ops/formatter/mineru_formatter/process.py b/runtime/ops/formatter/mineru_formatter/process.py index 730accf..ffa9356 100644 --- a/runtime/ops/formatter/mineru_formatter/process.py +++ b/runtime/ops/formatter/mineru_formatter/process.py @@ -6,12 +6,16 @@ Description: MinerU PDF文本抽取 Create: 2025/10/29 17:24 """ import asyncio +import glob import os import shutil import time +import uuid +from pathlib import Path from typing import Dict, Any -from datamate.core.base_op import Mapper +from datamate.core.base_op import Mapper, FileExporter +from datamate.sql_manager.persistence_atction import TaskInfoPersistence from loguru import logger from mineru.cli.common import aio_do_parse, read_fn from mineru.cli.fast_api import get_infer_result @@ -27,6 +31,7 @@ class MineruFormatter(Mapper): self.backend = "vlm-http-client" self.output_dir = "/dataset/outputs" self.max_retries = 3 + self.target_type = kwargs.get("exportType", "md") def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() @@ -35,6 +40,7 @@ class MineruFormatter(Mapper): return sample try: sample[self.text_key] = asyncio.run(self.async_process_file(sample)) + sample[self.target_type_key] = self.target_type logger.info( f"fileName: {filename}, method: MineruFormatter costs {(time.time() - start):6f} s") except Exception as e: @@ -77,5 +83,23 @@ class MineruFormatter(Mapper): raise # 耗尽次数后抛出异常,交给上层 execute 处理 if os.path.exists(parse_dir): content += get_infer_result(".md", filename_without_ext, parse_dir) + self.save_images(parse_dir, sample["dataset_id"], os.path.abspath(sample[self.export_path_key]) + "/images") shutil.rmtree(parse_dir) return content + + def save_images(self, parse_dir, dataset_id, export_path): + Path(export_path).mkdir(parents=True, exist_ok=True) + + images_dir = os.path.join(parse_dir, "images") + image_paths = glob.glob(os.path.join(glob.escape(images_dir), "*.jpg")) + for image_path in image_paths: + shutil.copy(image_path, export_path) + image_sample = {} + image = Path(image_path) + image_name = image.name + image_sample[self.filename_key] = image_name + image_sample[self.filetype_key] = "jpg" + image_sample[self.filesize_key] = image.stat().st_size + image_sample["dataset_id"] = dataset_id + image_sample[self.filepath_key] = export_path + "/" + image_name + TaskInfoPersistence().update_file_result(image_sample, str(uuid.uuid4())) \ No newline at end of file diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 5b5e5c6..bef03ef 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -72,6 +72,7 @@ class BaseOp: self.filesize_key = kwargs.get('fileSize_key', "fileSize") self.export_path_key = kwargs.get('export_path_key', "export_path") self.ext_params_key = kwargs.get('ext_params_key', "ext_params") + self.target_type_key = kwargs.get('target_type_key', "target_type") @property def name(self): @@ -437,7 +438,7 @@ class FileExporter(BaseOp): elif file_type in self.medical_support_ext: sample, save_path = self.get_medicalfile_handler(sample) else: - raise TypeError(f"{file_type} is unsupported! please check support_ext in FileExporter Ops") + return False if sample[self.text_key] == '' and sample[self.data_key] == b'': sample[self.filesize_key] = "0" @@ -483,11 +484,11 @@ class FileExporter(BaseOp): # target_type存在则保存为扫描件, docx格式 if target_type: - sample = self._get_from_data(sample) + sample = self._get_from_text_or_data(sample) save_path = self.get_save_path(sample, target_type) # 不存在则保存为txt文件,正常文本清洗 else: - sample = self._get_from_text(sample) + sample = self._get_from_text_or_data(sample) save_path = self.get_save_path(sample, 'txt') return sample, save_path @@ -496,11 +497,11 @@ class FileExporter(BaseOp): # target_type存在, 图转文保存为target_type,markdown格式 if target_type: - sample = self._get_from_text(sample) + sample = self._get_from_text_or_data(sample) save_path = self.get_save_path(sample, target_type) # 不存在则保存为原本图片文件格式,正常图片清洗 else: - sample = self._get_from_data(sample) + sample = self._get_from_text_or_data(sample) save_path = self.get_save_path(sample, sample[self.filetype_key]) return sample, save_path @@ -533,6 +534,13 @@ class FileExporter(BaseOp): sample[self.text_key] = str(sample[self.text_key]) return sample + def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: + if sample[self.data_key] is not None and sample[self.data_key] != b'': + return self._get_from_data(sample) + else: + return self._get_from_text(sample) + + @staticmethod def _get_uuid(): return str(uuid.uuid4()) diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index b70cf58..1af18ab 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -3,7 +3,7 @@ FROM ghcr.io/astral-sh/uv:python3.11-bookworm RUN --mount=type=cache,target=/var/cache/apt \ --mount=type=cache,target=/var/lib/apt \ apt update \ - && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix swig + && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix swig poppler-utils tesseract-ocr RUN mkdir -p /home/models \ && wget https://paddleocr.bj.bcebos.com/dygraph_v2.0/ch/ch_ppocr_mobile_v2.0_cls_infer.tar \