From d59c167da4718e4ec0163157d4434dd19f3057b9 Mon Sep 17 00:00:00 2001 From: hhhhsc701 <56435672+hhhhsc701@users.noreply.github.com> Date: Fri, 5 Dec 2025 17:26:29 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=97=E5=AD=90=E5=B0=86=E6=8A=BD=E5=8F=96?= =?UTF-8?q?=E4=B8=8E=E8=90=BD=E7=9B=98=E5=9B=BA=E5=AE=9A=E5=88=B0=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E4=B8=AD=20(#134)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature: 将抽取动作移到每一个算子中 * feature: 落盘算子改为默认执行 * feature: 优化前端展示 * feature: 使用pyproject管理依赖 --- .../components/OperatorOrchestration.tsx | 1 + .../Detail/components/FileTable.tsx | 10 +- .../process.py | 1 + .../process.py | 1 + .../process.py | 1 + .../process.py | 1 + .../img_blurred_images_cleaner/process.py | 1 + .../img_duplicated_images_cleaner/process.py | 1 + .../img_similar_images_cleaner/process.py | 1 + .../filter/remove_duplicate_file/process.py | 1 + .../process.py | 1 + .../process.py | 1 + runtime/ops/formatter/__init__.py | 5 - .../ops/formatter/file_exporter/__init__.py | 6 - .../ops/formatter/file_exporter/metadata.yml | 16 -- .../ops/formatter/file_exporter/process.py | 145 ------------- .../ops/formatter/img_formatter/__init__.py | 6 - .../ops/formatter/img_formatter/metadata.yml | 16 -- .../ops/formatter/img_formatter/process.py | 35 ---- .../ops/formatter/text_formatter/__init__.py | 6 - .../ops/formatter/text_formatter/metadata.yml | 16 -- .../ops/formatter/text_formatter/process.py | 44 ---- .../unstructured_formatter/__init__.py | 6 - .../unstructured_formatter/metadata.yml | 16 -- .../unstructured_formatter/process.py | 37 ---- .../ops/formatter/word_formatter/__init__.py | 6 - .../ops/formatter/word_formatter/metadata.yml | 16 -- .../ops/formatter/word_formatter/process.py | 68 ------- runtime/ops/mapper/content_cleaner/process.py | 1 + .../credit_card_number_cleaner/process.py | 1 + runtime/ops/mapper/email_cleaner/process.py | 1 + runtime/ops/mapper/emoji_cleaner/process.py | 1 + .../ops/mapper/extra_space_cleaner/process.py | 1 + .../full_width_characters_cleaner/process.py | 1 + .../garble_characters_cleaner/process.py | 1 + .../ops/mapper/html_tag_cleaner/process.py | 1 + .../ops/mapper/id_number_cleaner/process.py | 1 + runtime/ops/mapper/img_denoise/process.py | 1 + .../mapper/img_direction_correct/process.py | 1 + .../mapper/img_enhanced_brightness/process.py | 1 + .../mapper/img_enhanced_contrast/process.py | 1 + .../mapper/img_enhanced_saturation/process.py | 1 + .../mapper/img_enhanced_sharpness/process.py | 1 + .../img_perspective_transformation/process.py | 1 + runtime/ops/mapper/img_resize/process.py | 1 + .../ops/mapper/img_shadow_remove/process.py | 1 + runtime/ops/mapper/img_type_unify/process.py | 1 + .../mapper/img_watermark_remove/process.py | 1 + .../invisible_characters_cleaner/process.py | 1 + .../ops/mapper/ip_address_cleaner/process.py | 1 + .../knowledge_relation_slice/process.py | 1 + runtime/ops/mapper/legend_cleaner/process.py | 1 + .../mapper/phone_number_cleaner/process.py | 1 + .../mapper/political_word_cleaner/process.py | 1 + .../remove_duplicate_sentences/process.py | 5 +- .../process.py | 1 + runtime/ops/mapper/text_to_word/process.py | 1 + .../ops/mapper/traditional_chinese/process.py | 1 + .../mapper/unicode_space_cleaner/process.py | 1 + runtime/ops/mapper/url_cleaner/process.py | 1 + runtime/ops/mapper/xml_tag_cleaner/process.py | 1 + runtime/ops/pyproject.toml | 28 +++ runtime/ops/requirements.txt | 22 -- .../python-executor/datamate/core/base_op.py | 191 ++++++++++++++++-- .../python-executor/datamate/core/dataset.py | 11 +- .../sql_manager/persistence_atction.py | 13 +- runtime/python-executor/pyproject.toml | 28 +-- scripts/db/data-cleaning-init.sql | 14 +- scripts/db/data-operator-init.sql | 17 +- scripts/images/runtime/Dockerfile | 2 +- 70 files changed, 289 insertions(+), 539 deletions(-) delete mode 100644 runtime/ops/formatter/file_exporter/__init__.py delete mode 100644 runtime/ops/formatter/file_exporter/metadata.yml delete mode 100644 runtime/ops/formatter/file_exporter/process.py delete mode 100644 runtime/ops/formatter/img_formatter/__init__.py delete mode 100644 runtime/ops/formatter/img_formatter/metadata.yml delete mode 100644 runtime/ops/formatter/img_formatter/process.py delete mode 100644 runtime/ops/formatter/text_formatter/__init__.py delete mode 100644 runtime/ops/formatter/text_formatter/metadata.yml delete mode 100644 runtime/ops/formatter/text_formatter/process.py delete mode 100644 runtime/ops/formatter/unstructured_formatter/__init__.py delete mode 100644 runtime/ops/formatter/unstructured_formatter/metadata.yml delete mode 100644 runtime/ops/formatter/unstructured_formatter/process.py delete mode 100644 runtime/ops/formatter/word_formatter/__init__.py delete mode 100644 runtime/ops/formatter/word_formatter/metadata.yml delete mode 100644 runtime/ops/formatter/word_formatter/process.py create mode 100644 runtime/ops/pyproject.toml delete mode 100644 runtime/ops/requirements.txt diff --git a/frontend/src/pages/DataCleansing/Create/components/OperatorOrchestration.tsx b/frontend/src/pages/DataCleansing/Create/components/OperatorOrchestration.tsx index 088af35..75ec56e 100644 --- a/frontend/src/pages/DataCleansing/Create/components/OperatorOrchestration.tsx +++ b/frontend/src/pages/DataCleansing/Create/components/OperatorOrchestration.tsx @@ -150,6 +150,7 @@ const OperatorFlow: React.FC = ({ max={selectedOperators.length} defaultValue={index + 1} className="w-10 h-6 text-xs text-center" + style={{ width: 60 }} autoFocus onBlur={(e) => handleIndexChange(operator.id, e.target.value)} onKeyDown={(e) => { diff --git a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx index 1d95fe8..62102aa 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx @@ -227,9 +227,8 @@ export default function FileTable({result, fetchTaskResult}) { dataIndex: "status", key: "status", filters: [ - { text: "已完成", value: "已完成" }, - { text: "失败", value: "失败" }, - { text: "处理中", value: "处理中" }, + { text: "已完成", value: "COMPLETED" }, + { text: "失败", value: "FAILED" }, ], onFilter: (value: string, record: any) => record.status === value, render: (status: string) => ( @@ -237,9 +236,7 @@ export default function FileTable({result, fetchTaskResult}) { status={ status === "COMPLETED" ? "success" - : status === "FAILED" - ? "error" - : "processing" + : "error" } text={TaskStatusMap[status as TaskStatus].label} /> @@ -248,6 +245,7 @@ export default function FileTable({result, fetchTaskResult}) { { title: "操作", key: "action", + width: 200, render: (_text: string, record: any) => (
{record.status === "COMPLETED" ? ( diff --git a/runtime/ops/filter/file_with_high_repeat_phrase_rate_filter/process.py b/runtime/ops/filter/file_with_high_repeat_phrase_rate_filter/process.py index e931a2b..31ceb48 100644 --- a/runtime/ops/filter/file_with_high_repeat_phrase_rate_filter/process.py +++ b/runtime/ops/filter/file_with_high_repeat_phrase_rate_filter/process.py @@ -33,6 +33,7 @@ class FileWithHighRepeatPhraseRateFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._file_with_high_repeat_phrase_rate_filter(sample[self.text_key], sample[self.filename_key]) logger.info(f"fileName: {sample[self.filename_key]}, " diff --git a/runtime/ops/filter/file_with_high_repeat_word_rate_filter/process.py b/runtime/ops/filter/file_with_high_repeat_word_rate_filter/process.py index 39289eb..54593d5 100644 --- a/runtime/ops/filter/file_with_high_repeat_word_rate_filter/process.py +++ b/runtime/ops/filter/file_with_high_repeat_word_rate_filter/process.py @@ -30,6 +30,7 @@ class FileWithHighRepeatWordRateFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._file_with_high_repeat_word_rate_filter(sample[self.text_key], sample[self.filename_key]) logger.info(f"fileName: {sample[self.filename_key]}, " diff --git a/runtime/ops/filter/file_with_high_special_char_rate_filter/process.py b/runtime/ops/filter/file_with_high_special_char_rate_filter/process.py index 1d4c2ac..10a1896 100644 --- a/runtime/ops/filter/file_with_high_special_char_rate_filter/process.py +++ b/runtime/ops/filter/file_with_high_special_char_rate_filter/process.py @@ -26,6 +26,7 @@ class FileWithHighSpecialCharRateFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._file_with_high_special_char_rate_filter(sample[self.text_key], sample[self.filename_key]) logger.info(f"fileName: {sample[self.filename_key]}, " diff --git a/runtime/ops/filter/img_advertisement_images_cleaner/process.py b/runtime/ops/filter/img_advertisement_images_cleaner/process.py index 38a3c6c..00005b1 100644 --- a/runtime/ops/filter/img_advertisement_images_cleaner/process.py +++ b/runtime/ops/filter/img_advertisement_images_cleaner/process.py @@ -105,6 +105,7 @@ class ImgAdvertisementImagesCleaner(Filter): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] img_bytes = sample[self.data_key] diff --git a/runtime/ops/filter/img_blurred_images_cleaner/process.py b/runtime/ops/filter/img_blurred_images_cleaner/process.py index 372ee66..b62ce08 100644 --- a/runtime/ops/filter/img_blurred_images_cleaner/process.py +++ b/runtime/ops/filter/img_blurred_images_cleaner/process.py @@ -27,6 +27,7 @@ class ImgBlurredImagesCleaner(Filter): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/filter/img_duplicated_images_cleaner/process.py b/runtime/ops/filter/img_duplicated_images_cleaner/process.py index 578a145..440f0c7 100644 --- a/runtime/ops/filter/img_duplicated_images_cleaner/process.py +++ b/runtime/ops/filter/img_duplicated_images_cleaner/process.py @@ -61,6 +61,7 @@ class ImgDuplicatedImagesCleaner(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """重复图片去重算子执行入口""" start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] self.task_uuid = sample.get("instance_id") if not self.task_uuid else self.task_uuid img_data = self._duplicate_images_filter(file_name, sample[self.data_key]) diff --git a/runtime/ops/filter/img_similar_images_cleaner/process.py b/runtime/ops/filter/img_similar_images_cleaner/process.py index b4f20ae..9121818 100644 --- a/runtime/ops/filter/img_similar_images_cleaner/process.py +++ b/runtime/ops/filter/img_similar_images_cleaner/process.py @@ -227,6 +227,7 @@ class ImgSimilarImagesCleaner(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """去除相似图片算子执行入口""" start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] img_bytes = sample[self.data_key] data = bytes_to_numpy(img_bytes) if img_bytes else np.array([]) diff --git a/runtime/ops/filter/remove_duplicate_file/process.py b/runtime/ops/filter/remove_duplicate_file/process.py index d013022..626a7ec 100644 --- a/runtime/ops/filter/remove_duplicate_file/process.py +++ b/runtime/ops/filter/remove_duplicate_file/process.py @@ -150,6 +150,7 @@ class DuplicateFilesFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] self.task_uuid = sample.get("instance_id") if not self.task_uuid else self.task_uuid sample[self.text_key] = self.deduplicate_files(sample, file_name) diff --git a/runtime/ops/filter/remove_file_with_many_sensitive_words/process.py b/runtime/ops/filter/remove_file_with_many_sensitive_words/process.py index 1e915c9..e99ffff 100644 --- a/runtime/ops/filter/remove_file_with_many_sensitive_words/process.py +++ b/runtime/ops/filter/remove_file_with_many_sensitive_words/process.py @@ -90,6 +90,7 @@ class FileWithManySensitiveWordsFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._file_with_many_sensitive_words_filter(sample[self.text_key], sample[self.filename_key]) logger.info(f"fileName: {sample[self.filename_key]}, " diff --git a/runtime/ops/filter/remove_file_with_short_or_long_length/process.py b/runtime/ops/filter/remove_file_with_short_or_long_length/process.py index 34553ac..b8efbc1 100644 --- a/runtime/ops/filter/remove_file_with_short_or_long_length/process.py +++ b/runtime/ops/filter/remove_file_with_short_or_long_length/process.py @@ -31,6 +31,7 @@ class FileWithShortOrLongLengthFilter(Filter): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._file_with_short_or_long_length_filter(sample[self.text_key], sample[self.filename_key]) logger.info(f"fileName: {sample[self.filename_key]}, " diff --git a/runtime/ops/formatter/__init__.py b/runtime/ops/formatter/__init__.py index 301eab1..4cf89d5 100644 --- a/runtime/ops/formatter/__init__.py +++ b/runtime/ops/formatter/__init__.py @@ -15,12 +15,7 @@ _configure_importer() def _import_operators(): - from . import text_formatter - from . import word_formatter - from . import img_formatter - from . import file_exporter from . import slide_formatter - from . import unstructured_formatter from . import mineru_formatter diff --git a/runtime/ops/formatter/file_exporter/__init__.py b/runtime/ops/formatter/file_exporter/__init__.py deleted file mode 100644 index 90b375d..0000000 --- a/runtime/ops/formatter/file_exporter/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding: utf-8 -*- - -from datamate.core.base_op import OPERATORS - -OPERATORS.register_module(module_name='FileExporter', - module_path="ops.formatter.file_exporter.process") diff --git a/runtime/ops/formatter/file_exporter/metadata.yml b/runtime/ops/formatter/file_exporter/metadata.yml deleted file mode 100644 index 47eab02..0000000 --- a/runtime/ops/formatter/file_exporter/metadata.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: '落盘算子' -name_en: 'save file operator' -description: '将文件内容保存为文件。' -description_en: 'Save the file data as a file.' -language: 'Python' -vendor: 'Huawei' -raw_id: 'FileExporter' -version: '1.0.0' -types: - - 'collect' -modal: 'others' -effect: - before: '' - after: '' -inputs: 'all' -outputs: 'all' diff --git a/runtime/ops/formatter/file_exporter/process.py b/runtime/ops/formatter/file_exporter/process.py deleted file mode 100644 index c4934c4..0000000 --- a/runtime/ops/formatter/file_exporter/process.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/user/bin/python -# -*- coding: utf-8 -*- - -""" -Description: Json文本抽取 -Create: 2024/06/06 15:43 -""" -import time -import os -import uuid -from typing import Tuple, Dict, Any -from loguru import logger - -from datamate.core.constant import Fields -from datamate.core.base_op import Mapper -from datamate.common.utils import check_valid_path - - -class FileExporter(Mapper): - """把输入的json文件流抽取为txt""" - - def __init__(self, *args, **kwargs): - super(FileExporter, self).__init__(*args, **kwargs) - self.last_ops = True - self.text_support_ext = kwargs.get("text_support_ext", ['txt', 'html', 'md', 'markdown', - 'xlsx', 'xls', 'csv', 'pptx', 'ppt', - 'xml', 'json', 'doc', 'docx', 'pdf']) - self.data_support_ext = kwargs.get("data_support_ext", ['jpg', 'jpeg', 'png', 'bmp']) - self.medical_support_ext = kwargs.get("medical_support_ext", ['svs', 'tif', 'tiff']) - - def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: - file_name = sample[self.filename_key] - file_type = sample[self.filetype_key] - - try: - start = time.time() - if file_type in self.text_support_ext: - sample, save_path = self.get_textfile_handler(sample) - elif file_type in self.data_support_ext: - sample, save_path = self.get_datafile_handler(sample) - elif file_type in self.medical_support_ext: - sample, save_path = self.get_medicalfile_handler(sample) - else: - raise TypeError(f"{file_type} is unsupported! please check support_ext in FileExporter Ops") - - if sample[self.text_key] == '' and sample[self.data_key] == b'': - sample[self.filesize_key] = "0" - return sample - - if save_path: - self.save_file(sample, save_path) - sample[self.text_key] = '' - sample[self.data_key] = b'' - sample[Fields.result] = True - - file_type = save_path.split('.')[-1] - sample[self.filetype_key] = file_type - - base_name, _ = os.path.splitext(file_name) - new_file_name = base_name + '.' + file_type - sample[self.filename_key] = new_file_name - - base_name, _ = os.path.splitext(save_path) - sample[self.filepath_key] = base_name - file_size = os.path.getsize(base_name) - sample[self.filesize_key] = f"{file_size}" - - logger.info(f"origin file named {file_name} has been save to {save_path}") - logger.info(f"fileName: {sample[self.filename_key]}, " - f"method: FileExporter costs {time.time() - start:.6f} s") - except UnicodeDecodeError as err: - logger.error(f"fileName: {sample[self.filename_key]}, " - f"method: FileExporter causes decode error: {err}") - raise - return sample - - def get_save_path(self, sample: Dict[str, Any], target_type) -> str: - export_path = os.path.abspath(sample[self.export_path_key]) - file_name = sample[self.filename_key] - new_file_name = os.path.splitext(file_name)[0] + '.' + target_type - - if not check_valid_path(export_path): - os.makedirs(export_path, exist_ok=True) - res = os.path.join(export_path, new_file_name) - return res - - def get_textfile_handler(self, sample: Dict[str, Any]) -> Tuple[Dict, str]: - target_type = sample.get("target_type", None) - - # target_type存在则保存为扫描件, docx格式 - if target_type: - sample = self._get_from_data(sample) - save_path = self.get_save_path(sample, target_type) - # 不存在则保存为txt文件,正常文本清洗 - else: - sample = self._get_from_text(sample) - save_path = self.get_save_path(sample, 'txt') - return sample, save_path - - def get_datafile_handler(self, sample: Dict[str, Any]) -> Tuple[Dict, str]: - target_type = sample.get("target_type", None) - - # target_type存在, 图转文保存为target_type,markdown格式 - if target_type: - sample = self._get_from_text(sample) - save_path = self.get_save_path(sample, target_type) - # 不存在则保存为原本图片文件格式,正常图片清洗 - else: - sample = self._get_from_data(sample) - save_path = self.get_save_path(sample, sample[self.filetype_key]) - return sample, save_path - - def get_medicalfile_handler(self, sample: Dict[str, Any]) -> Tuple[Dict, str]: - target_type = 'png' - - sample = self._get_from_data(sample) - save_path = self.get_save_path(sample, target_type) - - return sample, save_path - - def save_file(self, sample, save_path): - file_name, _ = os.path.splitext(save_path) - # 以二进制格式保存文件 - file_sample = sample[self.text_key].encode('utf-8') if sample[self.text_key] else sample[self.data_key] - with open(file_name, 'wb') as f: - f.write(file_sample) - # 获取父目录路径 - - parent_dir = os.path.dirname(file_name) - os.chmod(parent_dir, 0o770) - os.chmod(file_name, 0o640) - - def _get_from_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: - sample[self.data_key] = bytes(sample[self.data_key]) - sample[self.text_key] = '' - return sample - - def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: - sample[self.data_key] = b'' - sample[self.text_key] = str(sample[self.text_key]) - return sample - - def _get_uuid(self): - res = str(uuid.uuid4()) - return res diff --git a/runtime/ops/formatter/img_formatter/__init__.py b/runtime/ops/formatter/img_formatter/__init__.py deleted file mode 100644 index 6b01a93..0000000 --- a/runtime/ops/formatter/img_formatter/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding: utf-8 -*- - -from datamate.core.base_op import OPERATORS - -OPERATORS.register_module(module_name='ImgFormatter', - module_path="ops.formatter.img_formatter.process") diff --git a/runtime/ops/formatter/img_formatter/metadata.yml b/runtime/ops/formatter/img_formatter/metadata.yml deleted file mode 100644 index e7aa74a..0000000 --- a/runtime/ops/formatter/img_formatter/metadata.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: '读取图片文件' -name_en: 'Image File Reader' -description: '读取图片文件。' -description_en: 'Reads image files.' -language: 'Python' -vendor: 'Huawei' -raw_id: 'ImgFormatter' -version: '1.0.0' -types: - - 'collect' -modal: 'image' -effect: - before: '' - after: '' -inputs: 'image' -outputs: 'image' diff --git a/runtime/ops/formatter/img_formatter/process.py b/runtime/ops/formatter/img_formatter/process.py deleted file mode 100644 index bdc1c4c..0000000 --- a/runtime/ops/formatter/img_formatter/process.py +++ /dev/null @@ -1,35 +0,0 @@ -# # -- encoding: utf-8 -- - -# -# Description: -# Create: 2024/1/30 15:24 -# """ -import time -from typing import Dict, Any - -import cv2 -import numpy as np -from loguru import logger - -from datamate.common.utils import numpy_to_bytes -from datamate.core.base_op import Mapper - - -class ImgFormatter(Mapper): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: - start = time.time() - file_name = sample[self.filename_key] - file_type = "." + sample[self.filetype_key] - file_path = sample[self.filepath_key] - img_data = _img_extract(file_path) - sample[self.data_key] = numpy_to_bytes(img_data, file_type) - logger.info(f"fileName: {file_name}, method: ImgExtract costs {(time.time() - start):6f} s") - return sample - - -def _img_extract(file_path): - return cv2.imdecode(np.fromfile(file_path, dtype=np.uint8), -1) diff --git a/runtime/ops/formatter/text_formatter/__init__.py b/runtime/ops/formatter/text_formatter/__init__.py deleted file mode 100644 index 80b908d..0000000 --- a/runtime/ops/formatter/text_formatter/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding: utf-8 -*- - -from datamate.core.base_op import OPERATORS - -OPERATORS.register_module(module_name='TextFormatter', - module_path="ops.formatter.text_formatter.process") diff --git a/runtime/ops/formatter/text_formatter/metadata.yml b/runtime/ops/formatter/text_formatter/metadata.yml deleted file mode 100644 index 08adcae..0000000 --- a/runtime/ops/formatter/text_formatter/metadata.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: 'TXT文本抽取' -name_en: 'TXT Text Extraction' -description: '抽取TXT中的文本' -description_en: 'Extracts text from TXT files.' -language: 'python' -vendor: 'huawei' -raw_id: 'TxtFormatter' -version: '1.0.0' -types: - - 'collect' -modal: 'text' -effect: - before: '' - after: '' -inputs: 'text' -outputs: 'text' diff --git a/runtime/ops/formatter/text_formatter/process.py b/runtime/ops/formatter/text_formatter/process.py deleted file mode 100644 index d290c95..0000000 --- a/runtime/ops/formatter/text_formatter/process.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/user/bin/python -# -*- coding: utf-8 -*- - -""" -Description: Json文本抽取 -Create: 2024/06/06 15:43 -""" -import time -from loguru import logger -from typing import Dict, Any - -from datamate.core.base_op import Mapper - - -class TextFormatter(Mapper): - """把输入的json文件流抽取为txt""" - - def __init__(self, *args, **kwargs): - super(TextFormatter, self).__init__(*args, **kwargs) - - @staticmethod - def _extract_json(byte_io): - """将默认使用utf-8编码的Json文件流解码,抽取为txt""" - # 用utf-8-sig的格式进行抽取,可以避免uft-8 BOM编码格式的文件在抽取后产生隐藏字符作为前缀。 - return byte_io.decode("utf-8-sig").replace("\r\n", "\n") - - def byte_read(self, sample: Dict[str, Any]): - filepath = sample[self.filepath_key] - with open(filepath, "rb") as file: - byte_data = file.read() - sample[self.data_key] = byte_data - - def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: - start = time.time() - try: - self.byte_read(sample) - sample[self.text_key] = self._extract_json(sample[self.data_key]) - sample[self.data_key] = b"" # 将sample[self.data_key]置空 - logger.info( - f"fileName: {sample[self.filename_key]}, method: TextFormatter costs {(time.time() - start):6f} s") - except UnicodeDecodeError as err: - logger.exception(f"fileName: {sample[self.filename_key]}, method: TextFormatter causes decode error: {err}") - raise - return sample diff --git a/runtime/ops/formatter/unstructured_formatter/__init__.py b/runtime/ops/formatter/unstructured_formatter/__init__.py deleted file mode 100644 index ab5ad41..0000000 --- a/runtime/ops/formatter/unstructured_formatter/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding: utf-8 -*- - -from datamate.core.base_op import OPERATORS - -OPERATORS.register_module(module_name='UnstructuredFormatter', - module_path="ops.formatter.unstructured_formatter.process") diff --git a/runtime/ops/formatter/unstructured_formatter/metadata.yml b/runtime/ops/formatter/unstructured_formatter/metadata.yml deleted file mode 100644 index be68175..0000000 --- a/runtime/ops/formatter/unstructured_formatter/metadata.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: 'Unstructured文本抽取' -name_en: 'Unstructured Text Extraction' -description: '抽取非结构化文件的文本,目前支持PowerPoint演示文稿、Word文档以及Excel工作簿。' -description_en: 'Extracts text from Unstructured files, currently supporting PowerPoint presentations, Word documents and Excel spreadsheets files.' -language: 'python' -vendor: 'huawei' -raw_id: 'UnstructuredFormatter' -version: '1.0.0' -types: - - 'collect' -modal: 'text' -effect: - before: '' - after: '' -inputs: 'text' -outputs: 'text' diff --git a/runtime/ops/formatter/unstructured_formatter/process.py b/runtime/ops/formatter/unstructured_formatter/process.py deleted file mode 100644 index db173a8..0000000 --- a/runtime/ops/formatter/unstructured_formatter/process.py +++ /dev/null @@ -1,37 +0,0 @@ - -#!/user/bin/python -# -*- coding: utf-8 -*- - -""" -Description: 非结构化文本抽取 -Create: 2025/10/22 15:15 -""" -import time -from typing import Dict, Any - -from loguru import logger -from unstructured.partition.auto import partition - -from datamate.core.base_op import Mapper - - -class UnstructuredFormatter(Mapper): - """把输入的非结构化文本抽取为txt""" - - def __init__(self, *args, **kwargs): - super(UnstructuredFormatter, self).__init__(*args, **kwargs) - - def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: - start = time.time() - filepath = sample.get(self.filepath_key) - filename = sample.get(self.filename_key) - if not filename.lower().endswith((".ppt", ".pptx", "docx", "xlsx", ".csv")): - return sample - try: - elements = partition(filename=filepath) - sample[self.text_key] = "\n\n".join([str(el) for el in elements]) - logger.info(f"fileName: {filename}, method: UnstructuredFormatter costs {(time.time() - start):6f} s") - except UnicodeDecodeError as err: - logger.exception(f"fileName: {filename}, method: UnstructuredFormatter causes decode error: {err}") - raise - return sample diff --git a/runtime/ops/formatter/word_formatter/__init__.py b/runtime/ops/formatter/word_formatter/__init__.py deleted file mode 100644 index 2e1edc1..0000000 --- a/runtime/ops/formatter/word_formatter/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# -*- coding: utf-8 -*- - -from datamate.core.base_op import OPERATORS - -OPERATORS.register_module(module_name='WordFormatter', - module_path="ops.formatter.word_formatter.process") diff --git a/runtime/ops/formatter/word_formatter/metadata.yml b/runtime/ops/formatter/word_formatter/metadata.yml deleted file mode 100644 index 27023dd..0000000 --- a/runtime/ops/formatter/word_formatter/metadata.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: 'Word文本抽取' -name_en: 'Word Text Extraction' -description: '抽取Word中的文本' -description_en: 'Extracts text from Word files.' -language: 'java' -vendor: 'huawei' -raw_id: 'WordFormatter' -version: '1.0.0' -types: - - 'collect' -modal: 'text' -effect: - before: '' - after: '' -inputs: 'text' -outputs: 'text' diff --git a/runtime/ops/formatter/word_formatter/process.py b/runtime/ops/formatter/word_formatter/process.py deleted file mode 100644 index 825baca..0000000 --- a/runtime/ops/formatter/word_formatter/process.py +++ /dev/null @@ -1,68 +0,0 @@ -# # -- encoding: utf-8 -- - -# -# Description: -# Create: 2024/1/30 15:24 -# """ -from loguru import logger -import os -import subprocess -import time -from typing import Dict, Any - -from datamate.common.utils import check_valid_path -from datamate.core.base_op import Mapper - - -class WordFormatter(Mapper): - SEPERATOR = ' | ' - - def __init__(self, *args, **kwargs): - super(WordFormatter, self).__init__(*args, **kwargs) - - def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: - start = time.time() - file_name = sample[self.filename_key] - file_path = sample[self.filepath_key] - file_type = sample[self.filetype_key] - txt_content = self.word2html(file_path, file_type) - sample[self.text_key] = txt_content - logger.info(f"fileName: {file_name}, method: WordFormatter costs {(time.time() - start):6f} s") - return sample - - @staticmethod - def word2html(file_path, file_type): - check_valid_path(file_path) - file_dir = file_path.rsplit('/', 1)[0] - file_name = file_path.rsplit('/', 1)[1] - html_file_path = os.path.join(file_dir, f"{file_name}.txt") - - current_file_path = os.path.dirname(os.path.abspath(__file__)) - try: - process = subprocess.Popen( - ['java', '-jar', f'{current_file_path}/../../../java_operator/WordFormatter-1.0.jar', file_path, - html_file_path, file_type], shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - stdout, stderr = process.communicate(timeout=24 * 60 * 60) - if process.returncode == 0: - logger.info(f"Convert {file_path} successfully to DOCX") - else: - logger.info(f"Convert {file_path} failed, error: {stderr.strip().decode('utf-8')}.") - raise RuntimeError() - except subprocess.CalledProcessError as e: - logger.error(f"Convert failed: {e}, return code: {e.returncode}") - except FileNotFoundError: - logger.error("LibreOffice command not found, please make sure it is available in PATH") - except Exception as e: - logger.error(f"An unexpected error occurred, convert failed: {e}", ) - - try: - with open(html_file_path, 'r', encoding='utf-8') as file: - txt_content = file.read() - os.remove(html_file_path) - logger.info("Tmp docx file removed") - except FileNotFoundError: - logger.error(f"Tmp file {html_file_path} does not exist") - except PermissionError: - logger.error(f"You are not allowed to delete tmp file {html_file_path}") - logger.info(f"Convert {html_file_path} to html success") - return txt_content diff --git a/runtime/ops/mapper/content_cleaner/process.py b/runtime/ops/mapper/content_cleaner/process.py index ab06e12..7dd45a9 100644 --- a/runtime/ops/mapper/content_cleaner/process.py +++ b/runtime/ops/mapper/content_cleaner/process.py @@ -30,6 +30,7 @@ class ContentCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._content_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: ContentCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/credit_card_number_cleaner/process.py b/runtime/ops/mapper/credit_card_number_cleaner/process.py index 993c099..199d433 100644 --- a/runtime/ops/mapper/credit_card_number_cleaner/process.py +++ b/runtime/ops/mapper/credit_card_number_cleaner/process.py @@ -64,6 +64,7 @@ class AnonymizedCreditCardNumber(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._credit_card_number_filter(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: CreditCardNumberCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/email_cleaner/process.py b/runtime/ops/mapper/email_cleaner/process.py index 566a653..0113589 100644 --- a/runtime/ops/mapper/email_cleaner/process.py +++ b/runtime/ops/mapper/email_cleaner/process.py @@ -25,6 +25,7 @@ class EmailNumberCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._email_number_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: EmailCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/emoji_cleaner/process.py b/runtime/ops/mapper/emoji_cleaner/process.py index 4022732..3ec61a1 100644 --- a/runtime/ops/mapper/emoji_cleaner/process.py +++ b/runtime/ops/mapper/emoji_cleaner/process.py @@ -22,6 +22,7 @@ class EmojiCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._emoji_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: EmojiCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/extra_space_cleaner/process.py b/runtime/ops/mapper/extra_space_cleaner/process.py index 83a4bad..ce8509f 100644 --- a/runtime/ops/mapper/extra_space_cleaner/process.py +++ b/runtime/ops/mapper/extra_space_cleaner/process.py @@ -41,6 +41,7 @@ class ExtraSpaceCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._clean_extra_space(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: ExtraSpaceCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/full_width_characters_cleaner/process.py b/runtime/ops/mapper/full_width_characters_cleaner/process.py index a7bea13..9637de4 100644 --- a/runtime/ops/mapper/full_width_characters_cleaner/process.py +++ b/runtime/ops/mapper/full_width_characters_cleaner/process.py @@ -34,6 +34,7 @@ class FullWidthCharacterCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._full_width_character_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, " f"method: FullWidthCharactersCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/garble_characters_cleaner/process.py b/runtime/ops/mapper/garble_characters_cleaner/process.py index dfa1958..423df7f 100644 --- a/runtime/ops/mapper/garble_characters_cleaner/process.py +++ b/runtime/ops/mapper/garble_characters_cleaner/process.py @@ -44,6 +44,7 @@ class GrableCharactersCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._grable_characters_filter(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: GrableCharactersCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/html_tag_cleaner/process.py b/runtime/ops/mapper/html_tag_cleaner/process.py index 30b51ae..257d330 100644 --- a/runtime/ops/mapper/html_tag_cleaner/process.py +++ b/runtime/ops/mapper/html_tag_cleaner/process.py @@ -64,6 +64,7 @@ class HtmlTagCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) if sample[self.filetype_key] != "xml": sample[self.text_key] = self._remove_html_tags(sample[self.text_key]) logger.info( diff --git a/runtime/ops/mapper/id_number_cleaner/process.py b/runtime/ops/mapper/id_number_cleaner/process.py index 838fdfd..943e1a9 100644 --- a/runtime/ops/mapper/id_number_cleaner/process.py +++ b/runtime/ops/mapper/id_number_cleaner/process.py @@ -71,6 +71,7 @@ class AnonymizedIdNumber(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._id_number_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: IDNumberCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/img_denoise/process.py b/runtime/ops/mapper/img_denoise/process.py index a8cdceb..fe9987b 100644 --- a/runtime/ops/mapper/img_denoise/process.py +++ b/runtime/ops/mapper/img_denoise/process.py @@ -28,6 +28,7 @@ class ImgDenoise(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] diff --git a/runtime/ops/mapper/img_direction_correct/process.py b/runtime/ops/mapper/img_direction_correct/process.py index a47fa9a..a50352c 100644 --- a/runtime/ops/mapper/img_direction_correct/process.py +++ b/runtime/ops/mapper/img_direction_correct/process.py @@ -97,6 +97,7 @@ class ImgDirectionCorrect(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] img_bytes = sample[self.data_key] diff --git a/runtime/ops/mapper/img_enhanced_brightness/process.py b/runtime/ops/mapper/img_enhanced_brightness/process.py index 43fe423..553e176 100644 --- a/runtime/ops/mapper/img_enhanced_brightness/process.py +++ b/runtime/ops/mapper/img_enhanced_brightness/process.py @@ -88,6 +88,7 @@ class ImgBrightness(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_enhanced_contrast/process.py b/runtime/ops/mapper/img_enhanced_contrast/process.py index d12612c..5b91184 100644 --- a/runtime/ops/mapper/img_enhanced_contrast/process.py +++ b/runtime/ops/mapper/img_enhanced_contrast/process.py @@ -59,6 +59,7 @@ class ImgContrast(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_enhanced_saturation/process.py b/runtime/ops/mapper/img_enhanced_saturation/process.py index 893414d..99b5483 100644 --- a/runtime/ops/mapper/img_enhanced_saturation/process.py +++ b/runtime/ops/mapper/img_enhanced_saturation/process.py @@ -69,6 +69,7 @@ class ImgSaturation(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_enhanced_sharpness/process.py b/runtime/ops/mapper/img_enhanced_sharpness/process.py index 991d46d..b1e101c 100644 --- a/runtime/ops/mapper/img_enhanced_sharpness/process.py +++ b/runtime/ops/mapper/img_enhanced_sharpness/process.py @@ -57,6 +57,7 @@ class ImgSharpness(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_perspective_transformation/process.py b/runtime/ops/mapper/img_perspective_transformation/process.py index e331cb7..b84aeee 100644 --- a/runtime/ops/mapper/img_perspective_transformation/process.py +++ b/runtime/ops/mapper/img_perspective_transformation/process.py @@ -25,6 +25,7 @@ class ImgPerspectiveTransformation(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_resize/process.py b/runtime/ops/mapper/img_resize/process.py index 08c0ae2..248237b 100644 --- a/runtime/ops/mapper/img_resize/process.py +++ b/runtime/ops/mapper/img_resize/process.py @@ -29,6 +29,7 @@ class ImgResize(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] img_bytes = sample[self.data_key] diff --git a/runtime/ops/mapper/img_shadow_remove/process.py b/runtime/ops/mapper/img_shadow_remove/process.py index 98c089f..7be557d 100644 --- a/runtime/ops/mapper/img_shadow_remove/process.py +++ b/runtime/ops/mapper/img_shadow_remove/process.py @@ -60,6 +60,7 @@ class ImgShadowRemove(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) img_bytes = sample[self.data_key] file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] diff --git a/runtime/ops/mapper/img_type_unify/process.py b/runtime/ops/mapper/img_type_unify/process.py index c0103c5..efa5ba4 100644 --- a/runtime/ops/mapper/img_type_unify/process.py +++ b/runtime/ops/mapper/img_type_unify/process.py @@ -21,6 +21,7 @@ class ImgTypeUnify(Mapper): def execute(self, sample): start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] origin_file_type = sample[self.filetype_key] if origin_file_type == self._setting_type: diff --git a/runtime/ops/mapper/img_watermark_remove/process.py b/runtime/ops/mapper/img_watermark_remove/process.py index 1185712..86ea557 100644 --- a/runtime/ops/mapper/img_watermark_remove/process.py +++ b/runtime/ops/mapper/img_watermark_remove/process.py @@ -80,6 +80,7 @@ class ImgWatermarkRemove(Mapper): def execute(self, sample: Dict[str, Any]): start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] file_type = "." + sample[self.filetype_key] img_bytes = sample[self.data_key] diff --git a/runtime/ops/mapper/invisible_characters_cleaner/process.py b/runtime/ops/mapper/invisible_characters_cleaner/process.py index d2e1fbf..7532504 100644 --- a/runtime/ops/mapper/invisible_characters_cleaner/process.py +++ b/runtime/ops/mapper/invisible_characters_cleaner/process.py @@ -24,6 +24,7 @@ class InvisibleCharactersCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._invisible_characters_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, " f"method: InvisibleCharactersCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/ip_address_cleaner/process.py b/runtime/ops/mapper/ip_address_cleaner/process.py index 190eafc..d442f4d 100644 --- a/runtime/ops/mapper/ip_address_cleaner/process.py +++ b/runtime/ops/mapper/ip_address_cleaner/process.py @@ -37,6 +37,7 @@ class AnonymizedIpAddress(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._ip_address_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: IPAddressCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/knowledge_relation_slice/process.py b/runtime/ops/mapper/knowledge_relation_slice/process.py index b7c0c6e..9ba64dd 100644 --- a/runtime/ops/mapper/knowledge_relation_slice/process.py +++ b/runtime/ops/mapper/knowledge_relation_slice/process.py @@ -35,6 +35,7 @@ class KnowledgeRelationSlice(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start_time = time.time() + self.read_file_first(sample) chunk_item = get_json_list(sample[self.text_key], chunk_size=self.chunk_size, overlap_size=self.overlap_size) chunk_item_json = json.dumps(chunk_item, ensure_ascii=False) diff --git a/runtime/ops/mapper/legend_cleaner/process.py b/runtime/ops/mapper/legend_cleaner/process.py index 837cc21..e503ae7 100644 --- a/runtime/ops/mapper/legend_cleaner/process.py +++ b/runtime/ops/mapper/legend_cleaner/process.py @@ -36,6 +36,7 @@ class LegendCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._clean_html_tag(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: LegendCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/phone_number_cleaner/process.py b/runtime/ops/mapper/phone_number_cleaner/process.py index e68e6c0..9491365 100644 --- a/runtime/ops/mapper/phone_number_cleaner/process.py +++ b/runtime/ops/mapper/phone_number_cleaner/process.py @@ -37,6 +37,7 @@ class AnonymizedPhoneNumber(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._phone_number_filter(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: PhoneNumberCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/political_word_cleaner/process.py b/runtime/ops/mapper/political_word_cleaner/process.py index 89456d6..62c5be3 100644 --- a/runtime/ops/mapper/political_word_cleaner/process.py +++ b/runtime/ops/mapper/political_word_cleaner/process.py @@ -53,6 +53,7 @@ class PoliticalWordCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._political_word_filter(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: PoliticalWordCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/remove_duplicate_sentences/process.py b/runtime/ops/mapper/remove_duplicate_sentences/process.py index c7be2be..05f1a09 100644 --- a/runtime/ops/mapper/remove_duplicate_sentences/process.py +++ b/runtime/ops/mapper/remove_duplicate_sentences/process.py @@ -47,8 +47,8 @@ def duplicate_sentences_filter(input_data: str, file_name: str, duplicate_th: in paragraph_counts[paragraph_strip] = -1 except Exception as err: - logger.exception(f"fileName: {file_name}, method: RemoveDuplicateSentencess. An error occurred when using " - f"filtering duplicate sentences. The error is: {err}") + logger.exception(f"fileName: {file_name}, method: RemoveDuplicateSentencess. An error occurred when using " + f"filtering duplicate sentences. The error is: {err}") return input_data # 将去重后的段落重新组合成文本 @@ -63,6 +63,7 @@ class DuplicateSentencesFilter(Filter): duplicate_th = 5 # 段落重复次数阈值 file_name = sample[self.filename_key] start = time.time() + self.read_file_first(sample) sample[self.text_key] = duplicate_sentences_filter(sample[self.text_key], file_name, duplicate_th) logger.info(f"fileName: {file_name}, RemoveDuplicateSentencess costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/sexual_and_violent_word_cleaner/process.py b/runtime/ops/mapper/sexual_and_violent_word_cleaner/process.py index b0833de..7f598a2 100644 --- a/runtime/ops/mapper/sexual_and_violent_word_cleaner/process.py +++ b/runtime/ops/mapper/sexual_and_violent_word_cleaner/process.py @@ -56,6 +56,7 @@ class SexualAndViolentWordCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._sexual_and_violent_word_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, " f"method: SexualAndViolentWordCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/text_to_word/process.py b/runtime/ops/mapper/text_to_word/process.py index a2001e8..e31c0f0 100644 --- a/runtime/ops/mapper/text_to_word/process.py +++ b/runtime/ops/mapper/text_to_word/process.py @@ -61,6 +61,7 @@ class TextToWord(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """将文本信息转换为docx文件流""" start = time.time() + self.read_file_first(sample) sample[self.data_key] = self._txt_to_docx(sample[self.text_key]) # 将文字转换为word字符串流 sample[self.text_key] = "" sample["target_type"] = "docx" diff --git a/runtime/ops/mapper/traditional_chinese/process.py b/runtime/ops/mapper/traditional_chinese/process.py index df17d1b..38fdc86 100644 --- a/runtime/ops/mapper/traditional_chinese/process.py +++ b/runtime/ops/mapper/traditional_chinese/process.py @@ -27,6 +27,7 @@ class TraditionalChineseCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._traditional_chinese_filter(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: TraditionalChinese costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/unicode_space_cleaner/process.py b/runtime/ops/mapper/unicode_space_cleaner/process.py index 36b9495..ae0a249 100644 --- a/runtime/ops/mapper/unicode_space_cleaner/process.py +++ b/runtime/ops/mapper/unicode_space_cleaner/process.py @@ -23,6 +23,7 @@ class UnicodeSpaceCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._clean_unicode_space(sample[self.text_key]) logger.info( f"fileName: {sample[self.filename_key]}, method: UnicodeSpaceCleaner costs {time.time() - start:6f} s") diff --git a/runtime/ops/mapper/url_cleaner/process.py b/runtime/ops/mapper/url_cleaner/process.py index c26d682..b47c2cd 100644 --- a/runtime/ops/mapper/url_cleaner/process.py +++ b/runtime/ops/mapper/url_cleaner/process.py @@ -26,6 +26,7 @@ class AnonymizedUrlCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) sample[self.text_key] = self._url_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: UrlCleaner costs {time.time() - start:6f} s") return sample diff --git a/runtime/ops/mapper/xml_tag_cleaner/process.py b/runtime/ops/mapper/xml_tag_cleaner/process.py index d5fe4a5..a13f5e6 100644 --- a/runtime/ops/mapper/xml_tag_cleaner/process.py +++ b/runtime/ops/mapper/xml_tag_cleaner/process.py @@ -52,6 +52,7 @@ class XMLTagCleaner(Mapper): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() + self.read_file_first(sample) file_name = sample[self.filename_key] if sample[self.filetype_key] == "xml": try: diff --git a/runtime/ops/pyproject.toml b/runtime/ops/pyproject.toml new file mode 100644 index 0000000..b5b0eb9 --- /dev/null +++ b/runtime/ops/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "ops" +version = "0.0.1" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "beautifulsoup4>=4.14.3", + "datasketch>=1.8.0", + "email-validator>=2.3.0", + "emoji>=2.15.0", + "jieba>=0.42.1", + "loguru>=0.7.3", + "numpy>=2.2.0,<=2.2.6", + "opencv-contrib-python-headless>=4.12.0.88", + "opencv-python-headless>=4.12.0.88", + "openslide-python>=1.4.3", + "paddleocr>=3.3.2", + "pandas>=2.2.0,<=2.2.3", + "pycryptodome>=3.23.0", + "pymysql>=1.1.2", + "python-docx>=1.2.0", + "pytz>=2025.2", + "six>=1.17.0", + "sqlalchemy>=2.0.44", + "xmltodict>=1.0.2", + "zhconv>=1.4.3", +] \ No newline at end of file diff --git a/runtime/ops/requirements.txt b/runtime/ops/requirements.txt deleted file mode 100644 index b214fb7..0000000 --- a/runtime/ops/requirements.txt +++ /dev/null @@ -1,22 +0,0 @@ -beautifulsoup4==4.14.2 -datamate==0.0.1 -datasketch==1.6.5 -email_validator==2.3.0 -emoji==2.2.0 -jieba==0.42.1 -loguru==0.7.3 -numpy==2.2.6 -opencv_contrib_python-headless==4.10.0.84 -opencv_python-headless==4.12.0.88 -openslide_python==1.4.2 -paddleocr==3.2.0 -pandas==2.2.3 -pycryptodome==3.23.0 -python_docx==1.2.0 -pytz==2025.2 -six==1.17.0 -xmltodict==1.0.2 -zhconv==1.4.3 -sqlalchemy==2.0.40 -pymysql==1.1.1 -unstructured[docx,csv,xlsx,pptx]==0.18.15 \ No newline at end of file diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 1db019a..8879ab9 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -2,10 +2,15 @@ import json import os +import time import traceback +import uuid from typing import List, Dict, Any, Tuple +import cv2 +import numpy as np from loguru import logger +from unstructured.partition.auto import partition from datamate.common.error_code import ERROR_CODE_TABLE, UNKNOWN_ERROR_CODE from datamate.common.utils.llm_request import LlmReq @@ -52,6 +57,7 @@ class BaseOp: def __init__(self, *args, **kwargs): self.accelerator = kwargs.get('accelerator', "cpu") self.is_last_op = kwargs.get('is_last_op', False) + self.is_first_op = kwargs.get('is_first_op', False) self._name = kwargs.get('op_name', None) self.infer_model = None self.text_key = kwargs.get('text_key', "text") @@ -122,10 +128,10 @@ class BaseOp: raise NotImplementedError("This is in BaseOp, plese re-define this method in Sub-classes") def fill_sample_params(self, sample: Dict[str, Any], **kwargs): - if not sample.get("text", None): + if not sample.get(self.text_key, None): sample[self.text_key] = "" - if not sample.get("data", None): + if not sample.get(self.data_key, None): sample[self.data_key] = b"" if not sample[self.data_key] and not sample[self.text_key]: @@ -137,6 +143,27 @@ class BaseOp: failed_reason = {"op_name": op_name, "error_code": error_code, "reason": exc_info} sample["failed_reason"] = failed_reason + def read_file(self, sample): + filepath = sample[self.filepath_key] + filetype = sample[self.filetype_key] + if filetype in ["ppt", "pptx", "docx", "doc", "xlsx"]: + elements = partition(filename=filepath) + sample[self.text_key] = "\n\n".join([str(el) for el in elements]) + elif filetype in ["txt", "md", "markdown", "xml", "html", "csv", "json", "jsonl"]: + with open(filepath, 'rb') as f: + content = f.read() + sample[self.text_key] = content.decode("utf-8-sig").replace("\r\n", "\n") + elif filetype in ['jpg', 'jpeg', 'png', 'bmp']: + image_np = cv2.imdecode(np.fromfile(filepath, dtype=np.uint8), -1) + if image_np.size: + data = cv2.imencode(f".{filetype}", image_np)[1] + image_bytes = data.tobytes() + sample[self.data_key] = image_bytes + + def read_file_first(self, sample): + if self.is_first_op: + self.read_file(sample) + class Mapper(BaseOp): def __init__(self, *args, **kwargs): @@ -158,15 +185,16 @@ class Mapper(BaseOp): logger.error(f"Ops named {self.name} map failed, Error Info: \n" f"{str(get_exception_info(e))}") sample["execute_status"] = execute_status - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + sample[self.filesize_key] = "0" + sample[self.filetype_key] = "" + TaskInfoPersistence().update_task_result(sample) raise e sample["execute_status"] = execute_status # 加载文件成功执行信息到数据库 if self.is_last_op: - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + if FileExporter().execute(sample): + TaskInfoPersistence().persistence_task_info(sample) return sample def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: @@ -197,8 +225,9 @@ class Slicer(BaseOp): logger.error(f"Ops named {self.name} map failed, Error Info: \n" f"{str(get_exception_info(e))}") sample["execute_status"] = execute_status - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + sample[self.filesize_key] = "0" + sample[self.filetype_key] = "" + TaskInfoPersistence().update_task_result(sample) return [sample] self.load_sample_to_sample(sample, sample_list) @@ -206,8 +235,8 @@ class Slicer(BaseOp): # 加载文件成功执行信息到数据库 if self.is_last_op: - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + if FileExporter().execute(sample): + TaskInfoPersistence().persistence_task_info(sample) return [sample] @@ -286,22 +315,24 @@ class Filter(BaseOp): sample["execute_status"] = execute_status logger.error(f"Ops named {self.name} map failed, Error Info: \n" f"{str(get_exception_info(e))}") - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + sample[self.filesize_key] = "0" + sample[self.filetype_key] = "" + TaskInfoPersistence().update_task_result(sample) raise e sample["execute_status"] = execute_status # 文件无内容会被过滤 if sample[self.text_key] == "" and sample[self.data_key] == b"": task_info = TaskInfoPersistence() - sample["fileSize"] = "0" - task_info.persistence_task_info(sample) + sample[self.filesize_key] = "0" + sample[self.filetype_key] = "" + task_info.update_task_result(sample) return False # 加载文件成功执行信息到数据库 if self.is_last_op: - task_info = TaskInfoPersistence() - task_info.persistence_task_info(sample) + if FileExporter().execute(sample): + TaskInfoPersistence().persistence_task_info(sample) return True def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: @@ -379,3 +410,131 @@ class LLM(Mapper): raise RuntimeError(f"Save jsonl file Failed!, save_path: {save_path}.") from e logger.info(f"LLM output has been save to {save_path}.") + + +class FileExporter(BaseOp): + """把输入的json文件流抽取为txt""" + + def __init__(self, *args, **kwargs): + super(FileExporter, self).__init__(*args, **kwargs) + self.last_ops = True + self.text_support_ext = kwargs.get("text_support_ext", ['txt', 'html', 'md', 'markdown', + 'xlsx', 'xls', 'csv', 'pptx', 'ppt', + 'xml', 'json', 'doc', 'docx', 'pdf']) + self.data_support_ext = kwargs.get("data_support_ext", ['jpg', 'jpeg', 'png', 'bmp']) + self.medical_support_ext = kwargs.get("medical_support_ext", ['svs', 'tif', 'tiff']) + + def execute(self, sample: Dict[str, Any]): + file_name = sample[self.filename_key] + file_type = sample[self.filetype_key] + + try: + start = time.time() + if file_type in self.text_support_ext: + sample, save_path = self.get_textfile_handler(sample) + elif file_type in self.data_support_ext: + sample, save_path = self.get_datafile_handler(sample) + elif file_type in self.medical_support_ext: + sample, save_path = self.get_medicalfile_handler(sample) + else: + raise TypeError(f"{file_type} is unsupported! please check support_ext in FileExporter Ops") + + if sample[self.text_key] == '' and sample[self.data_key] == b'': + sample[self.filesize_key] = "0" + return False + + if save_path: + self.save_file(sample, save_path) + sample[self.text_key] = '' + sample[self.data_key] = b'' + sample[Fields.result] = True + + file_type = save_path.split('.')[-1] + sample[self.filetype_key] = file_type + + base_name, _ = os.path.splitext(file_name) + new_file_name = base_name + '.' + file_type + sample[self.filename_key] = new_file_name + + base_name, _ = os.path.splitext(save_path) + sample[self.filepath_key] = base_name + file_size = os.path.getsize(base_name) + sample[self.filesize_key] = f"{file_size}" + + logger.info(f"origin file named {file_name} has been save to {save_path}") + logger.info(f"fileName: {sample[self.filename_key]}, " + f"method: FileExporter costs {time.time() - start:.6f} s") + except UnicodeDecodeError as err: + logger.error(f"fileName: {sample[self.filename_key]}, " + f"method: FileExporter causes decode error: {err}") + raise + return True + + def get_save_path(self, sample: Dict[str, Any], target_type): + export_path = os.path.abspath(sample[self.export_path_key]) + file_name = sample[self.filename_key] + new_file_name = os.path.splitext(file_name)[0] + '.' + target_type + + if not check_valid_path(export_path): + os.makedirs(export_path, exist_ok=True) + return os.path.join(export_path, new_file_name) + + def get_textfile_handler(self, sample: Dict[str, Any]): + target_type = sample.get("target_type", None) + + # target_type存在则保存为扫描件, docx格式 + if target_type: + sample = self._get_from_data(sample) + save_path = self.get_save_path(sample, target_type) + # 不存在则保存为txt文件,正常文本清洗 + else: + sample = self._get_from_text(sample) + save_path = self.get_save_path(sample, 'txt') + return sample, save_path + + def get_datafile_handler(self, sample: Dict[str, Any]): + target_type = sample.get("target_type", None) + + # target_type存在, 图转文保存为target_type,markdown格式 + if target_type: + sample = self._get_from_text(sample) + save_path = self.get_save_path(sample, target_type) + # 不存在则保存为原本图片文件格式,正常图片清洗 + else: + sample = self._get_from_data(sample) + save_path = self.get_save_path(sample, sample[self.filetype_key]) + return sample, save_path + + def get_medicalfile_handler(self, sample: Dict[str, Any]): + target_type = 'png' + + sample = self._get_from_data(sample) + save_path = self.get_save_path(sample, target_type) + + return sample, save_path + + def save_file(self, sample, save_path): + file_name, _ = os.path.splitext(save_path) + # 以二进制格式保存文件 + file_sample = sample[self.text_key].encode('utf-8') if sample[self.text_key] else sample[self.data_key] + with open(file_name, 'wb') as f: + f.write(file_sample) + # 获取父目录路径 + + parent_dir = os.path.dirname(file_name) + os.chmod(parent_dir, 0o770) + os.chmod(file_name, 0o640) + + def _get_from_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: + sample[self.data_key] = bytes(sample[self.data_key]) + sample[self.text_key] = '' + return sample + + def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: + sample[self.data_key] = b'' + sample[self.text_key] = str(sample[self.text_key]) + return sample + + @staticmethod + def _get_uuid(): + return str(uuid.uuid4()) diff --git a/runtime/python-executor/datamate/core/dataset.py b/runtime/python-executor/datamate/core/dataset.py index bd8fde9..1857524 100644 --- a/runtime/python-executor/datamate/core/dataset.py +++ b/runtime/python-executor/datamate/core/dataset.py @@ -119,6 +119,8 @@ class RayDataset(BasicDataset): # 加载Ops module temp_ops = self.load_ops_module(op_name) + if index == 0: + init_kwargs["is_first_op"] = True if index == len(cfg_process) - 1: init_kwargs["is_last_op"] = True @@ -182,7 +184,8 @@ class RayDataset(BasicDataset): fn_kwargs=kwargs, resources=resources, num_cpus=0.05, - concurrency=(1, 1 if operators_cls.use_model else int(max_actor_nums))) + compute=rd.ActorPoolStrategy(min_size=1, + max_size=int(max_actor_nums))) elif issubclass(operators_cls, (Slicer, RELATIVE_Slicer)): self.data = self.data.flat_map(operators_cls, @@ -190,7 +193,8 @@ class RayDataset(BasicDataset): fn_kwargs=kwargs, resources=resources, num_cpus=0.05, - concurrency=(1, int(max_actor_nums))) + compute=rd.ActorPoolStrategy(min_size=1, + max_size=int(max_actor_nums))) elif issubclass(operators_cls, (Filter, RELATIVE_Filter)): self.data = self.data.filter(operators_cls, @@ -198,7 +202,8 @@ class RayDataset(BasicDataset): fn_kwargs=kwargs, resources=resources, num_cpus=0.05, - concurrency=(1, int(max_actor_nums))) + compute=rd.ActorPoolStrategy(min_size=1, + max_size=int(max_actor_nums))) else: logger.error( 'Ray executor only support Filter, Mapper and Slicer OPs for now') diff --git a/runtime/python-executor/datamate/sql_manager/persistence_atction.py b/runtime/python-executor/datamate/sql_manager/persistence_atction.py index 9219da4..4c425db 100644 --- a/runtime/python-executor/datamate/sql_manager/persistence_atction.py +++ b/runtime/python-executor/datamate/sql_manager/persistence_atction.py @@ -25,13 +25,13 @@ class TaskInfoPersistence: with open(sql_config_path, 'r', encoding='utf-8') as f: return json.load(f) - def persistence_task_info(self, sample: Dict[str, Any]): + def update_task_result(self, sample, file_id = str(uuid.uuid4())): instance_id = str(sample.get("instance_id")) src_file_name = str(sample.get("sourceFileName")) src_file_type = str(sample.get("sourceFileType")) src_file_id = str(sample.get("sourceFileId")) src_file_size = int(sample.get("sourceFileSize")) - file_id = str(uuid.uuid4()) + file_size = str(sample.get("fileSize")) file_type = str(sample.get("fileType")) file_name = str(sample.get("fileName")) @@ -53,6 +53,10 @@ class TaskInfoPersistence: } self.insert_result(result_data, str(self.sql_dict.get("insert_clean_result_sql"))) + def update_file_result(self, sample, file_id): + file_size = str(sample.get("fileSize")) + file_type = str(sample.get("fileType")) + file_name = str(sample.get("fileName")) dataset_id = str(sample.get("dataset_id")) file_path = str(sample.get("filePath")) create_time = datetime.now() @@ -72,6 +76,11 @@ class TaskInfoPersistence: } self.insert_result(file_data, str(self.sql_dict.get("insert_dataset_file_sql"))) + def persistence_task_info(self, sample: Dict[str, Any]): + file_id = str(uuid.uuid4()) + self.update_task_result(sample, file_id) + self.update_file_result(sample, file_id) + @staticmethod def insert_result(data, sql): retries = 0 diff --git a/runtime/python-executor/pyproject.toml b/runtime/python-executor/pyproject.toml index 2a28280..acffbd9 100644 --- a/runtime/python-executor/pyproject.toml +++ b/runtime/python-executor/pyproject.toml @@ -16,27 +16,13 @@ classifiers = [ # Core dependencies dependencies = [ - "uvicorn[standard]", - "fastapi", - "loguru", - "jsonargparse", - "ray[default, data]==2.46.0", - "opencv-python" -] - -[project.optional-dependencies] -dj = [ - "py-data-juicer~=1.4.0" -] - -op = [ - "python-docx==1.1.0" -] - -# All dependencies -all = [ - "datamate[dj]", - "datamate[op]" + "fastapi>=0.123.9", + "jsonargparse>=4.44.0", + "loguru>=0.7.3", + "opencv-python-headless>=4.12.0.88", + "ray[data,default]==2.52.1", + "unstructured[csv,docx,pptx,xlsx]==0.18.15", + "uvicorn[standard]>=0.38.0", ] [build-system] diff --git a/scripts/db/data-cleaning-init.sql b/scripts/db/data-cleaning-init.sql index 3ce98ef..69bebd6 100644 --- a/scripts/db/data-cleaning-init.sql +++ b/scripts/db/data-cleaning-init.sql @@ -59,8 +59,7 @@ VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', '文本清洗模板', '文本清 ('4421504e-c6c9-4760-b55a-509d17429597', '图片清洗模板', '图片清洗模板'); INSERT IGNORE INTO t_operator_instance(instance_id, operator_id, op_index, settings_override) -VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', 'TextFormatter', 1, null), - ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileWithShortOrLongLengthFilter', 2, null), +VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileWithShortOrLongLengthFilter', 2, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileWithHighRepeatWordRateFilter', 3, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileWithHighRepeatPhraseRateFilter', 4, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileWithHighSpecialCharRateFilter', 5, null), @@ -85,10 +84,10 @@ VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', 'TextFormatter', 1, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'EmailNumberCleaner', 24, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'AnonymizedIpAddress', 25, null), ('26ae585c-8310-4679-adc0-e53215e6e69b', 'AnonymizedIdNumber', 26, null), - ('26ae585c-8310-4679-adc0-e53215e6e69b', 'AnonymizedUrlCleaner', 27, null), - ('26ae585c-8310-4679-adc0-e53215e6e69b', 'FileExporter', 28, null), - ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgFormatter', 1, null), - ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgBlurredImagesCleaner', 2, null), + ('26ae585c-8310-4679-adc0-e53215e6e69b', 'AnonymizedUrlCleaner', 27, null); + +INSERT IGNORE INTO t_operator_instance(instance_id, operator_id, op_index, settings_override) +VALUES ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgBlurredImagesCleaner', 2, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgDuplicatedImagesCleaner', 3, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgSimilarImagesCleaner', 4, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgBrightness', 5, null), @@ -99,5 +98,4 @@ VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', 'TextFormatter', 1, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgShadowRemove', 10, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgPerspectiveTransformation', 11, null), ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgResize', 12, null), - ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgTypeUnify', 13, null), - ('4421504e-c6c9-4760-b55a-509d17429597', 'FileExporter', 14, null); + ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgTypeUnify', 13, null); \ No newline at end of file diff --git a/scripts/db/data-operator-init.sql b/scripts/db/data-operator-init.sql index 172ffbc..ddb26a1 100644 --- a/scripts/db/data-operator-init.sql +++ b/scripts/db/data-operator-init.sql @@ -67,10 +67,7 @@ VALUES ('64465bec-b46b-11f0-8291-00155d0e4808', '模态', 'modal', 'predefined' INSERT IGNORE INTO t_operator (id, name, description, version, inputs, outputs, runtime, settings, file_name, is_star) -VALUES ('TextFormatter', 'TXT文本抽取', '抽取TXT中的文本。', '1.0.0', 'text', 'text', null, null, '', false), - ('UnstructuredFormatter', 'Unstructured文本抽取', '基于Unstructured抽取非结构化文件的文本,目前支持PowerPoint演示文稿、Word文档以及Excel工作簿。', '1.0.0', 'text', 'text', null, null, '', false), - ('MineruFormatter', 'MinerU PDF文本抽取', '基于MinerU API,抽取PDF中的文本。', '1.0.0', 'text', 'text', null, null, '', false), - ('FileExporter', '落盘算子', '将文件保存到本地目录。', '1.0.0', 'multimodal', 'multimodal', null, null, '', false), +VALUES ('MineruFormatter', 'MinerU PDF文本抽取', '基于MinerU API,抽取PDF中的文本。', '1.0.0', 'text', 'text', null, null, '', false), ('FileWithHighRepeatPhraseRateFilter', '文档词重复率检查', '去除重复词过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatPhraseRatio": {"name": "文档词重复率", "description": "某个词的统计数/文档总词数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}, "hitStopwords": {"name": "去除停用词", "description": "统计重复词时,选择是否要去除停用词。", "type": "switch", "defaultVal": false, "required": true, "checkedLabel": "去除", "unCheckedLabel": "不去除"}}', '', 'false'), ('FileWithHighRepeatWordRateFilter', '文档字重复率检查', '去除重复字过多的文档。', '1.0.0', 'text', 'text', null, '{"repeatWordRatio": {"name": "文档字重复率", "description": "某个字的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.5, "min": 0, "max": 1, "step": 0.1}}', '', 'false'), ('FileWithHighSpecialCharRateFilter', '文档特殊字符率检查', '去除特殊字符过多的文档。', '1.0.0', 'text', 'text', null, '{"specialCharRatio": {"name": "文档特殊字符率", "description": "特殊字符的统计数/文档总字数 > 设定值,该文档被去除。", "type": "slider", "defaultVal": 0.3, "min": 0, "max": 1, "step": 0.1}}', '', 'false'), @@ -97,7 +94,6 @@ VALUES ('TextFormatter', 'TXT文本抽取', '抽取TXT中的文本。', '1.0.0', ('UnicodeSpaceCleaner', '空格标准化', '将文档中不同的 unicode 空格,如 u2008,转换为正常空格\\u0020。', '1.0.0', 'text', 'text', null, null, '', 'false'), ('AnonymizedUrlCleaner', 'URL网址匿名化', '将文档中的url网址匿名化。', '1.0.0', 'text', 'text', null, null, '', 'false'), ('XMLTagCleaner', 'XML标签去除', '去除XML中的标签。', '1.0.0', 'text', 'text', null, null, '', 'false'), - ('ImgFormatter', '读取图片文件', '读取图片文件。', '1.0.0', 'image', 'image', null, null, '', 'false'), ('ImgBlurredImagesCleaner', '模糊图片过滤', '去除模糊的图片。', '1.0.0', 'image', 'image', null, '{"blurredThreshold": {"name": "梯度函数值", "description": "梯度函数值取值越小,图片模糊度越高。", "type": "slider", "defaultVal": 1000, "min": 1, "max": 10000, "step": 1}}', '', 'false'), ('ImgBrightness', '图片亮度增强', '自适应调节图片的亮度。', '1.0.0', 'image', 'image', null, null, '', 'false'), ('ImgContrast', '图片对比度增强', '自适应调节图片的对比度。', '1.0.0', 'image', 'image', null, null, '', 'false'), @@ -117,7 +113,7 @@ SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o WHERE c.id IN ('d8a5df7a-52a9-42c2-83c4-01062e60f597', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') -AND o.id IN ('TextFormatter', 'FileWithShortOrLongLengthFilter', 'FileWithHighRepeatPhraseRateFilter', +AND o.id IN ('FileWithShortOrLongLengthFilter', 'FileWithHighRepeatPhraseRateFilter', 'FileWithHighRepeatWordRateFilter', 'FileWithHighSpecialCharRateFilter', 'FileWithManySensitiveWordsFilter', 'DuplicateFilesFilter', 'DuplicateSentencesFilter', 'AnonymizedCreditCardNumber', 'AnonymizedIdNumber', 'AnonymizedIpAddress', 'AnonymizedPhoneNumber', 'AnonymizedUrlCleaner', 'HtmlTagCleaner', 'XMLTagCleaner', @@ -130,13 +126,6 @@ SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o WHERE c.id IN ('de36b61c-9e8a-4422-8c31-d30585c7100f', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') - AND o.id IN ('ImgFormatter', 'ImgBlurredImagesCleaner', 'ImgBrightness', 'ImgContrast', 'ImgDenoise', + AND o.id IN ('ImgBlurredImagesCleaner', 'ImgBrightness', 'ImgContrast', 'ImgDenoise', 'ImgDuplicatedImagesCleaner', 'ImgPerspectiveTransformation', 'ImgResize', 'ImgSaturation', 'ImgShadowRemove', 'ImgSharpness', 'ImgSimilarImagesCleaner', 'ImgTypeUnify'); - -INSERT IGNORE INTO t_operator_category_relation(category_id, operator_id) -SELECT c.id, o.id -FROM t_operator_category c - CROSS JOIN t_operator o -WHERE c.id IN ('4d7dbd77-0a92-44f3-9056-2cd62d4a71e4', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') - AND o.id IN ('FileExporter', 'UnstructuredFormatter'); diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index 2b1ddf0..e65b08c 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -16,7 +16,7 @@ WORKDIR /opt/runtime RUN --mount=type=cache,target=/root/.cache/uv \ uv pip install -e . --system \ - && uv pip install -r /opt/runtime/datamate/ops/requirements.txt --system + && uv pip install -r /opt/runtime/datamate/ops/pyproject.toml --system RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ && chmod +x /opt/runtime/start.sh \