Files
DataMate/runtime/ops/mapper/img_perspective_transformation/process.py
hhhhsc701 d59c167da4 算子将抽取与落盘固定到流程中 (#134)
* feature: 将抽取动作移到每一个算子中

* feature: 落盘算子改为默认执行

* feature: 优化前端展示

* feature: 使用pyproject管理依赖
2025-12-05 17:26:29 +08:00

149 lines
5.0 KiB
Python

# -- encoding: utf-8 --
"""
Description:
Create: 2025/01/16
"""
import time
from typing import Dict, Any
import cv2
import numpy as np
from loguru import logger
from datamate.common.utils import bytes_transform
from datamate.core.base_op import Mapper
class ImgPerspectiveTransformation(Mapper):
"""图片透视变换插件"""
def __init__(self, *args, **kwargs):
super(ImgPerspectiveTransformation, self).__init__(*args, **kwargs)
self.transform_utils = PerspectiveTransformationUtils()
def execute(self, sample: Dict[str, Any]):
start = time.time()
self.read_file_first(sample)
img_bytes = sample[self.data_key]
file_name = sample[self.filename_key]
file_type = "." + sample[self.filetype_key]
if img_bytes:
img_data = bytes_transform.bytes_to_numpy(img_bytes)
transform_img = self._transform_img(img_data, file_name)
sample[self.data_key] = bytes_transform.numpy_to_bytes(transform_img, file_type)
logger.info(f"fileName: {file_name}, method: ImgPerspectiveTransformation costs {time.time() - start:6f} s")
return sample
def _transform_img(self, image, file_name):
original_img = image
ratio = 900 / image.shape[0]
# 固定尺寸
img_resize = self.transform_utils.resize_img(image)
# 边缘检测
binary_img = self.transform_utils.get_canny(img_resize)
# 轮廓
max_contour, max_area = self.transform_utils.find_max_contour(binary_img)
if not max_contour.size:
return original_img
# 多边形拟合凸包的四个顶点
boxes = self.transform_utils.get_box_point(max_contour)
if len(boxes) == 4:
boxes = self.transform_utils.get_adapt_point(boxes, ratio)
boxes = self.transform_utils.order_points(boxes)
warped = self.transform_utils.get_warp_image(image, boxes)
logger.info(f"fileName: {file_name}, method: ImgPerspectiveTransformation. "
"This picture is transformed by perspective.")
return warped
return original_img
class PerspectiveTransformationUtils:
"""图片透视变换工具类"""
@staticmethod
def resize_img(image, height=900):
"""固定尺寸"""
h, w = image.shape[:2]
pro = height / h
size = (int(w * pro), int(height))
img_resize = cv2.resize(image, size)
return img_resize
@staticmethod
def get_canny(image):
"""边缘检测"""
# 高斯滤波
binary = cv2.GaussianBlur(image, (3, 3), 2, 2)
# 边缘检测
binary = cv2.Canny(binary, 60, 240, apertureSize=3)
# 膨胀操作,尽量使边缘闭合
kernel = np.ones((3, 3), np.uint8)
binary = cv2.dilate(binary, kernel, iterations=1)
return binary
@staticmethod
def find_max_contour(image):
"""求出面积最大的轮廓"""
# 寻找边缘
contours, _ = cv2.findContours(image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
# 计算面积
max_area = 0.0
max_contour = np.array([])
for contour in contours:
current_area = cv2.contourArea(contour)
if current_area > max_area:
max_area = current_area
max_contour = contour
return max_contour, max_area
@staticmethod
def get_box_point(contour):
"""多边形拟合凸包的四个顶点"""
# 多边形拟合凸包
hull = cv2.convexHull(contour)
epsilon = 0.02 * cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(hull, epsilon, True)
approx = approx.reshape((len(approx), 2))
return approx
@staticmethod
def get_adapt_point(box, pro):
"""适配原四边形点集"""
box_pro = box
if pro != 1.0:
box_pro = box / pro
box_pro = np.trunc(box_pro)
return box_pro
@staticmethod
def order_points(pts):
"""四边形顶点排序,[top-left, top-right, bottom-right, bottom-left]"""
rect = np.zeros((4, 2), dtype="float32")
s = pts.sum(axis=1)
rect[0] = pts[np.argmin(s)]
rect[2] = pts[np.argmax(s)]
diff = np.diff(pts, axis=1)
rect[1] = pts[np.argmin(diff)]
rect[3] = pts[np.argmax(diff)]
return np.intp(rect)
@staticmethod
def compute_point_distance(a, b):
"""计算长宽"""
return int(np.sqrt(np.sum(np.square(a - b))))
def get_warp_image(self, image, box):
"""透视变换"""
w, h = self.compute_point_distance(box[0], box[1]), \
self.compute_point_distance(box[1], box[2])
dst_rect = np.array([[0, 0],
[w - 1, 0],
[w - 1, h - 1],
[0, h - 1]], dtype='float32')
box = np.array(box, dtype='float32')
matrix = cv2.getPerspectiveTransform(box, dst_rect)
warped = cv2.warpPerspective(image, matrix, (w, h))
return warped