You've already forked DataMate
feature: 增加水印去除/高级匿名化算子 (#151)
* feature: 增加水印去除算子 * feature: clean code * feature: clean code * feature: 增加高级匿名化算子
This commit is contained in:
@@ -23,7 +23,6 @@ def _import_operators():
|
||||
from . import garble_characters_cleaner
|
||||
from . import html_tag_cleaner
|
||||
from . import id_number_cleaner
|
||||
from . import img_watermark_remove
|
||||
from . import invisible_characters_cleaner
|
||||
from . import ip_address_cleaner
|
||||
from . import legend_cleaner
|
||||
@@ -47,6 +46,7 @@ def _import_operators():
|
||||
from . import img_resize
|
||||
from . import remove_duplicate_sentences
|
||||
from . import knowledge_relation_slice
|
||||
from . import pii_ner_detection
|
||||
|
||||
|
||||
_import_operators()
|
||||
|
||||
@@ -11,7 +11,6 @@ class BaseModel:
|
||||
|
||||
def __init__(self, model_type='vertical'):
|
||||
models_path = os.getenv("MODELS_PATH", "/home/models")
|
||||
self.resources_path = str(Path(models_path, 'img_direction_correct', 'resources'))
|
||||
args = Namespace()
|
||||
args.cls_image_shape = '3, 224, 224'
|
||||
args.cls_batch_num = 6
|
||||
@@ -20,13 +19,14 @@ class BaseModel:
|
||||
args.use_gpu = False
|
||||
args.use_npu = False
|
||||
args.use_xpu = False
|
||||
args.use_mlu = False
|
||||
args.enable_mkldnn = False
|
||||
if model_type == 'vertical':
|
||||
args.cls_model_dir = str(Path(self.resources_path, 'vertical_model'))
|
||||
args.cls_model_dir = str(Path(models_path, 'ch_ppocr_mobile_v2.0_cls_infer'))
|
||||
self.model_name = 'standard model to detect image 0 or 90 rotated'
|
||||
args.label_list = ['0', '90']
|
||||
else:
|
||||
args.cls_model_dir = str(Path(self.resources_path, 'standard_model'))
|
||||
args.cls_model_dir = str(Path(models_path, 'ch_ppocr_mobile_v2.0_cls_infer'))
|
||||
self.model_name = 'standard model to detect image 0 or 180 rotated'
|
||||
args.label_list = ['0', '180']
|
||||
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datamate.core.base_op import OPERATORS
|
||||
|
||||
OPERATORS.register_module(module_name='ImgWatermarkRemove',
|
||||
module_path="ops.mapper.img_watermark_remove.process")
|
||||
@@ -1,26 +0,0 @@
|
||||
name: '图片水印去除'
|
||||
name_en: 'Image Watermark Removal'
|
||||
description: '去除图片中的“知乎”和“抖音”水印。'
|
||||
description_en: 'Removes the 知乎 and 抖音 watermarks from images.'
|
||||
language: 'python'
|
||||
vendor: 'huawei'
|
||||
raw_id: 'ImgWatermarkRemove'
|
||||
version: '1.0.0'
|
||||
types:
|
||||
- 'cleanse'
|
||||
modal: 'image'
|
||||
effect:
|
||||
before: ''
|
||||
after: ''
|
||||
inputs: 'image'
|
||||
outputs: 'image'
|
||||
settings:
|
||||
watermarkStr:
|
||||
name: 需要去除的水印文字信息
|
||||
type: checkbox
|
||||
defaultVal: '知乎,抖音'
|
||||
options:
|
||||
- label: 知乎
|
||||
value: 知乎
|
||||
- label: 抖音
|
||||
value: 抖音
|
||||
@@ -1,161 +0,0 @@
|
||||
# # -- encoding: utf-8 --
|
||||
|
||||
#
|
||||
# Description:
|
||||
# Create: 2025/01/06
|
||||
# """
|
||||
import time
|
||||
from typing import Dict, Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from loguru import logger
|
||||
|
||||
from datamate.common.utils import bytes_to_numpy
|
||||
from datamate.common.utils import numpy_to_bytes
|
||||
from datamate.core.base_op import Mapper
|
||||
from .watermark_ocr_model import WatermarkOcrModel
|
||||
|
||||
DEFAULT_MAX_CHARACTERS = 10
|
||||
DEFAULT_BINARY_THRESHOLD_LOW = 200
|
||||
|
||||
|
||||
class ImgWatermarkRemove(Mapper):
|
||||
use_model = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.remove_str = kwargs.get("watermarkStr", "知乎,抖音")
|
||||
self.ocr_model = self.get_model(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def _has_kw(result_list, kw_list):
|
||||
"""
|
||||
图片是否包含目标水印,返回匹配到的文字列表
|
||||
"""
|
||||
result_str_list = []
|
||||
for line in result_list:
|
||||
for kw in kw_list:
|
||||
if kw in line[1][0]:
|
||||
result_str_list.append(line[1][0])
|
||||
break
|
||||
return result_str_list
|
||||
|
||||
@staticmethod
|
||||
def _overlay_mask(background_img, prospect_img, img_over_x, img_over_y):
|
||||
back_r, back_c, _ = background_img.shape # 背景图像行数、列数
|
||||
is_x_direction_failed = img_over_x > back_c or img_over_x < 0
|
||||
is_y_direction_failed = img_over_y > back_r or img_over_y < 0
|
||||
if is_x_direction_failed or is_y_direction_failed:
|
||||
# 前景图不在背景图范围内, 直接返回原图
|
||||
return background_img
|
||||
pro_r, pro_c, _ = prospect_img.shape # 前景图像行数、列数
|
||||
if img_over_x + pro_c > back_c: # 如果水平方向展示不全
|
||||
pro_c = back_c - img_over_x # 截取前景图的列数
|
||||
prospect_img = prospect_img[:, 0:pro_c, :] # 截取前景图
|
||||
if img_over_y + pro_r > back_r: # 如果垂直方向展示不全
|
||||
pro_r = back_r - img_over_y # 截取前景图的行数
|
||||
prospect_img = prospect_img[0:pro_r, :, :] # 截取前景图
|
||||
|
||||
prospect_img = cv2.cvtColor(prospect_img, cv2.COLOR_BGR2BGRA) # 前景图转为4通道图像
|
||||
prospect_tmp = np.zeros((back_r, back_c, 4), np.uint8) # 与背景图像等大的临时前景图层
|
||||
|
||||
# 前景图像放到前景图层里
|
||||
prospect_tmp[img_over_y:img_over_y + pro_r, img_over_x: img_over_x + pro_c, :] = prospect_img
|
||||
|
||||
_, binary = cv2.threshold(prospect_img, 254, 255, cv2.THRESH_BINARY) # 前景图阈值处理
|
||||
prospect_mask = np.zeros((pro_r, pro_c, 1), np.uint8) # 单通道前景图像掩模
|
||||
prospect_mask[:, :, 0] = binary[:, :, 3] # 不透明像素的值作为掩模的值
|
||||
|
||||
mask = np.zeros((back_r, back_c, 1), np.uint8)
|
||||
mask[img_over_y:img_over_y + prospect_mask.shape[0],
|
||||
img_over_x: img_over_x + prospect_mask.shape[1]] = prospect_mask
|
||||
|
||||
mask_not = cv2.bitwise_not(mask)
|
||||
|
||||
prospect_tmp = cv2.bitwise_and(prospect_tmp, prospect_tmp, mask=mask)
|
||||
background_img = cv2.bitwise_and(background_img, background_img, mask=mask_not)
|
||||
prospect_tmp = cv2.cvtColor(prospect_tmp, cv2.COLOR_BGRA2BGR) # 前景图层转为三通道图像
|
||||
return prospect_tmp + background_img # 前景图层与背景图像相加合并
|
||||
|
||||
def execute(self, sample: Dict[str, Any]):
|
||||
start = time.time()
|
||||
self.read_file_first(sample)
|
||||
file_name = sample[self.filename_key]
|
||||
file_type = "." + sample[self.filetype_key]
|
||||
img_bytes = sample[self.data_key]
|
||||
if img_bytes:
|
||||
data = bytes_to_numpy(img_bytes)
|
||||
correct_data = self._watermark_remove(data, file_name, self.ocr_model)
|
||||
sample[self.data_key] = numpy_to_bytes(correct_data, file_type)
|
||||
logger.info(f"fileName: {file_name}, method: ImgWatermarkRemove costs {time.time() - start:6f} s")
|
||||
return sample
|
||||
|
||||
def delete_watermark(self, result_list, kw_list, data):
|
||||
"""
|
||||
将符合目标的水印,模糊化处理
|
||||
"""
|
||||
# 获取所有符合目标的文本框位置
|
||||
text_axes_list = []
|
||||
for line in result_list:
|
||||
for kw in kw_list:
|
||||
if kw in line[1][0]:
|
||||
min_width = int(min(line[0][0][0], line[0][3][0]))
|
||||
max_width = int(max(line[0][1][0], line[0][2][0]))
|
||||
min_hight = int(min(line[0][0][1], line[0][1][1]))
|
||||
max_hight = int(max(line[0][2][1], line[0][3][1]))
|
||||
text_axes_list.append([min_width, min_hight, max_width, max_hight])
|
||||
break
|
||||
# 去除水印
|
||||
delt = DEFAULT_MAX_CHARACTERS # 文本框范围扩大
|
||||
img = data
|
||||
for text_axes in text_axes_list:
|
||||
hight, width = img.shape[0:2]
|
||||
# 截取图片
|
||||
min_width = text_axes[0] - delt if text_axes[0] - delt >= 0 else 0
|
||||
min_hight = text_axes[1] - delt if text_axes[1] - delt >= 0 else 0
|
||||
max_width = text_axes[2] + delt if text_axes[2] + delt <= width else width
|
||||
max_hight = text_axes[3] + delt if text_axes[3] + delt <= hight else hight
|
||||
cropped = img[min_hight:max_hight, min_width:max_width] # 裁剪坐标为[y0:y1, x0:x1]
|
||||
# 图片二值化处理,把[200,200,200]-[250,250,250]以外的颜色变成0
|
||||
start_rgb = DEFAULT_BINARY_THRESHOLD_LOW
|
||||
thresh = cv2.inRange(cropped, np.array([start_rgb, start_rgb, start_rgb]), np.array([250, 250, 250]))
|
||||
# 创建形状和尺寸的结构元素
|
||||
kernel = np.ones((3, 3), np.uint8) # 设置卷积核3*3全是1;将当前的数组作为图像类型来进⾏各种操作,就要转换到uint8类型
|
||||
# 扩展待修复区域
|
||||
hi_mask = cv2.dilate(thresh, kernel, iterations=10) # 膨胀操作,白色区域增大,iterations迭代次数
|
||||
specular = cv2.inpaint(cropped, hi_mask, 5, flags=cv2.INPAINT_TELEA)
|
||||
# imgSY:输入8位1通道或3通道图像。
|
||||
# hi_mask:修复掩码,8位1通道图像。非零像素表示需要修复的区域。
|
||||
# specular:输出与imgSY具有相同大小和类型的图像。
|
||||
# 5:算法考虑的每个点的圆形邻域的半径。
|
||||
# flags:NPAINT_NS基于Navier-Stokes的方法、Alexandru Telea的INPAINT_TELEA方法
|
||||
result = self._overlay_mask(img, specular, min_width, min_hight)
|
||||
img = result
|
||||
return img
|
||||
|
||||
def init_model(self, *args, **kwargs):
|
||||
return WatermarkOcrModel(*args, **kwargs).ocr_model
|
||||
|
||||
def _watermark_remove(self, data, file_name, model):
|
||||
"""
|
||||
去除水印的方法
|
||||
"""
|
||||
remove_str = self.remove_str
|
||||
# 勾选去水印的信息为空,则直接返回原图
|
||||
if remove_str == "":
|
||||
return data
|
||||
kw_list = remove_str.split(',')
|
||||
# 加载模型
|
||||
ocr_model = model
|
||||
try:
|
||||
result = ocr_model.ocr(data, cls=True)
|
||||
except RuntimeError as e:
|
||||
logger.error(f"fileName: {file_name}, method: ocr predict error {e}")
|
||||
return data
|
||||
if result and result[0]:
|
||||
logger.info(f"fileName: {file_name}, method: ocrModel detect watermark info {str(result)}")
|
||||
return self.delete_watermark(result[0], kw_list, data)
|
||||
else:
|
||||
logger.info(f"fileName: {file_name}, method: ImgWatermarkRemove not need remove target ocr")
|
||||
return data
|
||||
@@ -1,25 +0,0 @@
|
||||
# -- encoding: utf-8 --
|
||||
|
||||
import gc
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class WatermarkOcrModel:
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
models_path = os.getenv("MODELS_PATH", "/home/models")
|
||||
self.resources_path = str(Path(models_path, 'img_watermark_remove', 'resources'))
|
||||
self.det_model_dir = str(Path(self.resources_path, 'ch_PP-OCRv4_det_infer'))
|
||||
self.rec_model_dir = str(Path(self.resources_path, 'ch_PP-OCRv4_rec_infer'))
|
||||
self.cls_model_dir = str(Path(self.resources_path, 'ch_ppocr_mobild_v2_cls_infer'))
|
||||
|
||||
from paddleocr import PaddleOCR
|
||||
self.ocr_model = PaddleOCR(det_model_dir=self.det_model_dir, cls_model_dir=self.cls_model_dir,
|
||||
rec_model_dir=self.rec_model_dir,
|
||||
use_angle_cls=True,
|
||||
lang='ch')
|
||||
|
||||
def __del__(self):
|
||||
del self.ocr_model
|
||||
gc.collect()
|
||||
4
runtime/ops/mapper/pii_ner_detection/__init__.py
Normal file
4
runtime/ops/mapper/pii_ner_detection/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from datamate.core.base_op import OPERATORS
|
||||
|
||||
OPERATORS.register_module(module_name='PiiDetector',
|
||||
module_path='ops.mapper.pii_ner_detection.process')
|
||||
62
runtime/ops/mapper/pii_ner_detection/custom_entities.py
Normal file
62
runtime/ops/mapper/pii_ner_detection/custom_entities.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import presidio_analyzer as analyzer
|
||||
|
||||
# 中国身份证号识别器
|
||||
id_recognizer = analyzer.PatternRecognizer(
|
||||
supported_entity="ID_CHINA",
|
||||
supported_language="zh",
|
||||
patterns=[
|
||||
analyzer.Pattern(
|
||||
name="china_id_pattern",
|
||||
regex=r"\b[1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b|\b[1-9]\d{7}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}\b",
|
||||
score=0.9
|
||||
)
|
||||
],
|
||||
context=["身份证", "身份证明", "身份证号", "证件号码"]
|
||||
)
|
||||
|
||||
# 中国电话号码识别器
|
||||
phone_recognizer = analyzer.PatternRecognizer(
|
||||
supported_entity="Phone_CHINA",
|
||||
supported_language="zh",
|
||||
patterns=[
|
||||
analyzer.Pattern(
|
||||
name="china_mobile_pattern",
|
||||
regex=r"\b(1[3-9]\d{9})\b",
|
||||
score=0.85
|
||||
),
|
||||
analyzer.Pattern(
|
||||
name="china_landline_pattern",
|
||||
regex=r"\b(0\d{2,3}-?\d{7,8})\b",
|
||||
score=0.8
|
||||
)
|
||||
],
|
||||
context=["电话", "手机", "联系方式", "联系电话"]
|
||||
)
|
||||
|
||||
# 中国邮编识别器
|
||||
zipcode_recognizer = analyzer.PatternRecognizer(
|
||||
supported_entity="ZIPCODE_CHINA",
|
||||
supported_language="zh",
|
||||
patterns=[
|
||||
analyzer.Pattern(
|
||||
name="china_zipcode_pattern",
|
||||
regex=r"\b[1-9]\d{5}\b",
|
||||
score=0.7
|
||||
)
|
||||
],
|
||||
context=["邮编", "邮政编码", "邮编号码"]
|
||||
)
|
||||
|
||||
# 兼容中文域名的URL识别器
|
||||
url_recognizer = analyzer.PatternRecognizer(
|
||||
supported_entity="URL",
|
||||
supported_language="zh",
|
||||
patterns=[
|
||||
analyzer.Pattern(
|
||||
name="url_pattern",
|
||||
regex=r"\b((?:https?://|www\.)[\w-]+\.[\w-]+\S*|(?:https?://|www\.)[\u4e00-\u9fa5]+\.[\u4e00-\u9fa5]+\S*)\b",
|
||||
score=0.9
|
||||
)
|
||||
],
|
||||
context=["网址", "链接", "网站", "网页"]
|
||||
)
|
||||
9
runtime/ops/mapper/pii_ner_detection/metadata.yml
Normal file
9
runtime/ops/mapper/pii_ner_detection/metadata.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
name: '高级匿名化'
|
||||
language: 'Python'
|
||||
vendor: 'others'
|
||||
raw_id: 'PiiDetector'
|
||||
version: '1.0.0'
|
||||
description: '高级匿名化算子,检测命名实体并匿名化。'
|
||||
modal: 'text'
|
||||
inputs: 'text'
|
||||
outputs: 'text'
|
||||
52
runtime/ops/mapper/pii_ner_detection/process.py
Normal file
52
runtime/ops/mapper/pii_ner_detection/process.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import presidio_analyzer as analyzer
|
||||
import presidio_anonymizer as anonymizer
|
||||
import spacy
|
||||
|
||||
from datamate.core.base_op import Mapper
|
||||
|
||||
from .custom_entities import id_recognizer, phone_recognizer, zipcode_recognizer, url_recognizer
|
||||
|
||||
|
||||
class PiiDetector(Mapper):
|
||||
custom_ops = True
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PiiDetector, self).__init__(*args, **kwargs)
|
||||
self.support_language = kwargs.get("support_language", "zh")
|
||||
|
||||
self.nlp_engine = None
|
||||
self.text_analyzer = None
|
||||
self.anom = None
|
||||
|
||||
self.init_model(*args, **kwargs)
|
||||
|
||||
def init_model(self, *args, **kwargs):
|
||||
spacy.load("zh_core_web_sm")
|
||||
provider = analyzer.nlp_engine.NlpEngineProvider(
|
||||
nlp_configuration={
|
||||
"nlp_engine_name": "spacy",
|
||||
"models": [
|
||||
{"lang_code": "zh", "model_name": "zh_core_web_sm"}
|
||||
]
|
||||
}
|
||||
)
|
||||
|
||||
# 创建NLP Engine
|
||||
self.nlp_engine = provider.create_engine()
|
||||
|
||||
# 初始化AnalyzerEngine
|
||||
self.text_analyzer = analyzer.AnalyzerEngine(nlp_engine=self.nlp_engine, supported_languages=["zh"])
|
||||
self.text_analyzer.registry.load_predefined_recognizers()
|
||||
for recognizer in [id_recognizer, phone_recognizer, zipcode_recognizer, url_recognizer]:
|
||||
self.text_analyzer.registry.add_recognizer(recognizer)
|
||||
|
||||
# 初始化AnonymizerEngine
|
||||
self.anom = anonymizer.AnonymizerEngine()
|
||||
|
||||
def execute(self, sample):
|
||||
self.read_file_first(sample)
|
||||
text = sample.get('text')
|
||||
analyzer_results = self.text_analyzer.analyze(text=text, language=self.support_language)
|
||||
res = self.anom.anonymize(text=text, analyzer_results=analyzer_results)
|
||||
sample['text'] = res.text
|
||||
return sample
|
||||
Reference in New Issue
Block a user