feat(notify): 添加微信订阅消息去重功能

- 在 WechatSubscribeTemplateConfigEntity 中新增 dedupSeconds 字段用于配置去重窗口
- 将去重配置从事件模板映射复制到通知配置实体中
- 集成 RedisTemplate 实现基于时间窗口的消息去重机制
- 支持三种去重模式:永久去重(0)、不设去重(负数)、窗口期去重(正数)
- 实现基于 Redis 分布式锁的重复消息过滤逻辑
- 为非永久去重场景生成唯一数据库幂等键以避免冲突
This commit is contained in:
2026-01-20 20:19:44 +08:00
parent c3fcfdd633
commit 82e844a779
3 changed files with 43 additions and 3 deletions

View File

@@ -1,5 +1,6 @@
package com.ycwl.basic.model.pc.notify.entity; package com.ycwl.basic.model.pc.notify.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data; import lombok.Data;
@@ -64,6 +65,13 @@ public class WechatSubscribeTemplateConfigEntity {
*/ */
private String description; private String description;
/**
* 去重窗口(秒),0表示永久去重,小于0表示不去重,大于0表示窗口期去重
* 来自 wechat_subscribe_event_template 表,仅在事件触发时有效
*/
@TableField(exist = false)
private Integer dedupSeconds;
private Date createTime; private Date createTime;
private Date updateTime; private Date updateTime;

View File

@@ -428,6 +428,8 @@ public class WechatSubscribeNotifyConfigRepository {
if (cfg == null || !Objects.equals(cfg.getEnabled(), 1)) { if (cfg == null || !Objects.equals(cfg.getEnabled(), 1)) {
continue; continue;
} }
// 复制去重配置到模板实体中
cfg.setDedupSeconds(mapping.getDedupSeconds());
result.add(cfg); result.add(cfg);
} }
return result; return result;

View File

@@ -14,6 +14,7 @@ import com.ycwl.basic.utils.NotificationAuthUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DuplicateKeyException; import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@@ -23,6 +24,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@@ -47,15 +50,18 @@ public class WechatSubscribeNotifyTriggerService {
private final WechatSubscribeSendLogMapper sendLogMapper; private final WechatSubscribeSendLogMapper sendLogMapper;
private final NotificationAuthUtils notificationAuthUtils; private final NotificationAuthUtils notificationAuthUtils;
private final ZtMessageProducerService ztMessageProducerService; private final ZtMessageProducerService ztMessageProducerService;
private final StringRedisTemplate redisTemplate;
public WechatSubscribeNotifyTriggerService(WechatSubscribeNotifyConfigService configService, public WechatSubscribeNotifyTriggerService(WechatSubscribeNotifyConfigService configService,
WechatSubscribeSendLogMapper sendLogMapper, WechatSubscribeSendLogMapper sendLogMapper,
NotificationAuthUtils notificationAuthUtils, NotificationAuthUtils notificationAuthUtils,
ZtMessageProducerService ztMessageProducerService) { ZtMessageProducerService ztMessageProducerService,
StringRedisTemplate redisTemplate) {
this.configService = configService; this.configService = configService;
this.sendLogMapper = sendLogMapper; this.sendLogMapper = sendLogMapper;
this.notificationAuthUtils = notificationAuthUtils; this.notificationAuthUtils = notificationAuthUtils;
this.ztMessageProducerService = ztMessageProducerService; this.ztMessageProducerService = ztMessageProducerService;
this.redisTemplate = redisTemplate;
} }
/** /**
@@ -91,8 +97,32 @@ public class WechatSubscribeNotifyTriggerService {
continue; continue;
} }
String idempotencyKey = buildIdempotencyKey(eventKey, cfg.getTemplateKey(), request); // 计算基础幂等键
WechatSubscribeSendLogEntity sendLog = buildInitLog(idempotencyKey, eventKey, cfg, request); String baseHash = buildIdempotencyKey(eventKey, cfg.getTemplateKey(), request);
String dbIdempotencyKey = baseHash;
Integer dedupSeconds = cfg.getDedupSeconds();
if (dedupSeconds != null && dedupSeconds != 0) {
// 非永久去重场景
if (dedupSeconds < 0) {
// 不去重:强制生成新的幂等键
dbIdempotencyKey = baseHash + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
} else {
// 窗口期去重:依赖 Redis 检查
String redisKey = "wechat:subscribe:dedup:" + baseHash;
Boolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", dedupSeconds, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(success)) {
// 窗口期内已发送
skippedCount++;
continue;
}
// 允许发送,但需要新的 DB 键以记录日志(因为表中 idempotency_key 唯一)
dbIdempotencyKey = baseHash + "_" + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
}
}
// else: dedupSeconds == 0 或 null,使用 baseHash 作为 DB 键,利用 DB 唯一索引实现永久去重
WechatSubscribeSendLogEntity sendLog = buildInitLog(dbIdempotencyKey, eventKey, cfg, request);
if (!tryInsertSendLog(sendLog)) { if (!tryInsertSendLog(sendLog)) {
skippedCount++; skippedCount++;
continue; continue;