diff --git a/Makefile b/Makefile index 3a33206..cf0bca7 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,7 @@ MAKEFLAGS += --no-print-directory +# Set WITH_MINERU=true to also build the mineru image; it is skipped by default. +WITH_MINERU ?= false VERSION ?= latest NAMESPACE ?= datamate @@ -8,7 +10,7 @@ build-%: $(MAKE) $*-docker-build .PHONY: build -build: backend-docker-build frontend-docker-build runtime-docker-build +build: backend-docker-build frontend-docker-build runtime-docker-build $(if $(filter true,$(WITH_MINERU)),mineru-docker-build) .PHONY: create-namespace create-namespace: @@ -85,6 +87,9 @@ deer-flow-docker-build: cp deployment/docker/deer-flow/conf.yaml.example ../deer-flow/conf.yaml cd ../deer-flow && docker compose build +.PHONY: mineru-docker-build +mineru-docker-build: + docker build -t datamate-mineru:$(VERSION) . -f scripts/images/mineru/Dockerfile .PHONY: backend-docker-install backend-docker-install: cd deployment/docker/datamate && docker compose up -d backend @@ -109,6 +114,22 @@ runtime-docker-install: runtime-docker-uninstall: cd deployment/docker/datamate && docker compose down runtime +.PHONY: mineru-docker-install +mineru-docker-install: + cd deployment/docker/datamate && cp .env.example .env && docker compose up -d datamate-mineru + +.PHONY: mineru-docker-uninstall +mineru-docker-uninstall: + cd deployment/docker/datamate && docker compose down datamate-mineru + +.PHONY: mineru-k8s-install +mineru-k8s-install: create-namespace + kubectl apply -f deployment/kubernetes/mineru/deploy.yaml -n $(NAMESPACE) + +.PHONY: mineru-k8s-uninstall +mineru-k8s-uninstall: + kubectl delete -f deployment/kubernetes/mineru/deploy.yaml -n $(NAMESPACE) + .PHONY: datamate-docker-install datamate-docker-install: cd deployment/docker/datamate && cp .env.example .env && docker compose -f docker-compose.yml up -d diff --git a/deployment/docker/datamate/docker-compose.yml b/deployment/docker/datamate/docker-compose.yml index 623fa7d..98529fd 100644 --- a/deployment/docker/datamate/docker-compose.yml +++
b/deployment/docker/datamate/docker-compose.yml @@ -60,6 +60,7 @@ services: MYSQL_USER: "root" MYSQL_PASSWORD: "password" MYSQL_DATABASE: "datamate" + PDF_FORMATTER_BASE_URL: "http://datamate-mineru:9001" command: - python - /opt/runtime/datamate/operator_runtime.py @@ -72,6 +73,27 @@ services: - flow_volume:/flow networks: [ datamate ] + # 4) mineru + datamate-mineru: + container_name: datamate-mineru + image: datamate-mineru + restart: on-failure + environment: + MINERU_MODEL_SOURCE: local + MINERU_DEVICE_MODE: cpu # cpu|cuda|npu|mps + MINERU_BACKEND_MODE: pipeline + privileged: true + command: + - python + - /opt/runtime/datamate/mineru/mineru_api.py + - --port + - "9001" + volumes: + - dataset_volume:/dataset + - mineru_log_volume:/var/log/datamate/mineru + networks: [ datamate ] + profiles: [ mineru ] + volumes: dataset_volume: name: datamate-dataset-volume diff --git a/deployment/helm/datamate/charts/ray-cluster/values.yaml b/deployment/helm/datamate/charts/ray-cluster/values.yaml index b17a132..49278db 100644 --- a/deployment/helm/datamate/charts/ray-cluster/values.yaml +++ b/deployment/helm/datamate/charts/ray-cluster/values.yaml @@ -77,6 +77,8 @@ head: value: "password" - name: MYSQL_DATABASE value: "datamate" + - name: PDF_FORMATTER_BASE_URL + value: "http://datamate-mineru:9001" # - name: EXAMPLE_ENV # value: "1" envFrom: [] @@ -154,6 +156,8 @@ head: value: "password" - name: MYSQL_DATABASE value: "datamate" + - name: PDF_FORMATTER_BASE_URL + value: "http://datamate-mineru:9001" ports: - containerPort: 8081 volumeMounts: @@ -221,6 +225,8 @@ worker: value: "password" - name: MYSQL_DATABASE value: "datamate" + - name: PDF_FORMATTER_BASE_URL + value: "http://datamate-mineru:9001" # - name: EXAMPLE_ENV # value: "1" envFrom: [] diff --git a/deployment/kubernetes/mineru/deploy.yaml b/deployment/kubernetes/mineru/deploy.yaml new file mode 100644 index 0000000..fa8447c --- /dev/null +++ b/deployment/kubernetes/mineru/deploy.yaml @@ -0,0 +1,70 @@ +apiVersion: 
apps/v1
+kind: Deployment
+metadata:
+  name: datamate-mineru
+  labels:
+    app: datamate
+    tier: mineru
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: datamate
+      tier: mineru
+  template:
+    metadata:
+      labels:
+        app: datamate
+        tier: mineru
+    spec:
+      containers:
+        - name: mineru
+          image: datamate-mineru  # NOTE(review): untagged reference - confirm it matches the tag produced by the image build
+          imagePullPolicy: IfNotPresent
+          command:
+            - python
+            - /opt/runtime/datamate/mineru/mineru_api.py
+            - --port
+            - "9001"  # same port as containerPort and the Service targetPort below
+          env:
+            - name: MINERU_MODEL_SOURCE
+              value: local
+            - name: MINERU_DEVICE_MODE
+              value: cpu
+            - name: MINERU_BACKEND_MODE
+              value: pipeline
+          ports:
+            - containerPort: 9001
+          volumeMounts:
+            - name: dataset-volume
+              mountPath: /dataset
+            - name: log-volume
+              mountPath: /var/log/datamate/mineru
+              subPath: mineru  # only the "mineru" sub-directory of the log volume is mounted here
+      volumes:
+        - name: dataset-volume
+          hostPath:
+            path: /opt/datamate/data/dataset
+            type: DirectoryOrCreate
+        - name: log-volume
+          hostPath:
+            path: /opt/datamate/data/log
+            type: DirectoryOrCreate
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: datamate-mineru  # the default PDF-extract base URL in this change is http://datamate-mineru:9001
+  labels:
+    app: datamate
+    tier: mineru
+spec:
+  type: ClusterIP
+  ports:
+    - port: 9001
+      targetPort: 9001
+      protocol: TCP
+  selector:
+    app: datamate
+    tier: mineru
diff --git a/runtime/mineru/mineru_api.py b/runtime/mineru/mineru_api.py
new file mode 100644
index 0000000..5ad9c18
--- /dev/null
+++ b/runtime/mineru/mineru_api.py
@@ -0,0 +1,112 @@
+import shutil
+import time
+import uuid
+import os
+
+import click
+import uvicorn
+from pydantic import BaseModel
+from pathlib import Path
+from fastapi import FastAPI
+from fastapi.responses import JSONResponse
+from loguru import logger
+from mineru.cli.common import aio_do_parse, read_fn
+from mineru.cli.fast_api import get_infer_result
+
+# Logging configuration: add a DEBUG-level log file sink under LOG_DIR.
+LOG_DIR = "/var/log/datamate/mineru"
+os.makedirs(LOG_DIR, exist_ok=True)  # runs at import time, before the sink below is added
+logger.add(
+    f"{LOG_DIR}/mineru.log",
+    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
+    level="DEBUG",
+    enqueue=True
+)
+
+app = FastAPI()
+class
PDFParseRequest(BaseModel):
+    """Request body: the PDF to parse and the directory that receives scratch output."""
+    source_path: str
+    export_path: str
+
+@app.post(path="/api/pdf-extract")
+async def parse_pdf(request: PDFParseRequest):
+    """Parse one PDF with MinerU and return the extracted Markdown.
+
+    Responds 200 with {"result": <markdown>}, 400 when the file is not a PDF or cannot
+    be read, and 500 on any other failure. Intermediate output is written to a unique
+    sub-directory of ``request.export_path`` that is removed before the response is sent.
+    """
+    start = time.time()
+    file_path = Path(request.source_path)
+    file_suffix = file_path.suffix.lower()
+    # Reject unsupported input before any scratch directory is created.
+    if file_suffix != ".pdf":
+        return JSONResponse(
+            status_code=400,
+            content={"error": f"Unsupported file type: {file_suffix}"}
+        )
+
+    unique_id = str(uuid.uuid4())
+    unique_dir = os.path.join(request.export_path, unique_id)
+    try:
+        os.makedirs(unique_dir, exist_ok=True)
+        try:
+            pdf_bytes = read_fn(file_path)
+        except Exception as e:
+            return JSONResponse(status_code=400, content={"error": f"Failed to load file: {str(e)}"})
+        pdf_name = file_path.stem
+
+        # Run the asynchronous MinerU parse.
+        await aio_do_parse(
+            output_dir=unique_dir,
+            pdf_file_names=[pdf_name],
+            pdf_bytes_list=[pdf_bytes],
+            p_lang_list=["ch"],
+            f_draw_layout_bbox=False,
+            f_draw_span_bbox=False,
+            f_dump_orig_pdf=False,
+        )
+
+        # Results are read from "auto" for pipeline backends and "vlm" otherwise. An unset
+        # variable falls back to "pipeline" instead of raising on None.startswith().
+        backend = os.getenv("MINERU_BACKEND_MODE") or "pipeline"
+        parse_dir = os.path.join(unique_dir, pdf_name, "auto" if backend.startswith("pipeline") else "vlm")
+
+        content = ""
+        if os.path.exists(parse_dir):
+            content = get_infer_result(".md", pdf_name, parse_dir)
+
+        logger.info(f"fileName: {file_path.name} costs {time.time() - start:.6f} s")
+        return JSONResponse(status_code=200, content={"result": content})
+    except Exception as e:
+        logger.exception(e)
+        return JSONResponse(
+            status_code=500,
+            content={"error": f"Failed to process file: {str(e)}"}
+        )
+    finally:
+        # Runs on every path (200, 400, 500) so failed requests leave no scratch data behind.
+        if os.path.exists(unique_dir):
+            try:
+                shutil.rmtree(unique_dir)
+            except Exception as e:
+                logger.error(f"Failed to remove unique dir for {unique_id}: {str(e)}")
+
+
+@click.command()
+@click.option('--ip', default='0.0.0.0', help='Service ip for this API, default to use 0.0.0.0.')
+@click.option('--port', default=9001, type=int, help='Service port for this API, default to use 9001.')
+def main(ip, port):
+    """Create API for Submitting Job to MinerU"""
+    logger.info(f"Start MinerU FastAPI Service: http://{ip}:{port}")
+    uvicorn.run(
+        app,
+        host=ip,
+        port=port
+    )
+
+
+if __name__ == "__main__":
+    main()
+
diff --git a/runtime/ops/formatter/__init__.py b/runtime/ops/formatter/__init__.py index bc02387..814d224 100644 --- a/runtime/ops/formatter/__init__.py +++ b/runtime/ops/formatter/__init__.py @@ -21,6 +21,7 @@ def _import_operators(): from . import file_exporter from . import slide_formatter from . import unstructured_formatter + from . import external_pdf_formatter _import_operators()
diff --git a/runtime/ops/formatter/external_pdf_formatter/__init__.py b/runtime/ops/formatter/external_pdf_formatter/__init__.py
new file mode 100644
index 0000000..dfbb651
--- /dev/null
+++ b/runtime/ops/formatter/external_pdf_formatter/__init__.py
@@ -0,0 +1,6 @@
+# -*- coding: utf-8 -*-
+
+from datamate.core.base_op import OPERATORS
+
+OPERATORS.register_module(module_name='ExternalPDFFormatter',
+                          module_path="ops.formatter.external_pdf_formatter.process")
diff --git a/runtime/ops/formatter/external_pdf_formatter/metadata.yml b/runtime/ops/formatter/external_pdf_formatter/metadata.yml
new file mode 100644
index 0000000..85c8e97
--- /dev/null
+++ b/runtime/ops/formatter/external_pdf_formatter/metadata.yml
@@ -0,0 +1,16 @@
+name: '外部PDF文本抽取'
+name_en: 'External PDF Text Extraction'
+description: '基于外部API,抽取PDF中的文本。'
+description_en: 'Extracts text from PDF files based on external APIs.'
+language: 'python'
+vendor: 'huawei'
+raw_id: 'ExternalPDFFormatter'
+version: '1.0.0'
+types:
+  - 'collect'
+modal: 'text'
+effect:
+  before: ''
+  after: ''
+inputs: 'text'
+outputs: 'text'
diff --git a/runtime/ops/formatter/external_pdf_formatter/process.py b/runtime/ops/formatter/external_pdf_formatter/process.py
new file mode 100644
index 0000000..dcc810c
--- /dev/null
+++ b/runtime/ops/formatter/external_pdf_formatter/process.py
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+"""
+Description: External PDF text extraction
+Create: 2025/10/29 17:24
+"""
+import json
+import os
+import time
+from loguru import logger
+from typing import Dict, Any
+
+from datamate.core.base_op import Mapper
+from datamate.common.utils.rest_client import http_request
+
+
+class ExternalPDFFormatter(Mapper):
+    """Extract the text of a PDF by delegating to an external HTTP API."""
+
+    def __init__(self, *args, **kwargs):
+        super(ExternalPDFFormatter, self).__init__(*args, **kwargs)
+        # EXTERNAL_PDF_BASE_URL keeps working as an explicit override; PDF_FORMATTER_BASE_URL is
+        # the variable the docker-compose and helm deployments actually set.
+        self.base_url = (os.getenv("EXTERNAL_PDF_BASE_URL")
+                         or os.getenv("PDF_FORMATTER_BASE_URL")
+                         or "http://datamate-mineru:9001").rstrip("/")
+        self.pdf_extract_url = f"{self.base_url}/api/pdf-extract"
+
+    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
+        """POST the sample's file path to the extraction API and store the returned text.
+
+        Writes the "result" field of the JSON response to ``sample[self.text_key]`` and
+        returns the sample. Any failure is logged with the file name and re-raised.
+        """
+        start = time.time()
+        filename = sample[self.filename_key]
+        try:
+            data = {"source_path": sample[self.filepath_key], "export_path": sample[self.export_path_key]}
+            response = http_request(method="POST", url=self.pdf_extract_url, data=data)
+            sample[self.text_key] = json.loads(response.text).get("result")
+            logger.info(
+                f"fileName: {filename}, method: ExternalPDFFormatter costs {(time.time() - start):.6f} s")
+        except Exception as err:
+            # Request, HTTP-status and JSON errors are logged with the file name too, not only decode errors.
+            logger.exception(f"fileName: {filename}, method: ExternalPDFFormatter failed: {err}")
+            raise
+        return sample
diff --git a/runtime/python-executor/datamate/common/utils/rest_client.py b/runtime/python-executor/datamate/common/utils/rest_client.py
new file mode 100644
index 0000000..f431561
--- /dev/null
+++ b/runtime/python-executor/datamate/common/utils/rest_client.py
@@ -0,0 +1,53 @@
+#
-*- coding: utf-8 -*-
+import requests
+from typing import Optional, Dict, Any
+
+from starlette.responses import JSONResponse
+
+
+def http_request(method: str, url: str, data: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None):
+    """
+    Generic JSON-over-HTTP request helper.
+
+    Args:
+        method: HTTP method ('GET', 'POST', 'PUT', 'DELETE', 'PATCH'), case-insensitive
+        url: request URL
+        data: payload; query parameters for GET, JSON body for POST/PUT/DELETE/PATCH
+        timeout: seconds to wait for the server; None (the default) sets no limit
+
+    Returns: the ``requests.Response`` object of a successful call
+
+    Raises:
+        requests.exceptions.RequestException: the request could not be completed
+        ValueError: the response status code is not 200, 201, 202 or 204
+    """
+    success_codes = [200, 201, 202, 204]
+
+    # Default request headers
+    headers = {"Content-Type": "application/json"}
+    method = method.upper()
+
+    try:
+        # Assemble the request arguments
+        request_kwargs = {"url": url, "headers": headers, "timeout": timeout}
+
+        # PATCH is a documented method, so its payload is sent as a body like POST/PUT/DELETE
+        if data is not None:
+            if method in ["POST", "PUT", "DELETE", "PATCH"]:
+                request_kwargs["json"] = data
+            elif method == "GET":
+                request_kwargs["params"] = data
+
+        # Send the request
+        response = requests.request(method, **request_kwargs)
+
+        # Reject any status code outside the success list
+        if response.status_code not in success_codes:
+            raise ValueError(
+                f"Request failed! Status code: {response.status_code}, "
+                f"content: {response.text}, "
+                f"URL: {url}"
+            )
+        return response
+    except requests.exceptions.RequestException as e:
+        raise requests.exceptions.RequestException(f"Request exception: {str(e)}") from e
diff --git a/scripts/db/data-operator-init.sql b/scripts/db/data-operator-init.sql index 4a1b8a6..b611759 100644 --- a/scripts/db/data-operator-init.sql +++ b/scripts/db/data-operator-init.sql @@ -69,6 +69,7 @@ INSERT IGNORE INTO t_operator (id, name, description, version, inputs, outputs, runtime, settings, file_name, is_star) VALUES ('TextFormatter', 'TXT文本抽取', '抽取TXT中的文本。', '1.0.0', 'text', 'text', null, null, '', false), ('UnstructuredFormatter', '非结构化文本抽取', '抽取非结构化文件的文本,目前支持word文档。', '1.0.0', 'text', 'text', null, null, '', false), + ('ExternalPDFFormatter', '外部PDF文本抽取', '基于外部API,抽取PDF中的文本。', '1.0.0', 'text', 'text', null, null, '', false), ('FileExporter', '落盘算子', '将文件保存到本地目录。', '1.0.0', 'all', 'all', null, null, '', false), ('FileWithHighRepeatPhraseRateFilter',
'文档词重复率检查', '去除重复词过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatPhraseRatio": {"name": "文档词重复率", "description": "某个词的统计数/文档总词数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}, "hitStopwords": {"name": "去除停用词", "description": "统计重复词时,选择是否要去除停用词。", "type": "switch", "defaultVal": false, "required": true, "checkedLabel": "去除", "unCheckedLabel": "不去除"}}', '', 'false'), ('FileWithHighRepeatWordRateFilter', '文档字重复率检查', '去除重复字过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatWordRatio": {"name": "文档字重复率", "description": "某个字的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}}', '', 'false'), @@ -122,7 +123,7 @@ AND o.id IN ('TextFormatter', 'FileWithShortOrLongLengthFilter', 'FileWithHighRe 'AnonymizedIpAddress', 'AnonymizedPhoneNumber', 'AnonymizedUrlCleaner', 'HtmlTagCleaner', 'XMLTagCleaner', 'ContentCleaner', 'EmailNumberCleaner', 'EmojiCleaner', 'ExtraSpaceCleaner', 'FullWidthCharacterCleaner', 'GrableCharactersCleaner', 'InvisibleCharactersCleaner', 'LegendCleaner', 'PoliticalWordCleaner', - 'SexualAndViolentWordCleaner', 'TraditionalChineseCleaner', 'UnicodeSpaceCleaner', 'UnstructuredFormatter'); + 'SexualAndViolentWordCleaner', 'TraditionalChineseCleaner', 'UnicodeSpaceCleaner'); INSERT IGNORE INTO t_operator_category_relation(category_id, operator_id) SELECT c.id, o.id @@ -138,4 +139,4 @@ SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o WHERE c.id IN (7, 8, 11) - AND o.id IN ('FileExporter', 'UnstructuredFormatter'); + AND o.id IN ('FileExporter', 'UnstructuredFormatter', 'ExternalPDFFormatter');
diff --git a/scripts/images/mineru/Dockerfile b/scripts/images/mineru/Dockerfile
new file mode 100644
index 0000000..bb6166d
--- /dev/null
+++ b/scripts/images/mineru/Dockerfile
@@ -0,0 +1,26 @@
+FROM python:3.11-slim
+
+# System packages (GL/glib libraries, curl, vim, procps), fetched through the Huawei Cloud Debian mirror.
+RUN sed -i 's/deb.debian.org/mirrors.huaweicloud.com/g' /etc/apt/sources.list.d/debian.sources \
+    && apt-get update \
+    && apt-get install -y curl vim libgl1 libglx0 libopengl0 libglib2.0-0 procps \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN pip config --user set global.index-url https://mirrors.huaweicloud.com/repository/pypi/simple && \
+    pip config --user set global.trusted-host mirrors.huaweicloud.com && \
+    pip install --upgrade setuptools && \
+    pip install -U 'mineru[core]==2.5.4' --break-system-packages && \
+    pip cache purge
+
+# NOTE(review): an empty CURL_CA_BUNDLE is a known way to switch off TLS certificate verification in
+# some HTTP clients, and as ENV it also applies at runtime - confirm this is intended.
+ENV CURL_CA_BUNDLE=""
+ENV TORCH_DEVICE_BACKEND_AUTOLOAD=0
+
+RUN /bin/bash -c "mineru-models-download -s modelscope -m all"
+
+ENV MINERU_MODEL_SOURCE=local
+
+# Copy the service code last so that editing it does not invalidate the package and model layers above.
+COPY runtime/mineru /opt/runtime/datamate/mineru