Files
FrameTour-BE/src/main/java/com/ycwl/basic/service/notify/WechatSubscribeNotifyTriggerService.java
Jerry Yan f1a2958251 feat(notification): 添加微信订阅消息配置管理及幂等授权功能
- 新增微信订阅消息配置管理控制器,支持模板、场景、事件映射配置
- 实现用户通知授权服务的幂等控制,避免前端重试导致授权次数虚增
- 添加微信订阅消息发送日志记录,用于幂等与排障
- 新增视频生成完成时的订阅消息触发功能
- 实现场景模板查询接口,返回用户授权余额信息
- 添加模板V2相关数据表映射器和实体类
- 集成微信订阅消息触发服务到任务完成流程中
2026-01-01 17:53:59 +08:00

283 lines
12 KiB
Java

package com.ycwl.basic.service.notify;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.mapper.WechatSubscribeSendLogMapper;
import com.ycwl.basic.model.pc.notify.entity.WechatSubscribeSendLogEntity;
import com.ycwl.basic.model.pc.notify.entity.WechatSubscribeTemplateConfigEntity;
import com.ycwl.basic.model.pc.notify.req.WechatSubscribeNotifyTriggerRequest;
import com.ycwl.basic.model.pc.notify.resp.WechatSubscribeNotifyTriggerResult;
import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.utils.NotificationAuthUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 微信订阅消息统一触发器(后端内部调用)
*
* @Author: System
* @Date: 2025/12/31
*/
@Service
@Slf4j
public class WechatSubscribeNotifyTriggerService {
private static final Pattern VAR_PATTERN = Pattern.compile("\\$\\{([^}]+)}");
private static final String STATUS_INIT = "INIT";
private static final String STATUS_SENT = "SENT";
private static final String STATUS_SKIPPED_NO_AUTH = "SKIPPED_NO_AUTH";
private static final String STATUS_FAILED = "FAILED";
private final WechatSubscribeNotifyConfigService configService;
private final WechatSubscribeSendLogMapper sendLogMapper;
private final NotificationAuthUtils notificationAuthUtils;
private final ZtMessageProducerService ztMessageProducerService;
public WechatSubscribeNotifyTriggerService(WechatSubscribeNotifyConfigService configService,
WechatSubscribeSendLogMapper sendLogMapper,
NotificationAuthUtils notificationAuthUtils,
ZtMessageProducerService ztMessageProducerService) {
this.configService = configService;
this.sendLogMapper = sendLogMapper;
this.notificationAuthUtils = notificationAuthUtils;
this.ztMessageProducerService = ztMessageProducerService;
}
/**
* 触发订阅消息发送(支持按 scenicId 覆盖 + 幂等 + 授权消费)
*/
public WechatSubscribeNotifyTriggerResult trigger(String eventKey, WechatSubscribeNotifyTriggerRequest request) {
WechatSubscribeNotifyTriggerResult result = new WechatSubscribeNotifyTriggerResult();
if (StringUtils.isBlank(eventKey) || request == null) {
log.warn("订阅消息触发入参非法: eventKey={}, request={}", eventKey, request);
return result;
}
if (request.getScenicId() == null || request.getMemberId() == null || StringUtils.isBlank(request.getOpenId())) {
log.warn("订阅消息触发缺少必要字段: eventKey={}, scenicId={}, memberId={}, openId={}",
eventKey, request.getScenicId(), request.getMemberId(), request.getOpenId());
return result;
}
List<WechatSubscribeTemplateConfigEntity> templateConfigs =
configService.listEventTemplateConfigs(request.getScenicId(), eventKey);
if (templateConfigs.isEmpty()) {
return result;
}
result.setConfigFound(true);
Map<String, Object> variables = buildVariables(eventKey, request);
int sentCount = 0;
int skippedCount = 0;
for (WechatSubscribeTemplateConfigEntity cfg : templateConfigs) {
if (cfg == null || StringUtils.isBlank(cfg.getWechatTemplateId()) || StringUtils.isBlank(cfg.getTemplateKey())) {
skippedCount++;
continue;
}
String idempotencyKey = buildIdempotencyKey(eventKey, cfg.getTemplateKey(), request);
WechatSubscribeSendLogEntity sendLog = buildInitLog(idempotencyKey, eventKey, cfg, request);
if (!tryInsertSendLog(sendLog)) {
skippedCount++;
continue;
}
try {
// 检查并消费授权
if (!notificationAuthUtils.checkAndConsumeAuthorization(
request.getMemberId(), cfg.getWechatTemplateId(), request.getScenicId())) {
updateSendLog(sendLog.getId(), STATUS_SKIPPED_NO_AUTH, null, null);
skippedCount++;
continue;
}
ZtMessage msg = buildZtMessage(cfg, request.getOpenId(), variables, eventKey);
ztMessageProducerService.send(msg);
updateSendLog(sendLog.getId(), STATUS_SENT, msg.getMessageId(), null);
sentCount++;
} catch (Exception e) {
updateSendLog(sendLog.getId(), STATUS_FAILED, null, e.getMessage());
skippedCount++;
log.error("订阅消息发送失败: eventKey={}, templateKey={}, memberId={}, scenicId={}, error={}",
eventKey, cfg.getTemplateKey(), request.getMemberId(), request.getScenicId(), e.getMessage(), e);
}
}
result.setSentCount(sentCount);
result.setSkippedCount(skippedCount);
return result;
}
private boolean tryInsertSendLog(WechatSubscribeSendLogEntity logEntity) {
try {
return sendLogMapper.insert(logEntity) > 0;
} catch (DuplicateKeyException e) {
// 幂等命中:直接跳过,不再重复消费授权
return false;
}
}
private void updateSendLog(Long id, String status, String messageId, String errorMessage) {
if (id == null) {
return;
}
WechatSubscribeSendLogEntity update = new WechatSubscribeSendLogEntity();
update.setStatus(status);
update.setZtMessageId(messageId);
update.setErrorMessage(errorMessage);
update.setUpdateTime(new Date());
sendLogMapper.update(update, new QueryWrapper<WechatSubscribeSendLogEntity>().eq("id", id));
}
private static String buildIdempotencyKey(String eventKey, String templateKey, WechatSubscribeNotifyTriggerRequest request) {
String bizId = Objects.toString(request.getBizId(), "");
String raw = eventKey + "|" + templateKey + "|" + request.getScenicId() + "|" + request.getMemberId() + "|" + bizId;
return sha256Hex(raw);
}
private static String sha256Hex(String input) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
StringBuilder sb = new StringBuilder(digest.length * 2);
for (byte b : digest) {
sb.append(String.format("%02x", b));
}
return sb.toString();
} catch (Exception e) {
throw new IllegalStateException("sha256计算失败", e);
}
}
private WechatSubscribeSendLogEntity buildInitLog(String idempotencyKey,
String eventKey,
WechatSubscribeTemplateConfigEntity cfg,
WechatSubscribeNotifyTriggerRequest request) {
WechatSubscribeSendLogEntity logEntity = new WechatSubscribeSendLogEntity();
logEntity.setIdempotencyKey(idempotencyKey);
logEntity.setEventKey(eventKey);
logEntity.setTemplateKey(cfg.getTemplateKey());
logEntity.setScenicId(request.getScenicId());
logEntity.setMemberId(request.getMemberId());
logEntity.setOpenId(request.getOpenId());
logEntity.setWechatTemplateId(cfg.getWechatTemplateId());
logEntity.setStatus(STATUS_INIT);
logEntity.setPayloadJson(safeJson(request));
logEntity.setCreateTime(new Date());
logEntity.setUpdateTime(new Date());
return logEntity;
}
private static String safeJson(Object obj) {
try {
return JacksonUtil.toJson(obj);
} catch (Exception e) {
return "{}";
}
}
private static Map<String, Object> buildVariables(String eventKey, WechatSubscribeNotifyTriggerRequest request) {
Map<String, Object> vars = new HashMap<>();
if (request.getVariables() != null) {
vars.putAll(request.getVariables());
}
vars.put("eventKey", eventKey);
vars.put("scenicId", request.getScenicId());
vars.put("memberId", request.getMemberId());
vars.put("openId", request.getOpenId());
vars.put("bizId", request.getBizId());
return vars;
}
private static ZtMessage buildZtMessage(WechatSubscribeTemplateConfigEntity cfg,
String openId,
Map<String, Object> variables,
String eventKey) {
String title = renderOrDefault(cfg.getTitleTemplate(), variables, cfg.getTemplateKey());
String content = renderOrDefault(cfg.getContentTemplate(), variables, title);
String page = renderOrDefault(cfg.getPageTemplate(), variables, "pages/index/index");
Map<String, Object> dataParam = buildDataParam(cfg.getDataTemplateJson(), variables);
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
ZtMessage msg = new ZtMessage();
msg.setChannelId(cfg.getWechatTemplateId());
msg.setTitle(title);
msg.setContent(content);
msg.setTarget(openId);
msg.setExtra(extra);
msg.setSendReason(eventKey);
msg.setSendBiz("订阅消息");
return msg;
}
private static Map<String, Object> buildDataParam(String dataTemplateJson, Map<String, Object> variables) {
if (StringUtils.isBlank(dataTemplateJson)) {
throw new IllegalArgumentException("dataTemplateJson为空");
}
Map<String, Object> templateMap = JacksonUtil.fromJson(dataTemplateJson, new TypeReference<Map<String, Object>>() {});
if (templateMap == null || templateMap.isEmpty()) {
throw new IllegalArgumentException("dataTemplateJson解析为空");
}
Map<String, Object> dataParam = new HashMap<>();
for (Map.Entry<String, Object> entry : templateMap.entrySet()) {
String key = entry.getKey();
if (StringUtils.isBlank(key)) {
continue;
}
String rawValue = entry.getValue() != null ? entry.getValue().toString() : "";
dataParam.put(key, render(rawValue, variables));
}
return dataParam;
}
private static String renderOrDefault(String template, Map<String, Object> variables, String defaultValue) {
if (StringUtils.isBlank(template)) {
return defaultValue;
}
String rendered = render(template, variables);
return StringUtils.isBlank(rendered) ? defaultValue : rendered;
}
private static String render(String template, Map<String, Object> variables) {
if (template == null) {
return null;
}
if (variables == null || variables.isEmpty()) {
return template;
}
Matcher matcher = VAR_PATTERN.matcher(template);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
String key = matcher.group(1);
if (key != null) {
key = key.trim();
}
Object value = key != null ? variables.get(key) : null;
String replacement = value != null ? value.toString() : "";
matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
}
matcher.appendTail(sb);
return sb.toString();
}
}