Compare commits

...

2 Commits

Author SHA1 Message Date
311008cbf2 feat(message): 集成ZT消息服务发送通知
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 在TaskTaskServiceImpl中引入ZtMessageProducerService依赖
- 替换原有微信通知逻辑,使用ZT消息服务发送视频生成通知- 在DownloadNotificationTasker中引入ZtMessageProducerService依赖
- 修改视频下载通知发送逻辑,使用ZT消息服务
- 修改视频过期提醒通知逻辑,使用ZT消息服务
- 调整额外通知时间配置获取方式,从scenicConfigManager获取
- 统一构建通知消息参数格式,包含data和page信息
- 添加详细的日志记录,便于追踪消息发送过程
2025-10-14 19:06:30 +08:00
f54d40d026 feat(message):为消息添加唯一标识符支持
- 在 ZtMessage DTO 中新增 messageId 字段
- 发送消息前自动生成 UUID 作为默认 messageId
- 更新 Kafka 生产者日志,包含 messageId 以便追踪
- 增强错误日志记录,附带 messageId 提升调试效率
2025-10-14 18:27:15 +08:00
4 changed files with 97 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ import java.util.Map;
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ZtMessage {
private String messageId; // unique message identifier
private String channelId; // required
private String title; // required
private String content; // required

View File

@@ -27,18 +27,24 @@ public class ZtMessageProducerService {
public void send(ZtMessage msg) {
validate(msg);
// Generate messageId if not present
if (StringUtils.isBlank(msg.getMessageId())) {
msg.setMessageId(java.util.UUID.randomUUID().toString());
}
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
? kafkaProps.getZtMessageTopic()
: DEFAULT_TOPIC;
String key = msg.getChannelId();
String payload = toJson(msg);
log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle());
log.info("[ZT-MESSAGE] producing to topic={}, key={}, messageId={}, title={}", topic, key, msg.getMessageId(), msg.getTitle());
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
if (ex != null) {
log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex);
log.error("[ZT-MESSAGE] produce failed: messageId={}, error={}", msg.getMessageId(), ex.getMessage(), ex);
} else if (metadata != null) {
log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
log.info("[ZT-MESSAGE] produced: messageId={}, partition={}, offset={}", msg.getMessageId(), metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
}
});
}

View File

@@ -5,6 +5,8 @@ import cn.hutool.crypto.digest.MD5;
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.SourceRepository;
import com.ycwl.basic.utils.JacksonUtil;
@@ -128,6 +130,8 @@ public class TaskTaskServiceImpl implements TaskService {
private SourceRepository sourceRepository;
@Autowired
private MemberRelationRepository memberRelationRepository;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey();
@@ -653,23 +657,26 @@ public class TaskTaskServiceImpl implements TaskService {
* 生成时间 {{time4.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> timeMap2 = new HashMap<>();
timeMap2.put("value", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm"));
dataParam.put("time4", timeMap2);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频生成通知模板参数:{},用户ID:{}", params, openId);
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), openId);
dataParam.put("thing1", title);
dataParam.put("time4", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm"));
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第一次通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + "/" + item.getVideoId() + ""+configContent);
msg.setTarget(openId);
msg.setExtra(extra);
msg.setSendReason("视频生成通知");
msg.setSendBiz("视频生成");
ztMessageProducerService.send(msg);
}
}

View File

@@ -1,6 +1,8 @@
package com.ycwl.basic.task;
import cn.hutool.core.date.DateUtil;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.integration.scenic.dto.scenic.ScenicV2DTO;
import com.ycwl.basic.mapper.CouponMapper;
import com.ycwl.basic.mapper.MemberMapper;
@@ -47,6 +49,8 @@ public class DownloadNotificationTasker {
private MemberMapper memberMapper;
@Autowired
private CouponMapper couponMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
@Scheduled(cron = "0 0 21 * * *")
public void sendDownloadNotification() {
@@ -64,7 +68,6 @@ public class DownloadNotificationTasker {
log.info("模板消息为空");
return;
}
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("second_notification_title");
@@ -86,20 +89,25 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
dataParam.put("thing1", title);
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第二次通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第二次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
}
@@ -124,7 +132,6 @@ public class DownloadNotificationTasker {
log.info("模板消息为空");
return;
}
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("third_notification_title");
@@ -147,24 +154,27 @@ public class DownloadNotificationTasker {
* 过期时间 {{time2.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> dateMap = new HashMap<>();
Date expireDate = new Date(item.getCreateTime().getTime() + videoStoreDay * 24 * 60 * 60 * 1000);
dateMap.put("value", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm"));
dataParam.put("time2", dateMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
Map<String, Object> dataParam = new HashMap<>();
dataParam.put("thing1", title);
dataParam.put("time2", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm"));
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第三次通知 - 过期提醒)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第三次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
}
@@ -183,18 +193,18 @@ public class DownloadNotificationTasker {
calendar.clear();
scenicList.parallelStream().forEach(scenic -> {
Long scenicId = Long.parseLong(scenic.getId());
ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(scenicId);
ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(scenicId);
if (scenicConfig == null) {
return;
}
if (StringUtils.isEmpty(scenicConfig.getExtraNotificationTime())) {
if (StringUtils.isEmpty(scenicConfig.getString("extra_notification_time"))) {
return;
}
List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getExtraNotificationTime(), ","));
List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getString("extra_notification_time"), ","));
if (!timeList.contains(String.valueOf(currentHour))) {
return;
}
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getExtraNotificationTime());
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getString("extra_notification_time"));
videoMapper.listRelationByCreateTime(DateUtil.beginOfDay(new Date()), new Date())
.stream()
@@ -219,7 +229,6 @@ public class DownloadNotificationTasker {
return;
}
log.info("发送模板消息");
String title = configTitle.replace("【景区】", scenic.getName());
String page;
if (configManager.getBoolean("grouping_enable", false)) {
@@ -231,20 +240,25 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
dataParam.put("thing1", title);
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(额外下载通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("景区额外配置:" + scenicConfig.getString("extra_notification_time"));
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
});
}