#!/user/bin/python # -*- coding: utf-8 -*- """ Description: 身份证号码匿名化插件 Create: 2024/12/5 15:43 """ import re import time from datetime import datetime from pathlib import Path from typing import Dict, Any from loguru import logger import pytz from datamate.core.base_op import Mapper class AnonymizedIdNumber(Mapper): def __init__(self, *args, **kwargs): super(AnonymizedIdNumber, self).__init__(*args, **kwargs) self.id_number_re_compile = self.get_id_number_re_compile() self.id_coefficient = (7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2) self.id_verification = ("1", "0", "X", "9", "8", "7", "6", "5", "4", "3", "2") self.area_code_enum = self.load_code_list() @staticmethod def get_id_number_re_compile(): """获取身份证号码正则匹配对象""" # 中国身份证号共计18位,1,2位省份,3,4位城市,5,6位县区码,7~14位为出生日期,最后一位为校验码,做了严格限定 id_card_pattern = r'(?<=[^0-9])' \ r'((1[1-5]|2[1-3]|3[1-7]|4[1-6]|5[0-4]|6[1-5]|71|81|82)' \ r'(0[0-9]|1[0-9]|2[0-9]|3[0-4]|4[0-3]|5[1-3]|90)' \ r'(0[0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-3]|5[1-7]|6[1-4]|7[1-4]|8[1-7])' \ r'(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])' \ r'\d{3}[0-9xX])' \ r'(?=[^0-9xX])' return re.compile(id_card_pattern) @staticmethod def load_code_list(): """编码表加载""" area_code_enum_path = str(Path(__file__).parent / 'resources' / 'area_code_enum.txt') with open(area_code_enum_path, 'r', encoding='utf-8') as f: area_code_list = set(f.read().splitlines()) return area_code_list @staticmethod def _verify_birthday_code(birthday_code: str): """判断出生日期编码的8位数是否有效""" year = int(birthday_code[:4]) month = int(birthday_code[4:6]) day = int(birthday_code[6:8]) date_string = "{}-{}-{}".format(year, month, day) date_format = "%Y-%m-%d" try: # 将日期字符串转换成时间 date = datetime.strptime(date_string, date_format) # 设置时区为上海 china_tz = pytz.timezone("Asia/Shanghai") china_date = china_tz.localize(date) # 获取当前时间 current_date = datetime.now(china_tz) # 判断出生日期是否晚于当前时间;若晚于,则出生日期不合法 return china_date <= current_date except ValueError: return False def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: start = time.time() self.read_file_first(sample) sample[self.text_key] = self._id_number_filter(sample[self.text_key]) logger.info(f"fileName: {sample[self.filename_key]}, method: IDNumberCleaner costs {time.time() - start:6f} s") return sample def _verify_area_code(self, area_code: str): """判断地域编码的6位数是否有效""" return area_code in self.area_code_enum def _verify_verification_code(self, id_number: str): """身份证号码校验码正确性校验""" verify_num = id_number[-1] # 将身份证号码前17位数分别乘以不同的系数,即self.id_coefficient,再将相乘结果相加 id_sum = sum([int(num) * coe for num, coe in zip(id_number[:-1], self.id_coefficient)]) # 判断相加总和除以11的余数是否等于身份证号码最后一位 return verify_num.upper() == self.id_verification[id_sum % 11].upper() def _verify_id_number(self, id_number: str): """验证身份证号码有效性主函数""" return self._verify_verification_code(id_number) and \ self._verify_birthday_code(id_number[6:14]) and \ self._verify_area_code(id_number[:6]) def _verify_similar_id_number(self, id_number: str): """用于宽松匹配类似身份证的字符串,不进行严格有效性验证。""" if len(id_number) != 18: return False if not id_number[:17].isdigit(): return False last_char = id_number[-1].upper() return last_char in set('0123456789X') def _id_number_filter(self, input_data: str): """身份证号码匿名化""" input_data = ''.join(['【', input_data, '】']) # 抽取符合身份证正则匹配的字符串 id_nums = [item.group(1) for item in self.id_number_re_compile.finditer(input_data)] # 判断抽取的字符串是不是真实的身份证号码 for id_num in id_nums: if self._verify_id_number(id_num) or self._verify_similar_id_number(id_num): # 替换有效身份证号码为 id_num_pattern = r"(?<=[^0-9]){}(?=[^0-9xX])".format(id_num) input_data = re.compile(id_num_pattern).sub("", input_data) return input_data[1:-1]