From 82e844a779c8e1488926eeb4a98be83ce091102b Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 20 Jan 2026 20:19:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(notify):=20=E6=B7=BB=E5=8A=A0=E5=BE=AE?= =?UTF-8?q?=E4=BF=A1=E8=AE=A2=E9=98=85=E6=B6=88=E6=81=AF=E5=8E=BB=E9=87=8D?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 WechatSubscribeTemplateConfigEntity 中新增 dedupSeconds 字段用于配置去重窗口 - 将去重配置从事件模板映射复制到通知配置实体中 - 集成 RedisTemplate 实现基于时间窗口的消息去重机制 - 支持三种去重模式:永久去重(0)、不设去重(负数)、窗口期去重(正数) - 实现基于 Redis 分布式锁的重复消息过滤逻辑 - 为非永久去重场景生成唯一数据库幂等键以避免冲突 --- .../WechatSubscribeTemplateConfigEntity.java | 8 +++++ ...WechatSubscribeNotifyConfigRepository.java | 2 ++ .../WechatSubscribeNotifyTriggerService.java | 36 +++++++++++++++++-- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ycwl/basic/model/pc/notify/entity/WechatSubscribeTemplateConfigEntity.java b/src/main/java/com/ycwl/basic/model/pc/notify/entity/WechatSubscribeTemplateConfigEntity.java index 0b7968fb..3db180f1 100644 --- a/src/main/java/com/ycwl/basic/model/pc/notify/entity/WechatSubscribeTemplateConfigEntity.java +++ b/src/main/java/com/ycwl/basic/model/pc/notify/entity/WechatSubscribeTemplateConfigEntity.java @@ -1,5 +1,6 @@ package com.ycwl.basic.model.pc.notify.entity; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; @@ -64,6 +65,13 @@ public class WechatSubscribeTemplateConfigEntity { */ private String description; + /** + * 去重窗口(秒),0表示永久去重,小于0表示不去重,大于0表示窗口期去重 + * 来自 wechat_subscribe_event_template 表,仅在事件触发时有效 + */ + @TableField(exist = false) + private Integer dedupSeconds; + private Date createTime; private Date updateTime; diff --git a/src/main/java/com/ycwl/basic/repository/WechatSubscribeNotifyConfigRepository.java b/src/main/java/com/ycwl/basic/repository/WechatSubscribeNotifyConfigRepository.java index b768ba4f..6dbaa6d6 100644 --- a/src/main/java/com/ycwl/basic/repository/WechatSubscribeNotifyConfigRepository.java +++ b/src/main/java/com/ycwl/basic/repository/WechatSubscribeNotifyConfigRepository.java @@ -428,6 +428,8 @@ public class WechatSubscribeNotifyConfigRepository { if (cfg == null || !Objects.equals(cfg.getEnabled(), 1)) { continue; } + // 复制去重配置到模板实体中 + cfg.setDedupSeconds(mapping.getDedupSeconds()); result.add(cfg); } return result; diff --git a/src/main/java/com/ycwl/basic/service/notify/WechatSubscribeNotifyTriggerService.java b/src/main/java/com/ycwl/basic/service/notify/WechatSubscribeNotifyTriggerService.java index 39a70bef..504a2469 100644 --- a/src/main/java/com/ycwl/basic/service/notify/WechatSubscribeNotifyTriggerService.java +++ b/src/main/java/com/ycwl/basic/service/notify/WechatSubscribeNotifyTriggerService.java @@ -14,6 +14,7 @@ import com.ycwl.basic.utils.NotificationAuthUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.dao.DuplicateKeyException; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; @@ -23,6 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -47,15 +50,18 @@ public class WechatSubscribeNotifyTriggerService { private final WechatSubscribeSendLogMapper sendLogMapper; private final NotificationAuthUtils notificationAuthUtils; private final ZtMessageProducerService ztMessageProducerService; + private final StringRedisTemplate redisTemplate; public WechatSubscribeNotifyTriggerService(WechatSubscribeNotifyConfigService configService, WechatSubscribeSendLogMapper sendLogMapper, NotificationAuthUtils notificationAuthUtils, - ZtMessageProducerService ztMessageProducerService) { + ZtMessageProducerService ztMessageProducerService, + StringRedisTemplate redisTemplate) { this.configService = configService; this.sendLogMapper = sendLogMapper; this.notificationAuthUtils = notificationAuthUtils; this.ztMessageProducerService = ztMessageProducerService; + this.redisTemplate = redisTemplate; } /** @@ -91,8 +97,32 @@ public class WechatSubscribeNotifyTriggerService { continue; } - String idempotencyKey = buildIdempotencyKey(eventKey, cfg.getTemplateKey(), request); - WechatSubscribeSendLogEntity sendLog = buildInitLog(idempotencyKey, eventKey, cfg, request); + // 计算基础幂等键 + String baseHash = buildIdempotencyKey(eventKey, cfg.getTemplateKey(), request); + String dbIdempotencyKey = baseHash; + Integer dedupSeconds = cfg.getDedupSeconds(); + + if (dedupSeconds != null && dedupSeconds != 0) { + // 非永久去重场景 + if (dedupSeconds < 0) { + // 不去重:强制生成新的幂等键 + dbIdempotencyKey = baseHash + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8); + } else { + // 窗口期去重:依赖 Redis 检查 + String redisKey = "wechat:subscribe:dedup:" + baseHash; + Boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", dedupSeconds, TimeUnit.SECONDS); + if (Boolean.FALSE.equals(success)) { + // 窗口期内已发送 + skippedCount++; + continue; + } + // 允许发送,但需要新的 DB 键以记录日志(因为表中 idempotency_key 唯一) + dbIdempotencyKey = baseHash + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8); + } + } + // else: dedupSeconds == 0 或 null,使用 baseHash 作为 DB 键,利用 DB 唯一索引实现永久去重 + + WechatSubscribeSendLogEntity sendLog = buildInitLog(dbIdempotencyKey, eventKey, cfg, request); if (!tryInsertSendLog(sendLog)) { skippedCount++; continue;