DataMate/runtime/ops/mapper/political_word_cleaner/process.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Description: Political text filtering.
Create: 2024/12/26 15:43
"""
import time
from pathlib import Path
from typing import Any, Dict

from loguru import logger

from datamate.common.utils.aho_corasick import AhoCorasic
from datamate.core.base_op import Mapper


class PoliticalWordCleaner(Mapper):
    """Mapper plugin that filters politically sensitive words out of externally supplied text."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        root_path = Path(__file__).parent / 'resources'
        political_file_path = str(root_path / 'political.txt')
        special_symbols_path = str(root_path / 'special_symbols.txt')
        self.special_symbols = self.load_words_list(special_symbols_path)
        self.political_words = self.load_words_list(political_file_path)
        self.ac_automaton = AhoCorasic(self.political_words)

    @staticmethod
    def load_words_list(path):
        """Load a word list from a file, one entry per line."""
        with open(path, 'r', encoding='utf-8') as f:
            words = set(f.read().splitlines())
        return words

    @staticmethod
    def words_replace(target_strings: list, text: str):
        """
        Mask every occurrence of the target strings with asterisks.

        Args:
            target_strings: list of matched words to mask.
            text: text to be cleaned.

        Returns:
            The cleaned text.
        """
        # Mask longer matches first so a short word never clobbers part of a longer one.
        target_strings.sort(key=lambda x: -len(x))
        for s in target_strings:
            text = text.replace(s, '*' * len(s))
        return text

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        start = time.time()
        self.read_file_first(sample)
        sample[self.text_key] = self._political_word_filter(sample[self.text_key])
        logger.info(
            f"fileName: {sample[self.filename_key]}, method: PoliticalWordCleaner costs {time.time() - start:.6f} s")
        return sample

    def _political_word_filter(self, text):
        """Main filtering routine: scan and mask the text line by line."""
        filtered_rows = []
        for row in text.split('\n'):
            matched_words = self.ac_automaton.search(row, self.special_symbols)
            filtered_rows.append(self.words_replace(matched_words, row))
        return '\n'.join(filtered_rows)
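

# --- Minimal usage sketch (not part of the original operator) ---------------------------------
# Demonstrates the masking behaviour of `words_replace` on its own, without touching the Mapper
# machinery or the Aho-Corasick automaton. The word list and sample text below are made up for
# illustration only.
if __name__ == '__main__':
    demo_text = 'foo and foobar appear here'
    masked = PoliticalWordCleaner.words_replace(['foo', 'foobar'], demo_text)
    # Longer matches are masked first, so 'foobar' becomes '******' rather than '***bar'.
    print(masked)  # -> '*** and ****** appear here'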