Compare commits

..

2 Commits

Author SHA1 Message Date
311008cbf2 feat(message): 集成ZT消息服务发送通知
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 在TaskTaskServiceImpl中引入ZtMessageProducerService依赖
- 替换原有微信通知逻辑,使用ZT消息服务发送视频生成通知- 在DownloadNotificationTasker中引入ZtMessageProducerService依赖
- 修改视频下载通知发送逻辑,使用ZT消息服务
- 修改视频过期提醒通知逻辑,使用ZT消息服务
- 调整额外通知时间配置获取方式,从scenicConfigManager获取
- 统一构建通知消息参数格式,包含data和page信息
- 添加详细的日志记录,便于追踪消息发送过程
2025-10-14 19:06:30 +08:00
f54d40d026 feat(message):为消息添加唯一标识符支持
- 在 ZtMessage DTO 中新增 messageId 字段
- 发送消息前自动生成 UUID 作为默认 messageId
- 更新 Kafka 生产者日志,包含 messageId 以便追踪
- 增强错误日志记录,附带 messageId 提升调试效率
2025-10-14 18:27:15 +08:00
4 changed files with 97 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ import java.util.Map;
@AllArgsConstructor @AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
public class ZtMessage { public class ZtMessage {
private String messageId; // unique message identifier
private String channelId; // required private String channelId; // required
private String title; // required private String title; // required
private String content; // required private String content; // required

View File

@@ -27,18 +27,24 @@ public class ZtMessageProducerService {
public void send(ZtMessage msg) { public void send(ZtMessage msg) {
validate(msg); validate(msg);
// Generate messageId if not present
if (StringUtils.isBlank(msg.getMessageId())) {
msg.setMessageId(java.util.UUID.randomUUID().toString());
}
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic()) String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
? kafkaProps.getZtMessageTopic() ? kafkaProps.getZtMessageTopic()
: DEFAULT_TOPIC; : DEFAULT_TOPIC;
String key = msg.getChannelId(); String key = msg.getChannelId();
String payload = toJson(msg); String payload = toJson(msg);
log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle()); log.info("[ZT-MESSAGE] producing to topic={}, key={}, messageId={}, title={}", topic, key, msg.getMessageId(), msg.getTitle());
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> { kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
if (ex != null) { if (ex != null) {
log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex); log.error("[ZT-MESSAGE] produce failed: messageId={}, error={}", msg.getMessageId(), ex.getMessage(), ex);
} else if (metadata != null) { } else if (metadata != null) {
log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset()); log.info("[ZT-MESSAGE] produced: messageId={}, partition={}, offset={}", msg.getMessageId(), metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
} }
}); });
} }

View File

@@ -5,6 +5,8 @@ import cn.hutool.crypto.digest.MD5;
import com.ycwl.basic.integration.common.manager.DeviceConfigManager; import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager; import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager; import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.repository.MemberRelationRepository; import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.SourceRepository; import com.ycwl.basic.repository.SourceRepository;
import com.ycwl.basic.utils.JacksonUtil; import com.ycwl.basic.utils.JacksonUtil;
@@ -128,6 +130,8 @@ public class TaskTaskServiceImpl implements TaskService {
private SourceRepository sourceRepository; private SourceRepository sourceRepository;
@Autowired @Autowired
private MemberRelationRepository memberRelationRepository; private MemberRelationRepository memberRelationRepository;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) { private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey(); String accessKey = req.getAccessKey();
@@ -653,23 +657,26 @@ public class TaskTaskServiceImpl implements TaskService {
* 生成时间 {{time4.DATA}} * 生成时间 {{time4.DATA}}
* 备注 {{thing3.DATA}} * 备注 {{thing3.DATA}}
*/ */
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>(); Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>(); dataParam.put("thing1", title);
videoMap.put("value", title); dataParam.put("time4", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm"));
dataParam.put("thing1", videoMap); dataParam.put("thing3", configContent);
Map<String, String> timeMap2 = new HashMap<>();
timeMap2.put("value", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm")); // 构建extra,只包含data和page
dataParam.put("time4", timeMap2); Map<String, Object> extra = new HashMap<>();
Map<String, String> remarkMap = new HashMap<>(); extra.put("data", dataParam);
remarkMap.put("value", configContent); extra.put("page", page);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam); // 使用ZT消息服务发送通知(第一次通知)
params.put("page", page); ZtMessage msg = new ZtMessage();
params.put("template_id", templateId); msg.setChannelId(templateId);
log.info("视频生成通知模板参数:{},用户ID:{}", params, openId); msg.setTitle(title);
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap()); msg.setContent("" + item.getFaceId() + "/" + item.getVideoId() + ""+configContent);
adapter.sendTo(new NotifyContent(title, page, params), openId); msg.setTarget(openId);
msg.setExtra(extra);
msg.setSendReason("视频生成通知");
msg.setSendBiz("视频生成");
ztMessageProducerService.send(msg);
} }
} }

View File

@@ -1,6 +1,8 @@
package com.ycwl.basic.task; package com.ycwl.basic.task;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.integration.scenic.dto.scenic.ScenicV2DTO; import com.ycwl.basic.integration.scenic.dto.scenic.ScenicV2DTO;
import com.ycwl.basic.mapper.CouponMapper; import com.ycwl.basic.mapper.CouponMapper;
import com.ycwl.basic.mapper.MemberMapper; import com.ycwl.basic.mapper.MemberMapper;
@@ -47,6 +49,8 @@ public class DownloadNotificationTasker {
private MemberMapper memberMapper; private MemberMapper memberMapper;
@Autowired @Autowired
private CouponMapper couponMapper; private CouponMapper couponMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
@Scheduled(cron = "0 0 21 * * *") @Scheduled(cron = "0 0 21 * * *")
public void sendDownloadNotification() { public void sendDownloadNotification() {
@@ -64,7 +68,6 @@ public class DownloadNotificationTasker {
log.info("模板消息为空"); log.info("模板消息为空");
return; return;
} }
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId()); ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId()); ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("second_notification_title"); String configTitle = configManager.getString("second_notification_title");
@@ -86,20 +89,25 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}} * 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}} * 备注 {{thing3.DATA}}
*/ */
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>(); Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>(); dataParam.put("thing1", title);
videoMap.put("value", title); dataParam.put("thing3", configContent);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>(); // 构建extra,只包含data和page
remarkMap.put("value", configContent); Map<String, Object> extra = new HashMap<>();
dataParam.put("thing3", remarkMap); extra.put("data", dataParam);
params.put("data", dataParam); extra.put("page", page);
params.put("page", page);
params.put("template_id", templateId); // 使用ZT消息服务发送通知(第二次通知)
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId()); ZtMessage msg = new ZtMessage();
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap()); msg.setChannelId(templateId);
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId()); msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第二次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
}); });
} }
@@ -124,7 +132,6 @@ public class DownloadNotificationTasker {
log.info("模板消息为空"); log.info("模板消息为空");
return; return;
} }
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId()); ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId()); ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("third_notification_title"); String configTitle = configManager.getString("third_notification_title");
@@ -147,24 +154,27 @@ public class DownloadNotificationTasker {
* 过期时间 {{time2.DATA}} * 过期时间 {{time2.DATA}}
* 备注 {{thing3.DATA}} * 备注 {{thing3.DATA}}
*/ */
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> dateMap = new HashMap<>();
Date expireDate = new Date(item.getCreateTime().getTime() + videoStoreDay * 24 * 60 * 60 * 1000); Date expireDate = new Date(item.getCreateTime().getTime() + videoStoreDay * 24 * 60 * 60 * 1000);
dateMap.put("value", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm")); Map<String, Object> dataParam = new HashMap<>();
dataParam.put("time2", dateMap); dataParam.put("thing1", title);
Map<String, String> remarkMap = new HashMap<>(); dataParam.put("time2", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm"));
remarkMap.put("value", configContent); dataParam.put("thing3", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam); // 构建extra,只包含data和page
params.put("page", page); Map<String, Object> extra = new HashMap<>();
params.put("template_id", templateId); extra.put("data", dataParam);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId()); extra.put("page", page);
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId()); // 使用ZT消息服务发送通知(第三次通知 - 过期提醒)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第三次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
}); });
} }
@@ -183,18 +193,18 @@ public class DownloadNotificationTasker {
calendar.clear(); calendar.clear();
scenicList.parallelStream().forEach(scenic -> { scenicList.parallelStream().forEach(scenic -> {
Long scenicId = Long.parseLong(scenic.getId()); Long scenicId = Long.parseLong(scenic.getId());
ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(scenicId); ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(scenicId);
if (scenicConfig == null) { if (scenicConfig == null) {
return; return;
} }
if (StringUtils.isEmpty(scenicConfig.getExtraNotificationTime())) { if (StringUtils.isEmpty(scenicConfig.getString("extra_notification_time"))) {
return; return;
} }
List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getExtraNotificationTime(), ",")); List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getString("extra_notification_time"), ","));
if (!timeList.contains(String.valueOf(currentHour))) { if (!timeList.contains(String.valueOf(currentHour))) {
return; return;
} }
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getExtraNotificationTime()); log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getString("extra_notification_time"));
videoMapper.listRelationByCreateTime(DateUtil.beginOfDay(new Date()), new Date()) videoMapper.listRelationByCreateTime(DateUtil.beginOfDay(new Date()), new Date())
.stream() .stream()
@@ -219,7 +229,6 @@ public class DownloadNotificationTasker {
return; return;
} }
log.info("发送模板消息");
String title = configTitle.replace("【景区】", scenic.getName()); String title = configTitle.replace("【景区】", scenic.getName());
String page; String page;
if (configManager.getBoolean("grouping_enable", false)) { if (configManager.getBoolean("grouping_enable", false)) {
@@ -231,20 +240,25 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}} * 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}} * 备注 {{thing3.DATA}}
*/ */
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>(); Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>(); dataParam.put("thing1", title);
videoMap.put("value", title); dataParam.put("thing3", configContent);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>(); // 构建extra,只包含data和page
remarkMap.put("value", configContent); Map<String, Object> extra = new HashMap<>();
dataParam.put("thing3", remarkMap); extra.put("data", dataParam);
params.put("data", dataParam); extra.put("page", page);
params.put("page", page);
params.put("template_id", templateId); // 使用ZT消息服务发送通知(额外下载通知)
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId()); ZtMessage msg = new ZtMessage();
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap()); msg.setChannelId(templateId);
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId()); msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("景区额外配置:" + scenicConfig.getString("extra_notification_time"));
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
}); });
}); });
} }