Compare commits

...

4 Commits

Author SHA1 Message Date
aa7330000f fix(task): 避免重复发送下载和过期通知
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 在发送下载通知前检查用户是否已接收通知
- 在发送过期通知前检查用户是否已接收通知- 在发送额外下载通知前检查用户是否已接收通知
- 使用ConcurrentHashMap.newKeySet()确保线程安全- 添加调试日志以追踪重复通知的跳过情况- 优化通知逻辑以提升定时任务执行效率
2025-10-14 20:31:45 +08:00
29f4bbf2d8 feat(message): 添加ZT消息生产者空实现服务
- 创建 ZtMessageProducerNoOpService 类作为 Kafka 禁用时的替代实现- 实现 ConditionalOnProperty 注解,当 kafka.enabled=false 时激活该服务- 覆写 send 方法,仅记录日志而不实际发送消息
- 添加构造函数以满足父类依赖要求
- 提供详细注释说明服务用途和实现逻辑
2025-10-14 20:28:00 +08:00
ad42254ea0 refactor(task): 移除通知模块依赖
- 删除了对通知模块的包引用
- 移除了通知模块相关的类导入- 清理了与通知功能相关的代码依赖
-优化了任务服务实现类的依赖结构
- 简化了下载通知任务器的代码引用
- 解除了通知工厂类的直接依赖关系
2025-10-14 19:38:47 +08:00
0ceecf0488 fix(message): 将消息相关接口的日志级别从 info 调整为 debug
- 修改消息列表查询接口的日志级别- 修改获取消息通道列表接口的日志级别- 统一调整日志输出方式以减少生产环境日志量
2025-10-14 19:20:41 +08:00
15 changed files with 83 additions and 309 deletions

View File

@@ -32,7 +32,7 @@ public class MessageController {
@RequestParam(required = false) String createdAtStart,
@RequestParam(required = false) String createdAtEnd
) {
log.info("PC|消息列表查询 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
log.debug("PC|消息列表查询 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
if (pageSize > 100) {
pageSize = 100;
}
@@ -48,7 +48,7 @@ public class MessageController {
@GetMapping("/channels")
public ApiResponse<ChannelsResponse> listChannels() {
log.info("PC|获取消息通道列表");
log.debug("PC|获取消息通道列表");
try {
ChannelsResponse data = messageService.listChannels();
return ApiResponse.success(data);

View File

@@ -0,0 +1,52 @@
package com.ycwl.basic.integration.message.service;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* ZT消息生产者空实现服务
* <p>
* 当 kafka.enabled=false 时,该服务会被激活,作为 ZtMessageProducerService 的替代。
* 所有消息发送操作都会被忽略,只记录日志。
* </p>
*
* @see ZtMessageProducerService
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "false", matchIfMissing = true)
public class ZtMessageProducerNoOpService extends ZtMessageProducerService {
/**
* 空构造函数
* 由于父类需要依赖项,但在此实现中不会使用,因此传入 null
*/
public ZtMessageProducerNoOpService() {
super(null, null, null);
}
/**
* 消息发送的空操作实现
* <p>
* 当 Kafka 未启用时,此方法会被调用。
* 它不会实际发送消息,只会记录一条 debug 日志。
* </p>
*
* @param msg 待发送的消息(会被验证基本字段)
*/
@Override
public void send(ZtMessage msg) {
if (msg == null) {
log.debug("[ZT-MESSAGE] Kafka未启用,跳过消息发送(消息为null)");
return;
}
log.debug("[ZT-MESSAGE] Kafka未启用,跳过消息发送: channelId={}, title={}, target={}, messageId={}",
msg.getChannelId(),
msg.getTitle(),
msg.getTarget(),
msg.getMessageId());
}
}

View File

@@ -1,51 +0,0 @@
package com.ycwl.basic.notify;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.adapters.ServerChanNotifyAdapter;
import com.ycwl.basic.notify.adapters.WxMpSrvNotifyAdapter;
import com.ycwl.basic.notify.enums.NotifyType;
import java.util.HashMap;
import java.util.Map;
public class NotifyFactory {
public static INotifyAdapter get(NotifyType type) {
return switch (type) {
case SERVER_CHAN -> new ServerChanNotifyAdapter();
case WX_MP_SRV -> new WxMpSrvNotifyAdapter();
default -> throw new RuntimeException("不支持的通知类型");
};
}
public static INotifyAdapter get(NotifyType type, Map<String, String> config) {
INotifyAdapter adapter = get(type);
adapter.loadConfig(config);
return adapter;
}
protected static Map<String, INotifyAdapter> namedNotifier = new HashMap<>();
protected static INotifyAdapter defaultNotifier = null;
public static void register(String name, INotifyAdapter adapter) {
namedNotifier.put(name, adapter);
}
public static INotifyAdapter via(String name) {
INotifyAdapter adapter = namedNotifier.get(name);
if (adapter == null) {
throw new RuntimeException("未定义的通知方式:"+name);
}
return adapter;
}
public static INotifyAdapter via() {
if (defaultNotifier == null) {
throw new RuntimeException("未定义默认通知方式");
}
return defaultNotifier;
}
public static void setDefault(String defaultStorage) {
NotifyFactory.defaultNotifier = via(defaultStorage);
}
}

View File

@@ -1,11 +0,0 @@
package com.ycwl.basic.notify.adapters;
import com.ycwl.basic.notify.entity.NotifyContent;
import java.util.Map;
public interface INotifyAdapter {
void loadConfig(Map<String, String> _config);
void sendTo(NotifyContent notifyContent, String to);
}

View File

@@ -1,54 +0,0 @@
package com.ycwl.basic.notify.adapters;
import cn.hutool.http.HttpUtil;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.entity.ServerChanConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ServerChanNotifyAdapter implements INotifyAdapter {
ServerChanConfig config;
@Override
public void loadConfig(Map<String, String> _config) {
ServerChanConfig config = new ServerChanConfig();
config.setKey(_config.get("key"));
config.checkEverythingOK();
this.config = config;
}
@Override
public void sendTo(NotifyContent notifyContent, String to) {
scSend(notifyContent.getTitle(), notifyContent.getContent(), config.getKey());
}
public static String scSend(String title, String content, String key) {
try {
String api;
// 判断 sendkey 是否以 "sctp" 开头,并提取数字部分拼接 URL
if (key.startsWith("sctp")) {
Pattern pattern = Pattern.compile("sctp(\\d+)t");
Matcher matcher = pattern.matcher(key);
if (matcher.find()) {
String num = matcher.group(1);
api = "https://" + num + ".push.ft07.com/send/" + key +".send";
} else {
throw new IllegalArgumentException("Invalid sendkey format for sctp");
}
} else {
api = "https://sctapi.ftqq.com/" + key + ".send";
}
Map<String, Object> body = new HashMap<>();
body.put("title", title);
body.put("desp", content);
return HttpUtil.post(api, body);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}

View File

@@ -1,60 +0,0 @@
package com.ycwl.basic.notify.adapters;
import cn.hutool.http.HttpUtil;
import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.entity.WxMpSrvConfig;
import java.util.Date;
import java.util.Map;
public class WxMpSrvNotifyAdapter implements INotifyAdapter{
private WxMpSrvConfig config;
@Override
public void loadConfig(Map<String, String> _config) {
WxMpSrvConfig config = new WxMpSrvConfig();
config.setAppId(_config.get("appId"));
config.setAppSecret(_config.get("appSecret"));
if (_config.containsKey("state")) {
config.setState(_config.get("state"));
}
config.checkEverythingOK();
this.config = config;
}
@Override
public void sendTo(NotifyContent notifyContent, String openId) {
Map<String, Object> params = notifyContent.getParams();
params.put("touser", openId);
params.put("miniprogram_state", config.getState());
sendServiceNotification(params);
}
private static final String SEND_TEMPLATE_MESSAGE_URL = "https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token=%s";
private static final String ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s";
private String ACCESS_TOKEN = "";
private Date expireTime = new Date();
private String getAccessToken() {
if (ACCESS_TOKEN != null && !ACCESS_TOKEN.isEmpty()) {
if (expireTime.getTime() > System.currentTimeMillis()) {
return ACCESS_TOKEN;
}
}
String url = String.format(ACCESS_TOKEN_URL, config.getAppId(), config.getAppSecret());
String response = HttpUtil.get(url);
Map<String, Object> jsonObject = JacksonUtil.parseObject(response, Map.class);
ACCESS_TOKEN = (String) jsonObject.get("access_token");
Integer expiresIn = (Integer) jsonObject.get("expires_in");
expireTime = new Date(System.currentTimeMillis() + (expiresIn != null ? expiresIn : 7200) * 1000);
return ACCESS_TOKEN;
}
public void sendServiceNotification(Map<String, Object> params) {
String url = String.format(SEND_TEMPLATE_MESSAGE_URL, getAccessToken());
String response = HttpUtil.post(url, JacksonUtil.toJSONString(params));
System.out.println(response);
}
}

View File

@@ -1,22 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NotifyContent {
private String title;
private String content;
private Map<String, Object> params = new HashMap<>();
public NotifyContent(String title, String content) {
this.title = title;
this.content = content;
}
}

View File

@@ -1,11 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.Data;
@Data
public class ServerChanConfig {
private String key;
public void checkEverythingOK() {
}
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.Data;
@Data
public class WxMpSrvConfig {
private String appId;
private String appSecret;
private String state = "formal";
private String templateId;
public void checkEverythingOK() {
}
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.enums;
import lombok.Getter;
@Getter
public enum NotifyType {
WX_MP_SRV("WX_MP_SRV"),
SERVER_CHAN("SERVER_CHAN");
private final String type;
NotifyType(String type) {
this.type = type;
}
}

View File

@@ -1,32 +0,0 @@
package com.ycwl.basic.notify.starter;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.starter.config.NotifyConfigItem;
import com.ycwl.basic.notify.starter.config.OverallNotifyConfig;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NotifyAutoConfigurator {
private final OverallNotifyConfig config;
public NotifyAutoConfigurator(OverallNotifyConfig config) {
this.config = config;
if (config != null) {
if (config.getConfigs() != null) {
loadConfig();
}
if (StringUtils.isNotBlank(config.getDefaultUse())) {
NotifyFactory.setDefault(config.getDefaultUse());
}
}
}
private void loadConfig() {
for (NotifyConfigItem item : config.getConfigs()) {
INotifyAdapter adapter = NotifyFactory.get(item.getType());
adapter.loadConfig(item.getConfig());
NotifyFactory.register(item.getName(), adapter);
}
}
}

View File

@@ -1,13 +0,0 @@
package com.ycwl.basic.notify.starter.config;
import com.ycwl.basic.notify.enums.NotifyType;
import lombok.Data;
import java.util.Map;
@Data
public class NotifyConfigItem {
private String name;
private NotifyType type;
private Map<String, String> config;
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.starter.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ConfigurationProperties(prefix = "notify")
@Data
public class OverallNotifyConfig {
private String defaultUse;
private List<NotifyConfigItem> configs;
}

View File

@@ -41,10 +41,6 @@ import com.ycwl.basic.model.task.req.TaskReqVo;
import com.ycwl.basic.model.task.req.TaskSuccessReqVo;
import com.ycwl.basic.model.task.req.WorkerAuthReqVo;
import com.ycwl.basic.model.task.resp.TaskSyncRespVo;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.enums.NotifyType;
import com.ycwl.basic.repository.DeviceRepository;
import com.ycwl.basic.repository.FaceRepository;
import com.ycwl.basic.repository.RenderWorkerRepository;

View File

@@ -14,10 +14,6 @@ import com.ycwl.basic.model.pc.mp.MpConfigEntity;
import com.ycwl.basic.model.pc.scenic.entity.ScenicConfigEntity;
import com.ycwl.basic.model.pc.scenic.entity.ScenicEntity;
import com.ycwl.basic.model.pc.scenic.req.ScenicReqQuery;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.enums.NotifyType;
import com.ycwl.basic.repository.ScenicRepository;
import com.ycwl.basic.repository.TemplateRepository;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
@@ -33,8 +29,11 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
@EnableScheduling
@@ -55,11 +54,20 @@ public class DownloadNotificationTasker {
@Scheduled(cron = "0 0 21 * * *")
public void sendDownloadNotification() {
log.info("开始执行定时任务");
// 用于记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000), new Date())
.forEach(item -> {
if (item.getIsBuy() == 1) {
return;
}
// 检查该用户是否已经发送过通知,避免重复发送
if (sentMemberIds.contains(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过下载通知,跳过", item.getMemberId());
return;
}
sentMemberIds.add(item.getMemberId());
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
// 发送模板消息
@@ -114,11 +122,20 @@ public class DownloadNotificationTasker {
@Scheduled(cron = "0 0 20 * * *")
public void sendExpireNotification() {
log.info("开始执行定时任务");
// 用于记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(new Date(System.currentTimeMillis() - 2 * 24 * 60 * 60 * 1000), new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000))
.forEach(item -> {
if (item.getIsBuy() == 1) {
return;
}
// 检查该用户是否已经发送过通知,避免重复发送
if (sentMemberIds.contains(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过过期提醒通知,跳过", item.getMemberId());
return;
}
sentMemberIds.add(item.getMemberId());
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(item.getScenicId());
@@ -206,12 +223,20 @@ public class DownloadNotificationTasker {
}
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getString("extra_notification_time"));
// 使用线程安全的Set记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(DateUtil.beginOfDay(new Date()), new Date())
.stream()
.filter(item -> item.getIsBuy() == 0)
.filter(item -> item.getScenicId().equals(scenicId))
.parallel()
.forEach(item -> {
// 检查该用户是否已经发送过通知,避免重复发送
if (!sentMemberIds.add(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过额外下载通知,跳过", item.getMemberId());
return;
}
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
// 发送模板消息