Compare commits

..

15 Commits

Author SHA1 Message Date
a7ede3303d refactor(task): 移除重复的景区配置查询逻辑
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 删除了 DownloadNotificationTasker 中多次调用的 getScenicMpConfig 方法
- 简化了视频下载通知任务的执行流程- 提高代码可读性和维护性
- 避免不必要的数据库查询操作
2025-10-14 20:32:36 +08:00
aa7330000f fix(task): 避免重复发送下载和过期通知
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 在发送下载通知前检查用户是否已接收通知
- 在发送过期通知前检查用户是否已接收通知- 在发送额外下载通知前检查用户是否已接收通知
- 使用ConcurrentHashMap.newKeySet()确保线程安全- 添加调试日志以追踪重复通知的跳过情况- 优化通知逻辑以提升定时任务执行效率
2025-10-14 20:31:45 +08:00
29f4bbf2d8 feat(message): 添加ZT消息生产者空实现服务
- 创建 ZtMessageProducerNoOpService 类作为 Kafka 禁用时的替代实现- 实现 ConditionalOnProperty 注解,当 kafka.enabled=false 时激活该服务- 覆写 send 方法,仅记录日志而不实际发送消息
- 添加构造函数以满足父类依赖要求
- 提供详细注释说明服务用途和实现逻辑
2025-10-14 20:28:00 +08:00
ad42254ea0 refactor(task): 移除通知模块依赖
- 删除了对通知模块的包引用
- 移除了通知模块相关的类导入- 清理了与通知功能相关的代码依赖
-优化了任务服务实现类的依赖结构
- 简化了下载通知任务器的代码引用
- 解除了通知工厂类的直接依赖关系
2025-10-14 19:38:47 +08:00
0ceecf0488 fix(message): 将消息相关接口的日志级别从 info 调整为 debug
- 修改消息列表查询接口的日志级别- 修改获取消息通道列表接口的日志级别- 统一调整日志输出方式以减少生产环境日志量
2025-10-14 19:20:41 +08:00
311008cbf2 feat(message): 集成ZT消息服务发送通知
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 在TaskTaskServiceImpl中引入ZtMessageProducerService依赖
- 替换原有微信通知逻辑,使用ZT消息服务发送视频生成通知- 在DownloadNotificationTasker中引入ZtMessageProducerService依赖
- 修改视频下载通知发送逻辑,使用ZT消息服务
- 修改视频过期提醒通知逻辑,使用ZT消息服务
- 调整额外通知时间配置获取方式,从scenicConfigManager获取
- 统一构建通知消息参数格式,包含data和page信息
- 添加详细的日志记录,便于追踪消息发送过程
2025-10-14 19:06:30 +08:00
f54d40d026 feat(message):为消息添加唯一标识符支持
- 在 ZtMessage DTO 中新增 messageId 字段
- 发送消息前自动生成 UUID 作为默认 messageId
- 更新 Kafka 生产者日志,包含 messageId 以便追踪
- 增强错误日志记录,附带 messageId 提升调试效率
2025-10-14 18:27:15 +08:00
3cb12c13c2 feat(printer):优化用户照片添加逻辑并返回结果ID
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 修改 addUserPhoto 方法参数,使用 MemberPrintEntity 实体传参- 在 PrinterMapper.xml 中配置 insert 语句返回主键 ID- 更新 addUserPhotoFromSource 方法返回值为 List<Integer>
- 添加异常处理和日志记录
- 调整 AppPrinterController 接口返回照片 ID 列表
2025-10-14 11:45:46 +08:00
feac2e8d93 refactor(config): 移除ScenicConfigManager中的冗余代码
- 删除了未使用的configMap字段- 移除了基于Map的构造函数- 清理了所有与configMap相关的getter方法
- 移除了hasKey和hasNonNullValue方法
- 删除了获取所有配置键和配置数量的方法
- 移除了配置子集和扁平化配置相关功能
- 简化了toString方法的实现
2025-10-12 01:09:54 +08:00
be375067ce feat(message): 移除ZT消息生产者示例代码- 删除ZtMessageProducerExample类及相关依赖
- 移除示例消息发送逻辑
- 清理无用的HashMap和日志记录代码
- 移除条件注解@ConditionalOnProperty配置
- 删除消息构建及发送示例实现
2025-10-11 20:34:00 +08:00
7dec2e614c feat(watchdog): 增强任务监控告警机制
- 引入ZtMessageProducerService实现消息通知
- 添加任务积压、失败任务和长时间运行任务的分类监控
- 实现异常通知计数器,限制重复告警次数
-优化告警逻辑,支持异常恢复后计数器重置
- 移除旧的通知工厂依赖,统一使用消息队列发送
- 增加长时任务监控的清理机制,避免无效计数累积
2025-10-11 20:33:49 +08:00
51d0716606 Merge branch 'message-microservice'
# Conflicts:
#	src/main/java/com/ycwl/basic/integration/CLAUDE.md
2025-10-11 15:07:52 +08:00
765998bd97 docs(integration): 移除示例代码并更新配置说明- 删除设备集成测试中的默认配置启用示例
- 移除了消息集成组件中的示例引用
- 更新ZT-Message集成概述,去除对旧文档的引用
- 简化目录结构展示,移除example模块
- 清理冗余的配置键值说明- 统一删除各模块下的example目录引用
- 优化文档结构,提高可读性
2025-10-11 11:24:42 +08:00
b14754ec0a feat(integration): 添加消息服务相关接口和功能
- 新增 MessageController 类,实现消息列表查询和消息通道列表获取功能
- 新增 MessageClient 接口,用于调用消息服务的 Feign客户端
- 新增 ChannelsResponse、MessageListData 和 MessageRecordDTO 数据传输对象
- 新增 MessageIntegrationService 服务类,处理消息服务相关业务逻辑
2025-09-17 21:53:41 +08:00
a888ed3fe2 feat(integration): 添加 ZT-Message Kafka 生产者集成
- 新增 ZtMessage DTO 类用于消息体
- 实现 ZtMessageProducerService 生产者服务
- 添加示例演示如何发送消息
- 更新配置文件和文档以支持新功能
2025-09-17 21:38:26 +08:00
32 changed files with 703 additions and 716 deletions

View File

@@ -43,6 +43,12 @@ public class KafkaConfig {
@Value("${kafka.producer.buffer-memory:33554432}")
private Integer bufferMemory;
@Value("${kafka.producer.enable-idempotence:true}")
private boolean enableIdempotence;
@Value("${kafka.producer.compression-type:snappy}")
private String compressionType;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
@@ -54,6 +60,8 @@ public class KafkaConfig {
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
return new DefaultKafkaProducerFactory<>(configProps);
}

View File

@@ -75,8 +75,8 @@ public class AppPrinterController {
}
@PostMapping("/uploadTo/{scenicId}/formSource")
public ApiResponse<?> uploadFromSource(@PathVariable("scenicId") Long scenicId, @RequestBody FromSourceReq req) throws IOException {
printerService.addUserPhotoFromSource(JwtTokenUtil.getWorker().getUserId(), scenicId, req);
return ApiResponse.success(null);
List<Integer> list = printerService.addUserPhotoFromSource(JwtTokenUtil.getWorker().getUserId(), scenicId, req);
return ApiResponse.success(list);
}
@PostMapping("/setQuantity/{scenicId}/{id}")

View File

@@ -0,0 +1,60 @@
package com.ycwl.basic.controller.pc;
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
import com.ycwl.basic.integration.message.dto.MessageListData;
import com.ycwl.basic.integration.message.service.MessageIntegrationService;
import com.ycwl.basic.utils.ApiResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/api/message/v1")
@RequiredArgsConstructor
public class MessageController {
private final MessageIntegrationService messageService;
@GetMapping("/messages")
public ApiResponse<MessageListData> listMessages(
@RequestParam(defaultValue = "1") Integer page,
@RequestParam(defaultValue = "20") Integer pageSize,
@RequestParam(required = false) String channelId,
@RequestParam(required = false) String title,
@RequestParam(required = false) String content,
@RequestParam(required = false) String sendBiz,
@RequestParam(required = false) String sentAtStart,
@RequestParam(required = false) String sentAtEnd,
@RequestParam(required = false) String createdAtStart,
@RequestParam(required = false) String createdAtEnd
) {
log.debug("PC|消息列表查询 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
if (pageSize > 100) {
pageSize = 100;
}
try {
MessageListData data = messageService.listMessages(page, pageSize, channelId, title, content, sendBiz,
sentAtStart, sentAtEnd, createdAtStart, createdAtEnd);
return ApiResponse.success(data);
} catch (Exception e) {
log.error("PC|消息列表查询失败", e);
return ApiResponse.fail("消息列表查询失败: " + e.getMessage());
}
}
@GetMapping("/channels")
public ApiResponse<ChannelsResponse> listChannels() {
log.debug("PC|获取消息通道列表");
try {
ChannelsResponse data = messageService.listChannels();
return ApiResponse.success(data);
} catch (Exception e) {
log.error("PC|获取消息通道列表失败", e);
return ApiResponse.fail("获取消息通道列表失败: " + e.getMessage());
}
}
}

View File

@@ -25,6 +25,7 @@ Currently implemented:
- **Device Integration** (`com.ycwl.basic.integration.device`): ZT-Device microservice integration
- **Render Worker Integration** (`com.ycwl.basic.integration.render`): ZT-Render-Worker microservice integration
- **Questionnaire Integration** (`com.ycwl.basic.integration.questionnaire`): ZT-Questionnaire microservice integration
- **Message Integration** (`com.ycwl.basic.integration.message`): ZT-Message Kafka producer integration
### Integration Pattern
@@ -34,8 +35,7 @@ service/
├── client/ # Feign clients for HTTP calls
├── config/ # Service-specific configuration
├── dto/ # Data transfer objects
── service/ # Service layer with business logic
└── example/ # Usage examples
── service/ # Service layer with business logic
```
## Integration Fallback Mechanism
@@ -792,13 +792,6 @@ mvn test -Dtest=DefaultConfigIntegrationServiceTest
# Run all device integration tests (including default configs)
mvn test -Dtest="com.ycwl.basic.integration.device.*Test"
# Enable example runner in application-dev.yml
integration:
device:
example:
default-config:
enabled: true
```
### Common Configuration Keys
@@ -820,8 +813,7 @@ com.ycwl.basic.integration.{service-name}/
├── client/
├── config/
├── dto/
── service/
└── example/
── service/
```
### 2. Add Configuration Properties
@@ -1168,6 +1160,57 @@ fallbackService.clearAllFallbackCache("zt-render-worker");
- **Active (isActive=1)**: Worker is available for tasks
- **Inactive (isActive=0)**: Worker is disabled
## ZT-Message Integration (Kafka Producer)
### Overview
The zt-message microservice accepts messages via Kafka on topic `zt-message`. This integration provides a simple producer service to publish notification messages.
- Topic: `zt-message`
- Key: Use `channelId` for partitioning stability
- Value: UTF-8 JSON with fields: `channelId` (required), `title` (required), `content` (required), `target` (required), `extra` (object, optional), `sendReason` (optional), `sendBiz` (optional)
### Components
- `com.ycwl.basic.integration.message.dto.ZtMessage`: DTO for message body
- `com.ycwl.basic.integration.message.service.ZtMessageProducerService`: Producer service using Spring Kafka
### Configuration
```yaml
kafka:
enabled: true # enable Kafka integration
bootstrap-servers: 127.0.0.1:9092 # adjust per environment
zt-message-topic: zt-message # topic name (default already zt-message)
producer:
acks: all
enable-idempotence: true
retries: 5
linger-ms: 10
batch-size: 32768
compression-type: snappy
```
### Usage
```java
@Autowired
private ZtMessageProducerService producer;
public void sendWelcome() {
ZtMessage msg = ZtMessage.of("dummy", "欢迎", "注册成功", "user-001");
Map<String, Object> extra = new HashMap<>();
extra.put("k", "v");
msg.setExtra(extra);
msg.setSendReason("REGISTER");
msg.setSendBiz("USER");
producer.send(msg); // key uses channelId, value is JSON
}
```
### Notes
- Required fields must be non-empty: `channelId`, `title`, `content`, `target`
- Keep message body small (< 100 KB)
- Use string for 64-bit integers in `extra` to avoid JS precision loss
- Service logs the partition/offset upon success, errors on failure
## Common Development Tasks
### Running Integration Tests

View File

@@ -16,8 +16,6 @@ import java.util.stream.Collectors;
*/
public class ScenicConfigManager extends ConfigManager<ScenicConfigV2DTO> {
private final Map<String, Object> configMap;
/**
* 从配置列表构造管理器
*
@@ -25,26 +23,7 @@ public class ScenicConfigManager extends ConfigManager<ScenicConfigV2DTO> {
*/
public ScenicConfigManager(List<ScenicConfigV2DTO> configList) {
super(configList);
this.configMap = new HashMap<>();
if (configList != null) {
for (ScenicConfigV2DTO config : configList) {
if (config.getConfigKey() != null && config.getConfigValue() != null) {
this.configMap.put(config.getConfigKey(), config.getConfigValue());
}
}
}
}
/**
* 从配置Map构造管理器
*
* @param configMap 配置Map
*/
public ScenicConfigManager(Map<String, Object> configMap) {
super(null); // 使用Map构造时,父类configs为null
this.configMap = configMap != null ? new HashMap<>(configMap) : new HashMap<>();
}
@Override
protected String getConfigKey(ScenicConfigV2DTO config) {
return config != null ? config.getConfigKey() : null;
@@ -54,277 +33,5 @@ public class ScenicConfigManager extends ConfigManager<ScenicConfigV2DTO> {
protected Object getConfigValue(ScenicConfigV2DTO config) {
return config != null ? config.getConfigValue() : null;
}
/**
* 获取长整数值
*
* @param key 配置键
* @return Long值,如果键不存在或转换失败返回null
*/
public Long getLong(String key) {
return ConfigValueUtil.getLongValue(configMap, key);
}
/**
* 获取长整数值,如果为null则返回默认值
*
* @param key 配置键
* @param defaultValue 默认值
* @return Long值或默认值
*/
public Long getLong(String key, Long defaultValue) {
Long value = ConfigValueUtil.getLongValue(configMap, key);
return value != null ? value : defaultValue;
}
/**
* 获取浮点数值
*
* @param key 配置键
* @return Float值,如果键不存在或转换失败返回null
*/
public Float getFloat(String key) {
return ConfigValueUtil.getFloatValue(configMap, key);
}
/**
* 获取浮点数值,如果为null则返回默认值
*
* @param key 配置键
* @param defaultValue 默认值
* @return Float值或默认值
*/
public Float getFloat(String key, Float defaultValue) {
Float value = ConfigValueUtil.getFloatValue(configMap, key);
return value != null ? value : defaultValue;
}
/**
* 获取双精度浮点数值
*
* @param key 配置键
* @return Double值,如果键不存在或转换失败返回null
*/
public Double getDouble(String key) {
return ConfigValueUtil.getDoubleValue(configMap, key);
}
/**
* 获取双精度浮点数值,如果为null则返回默认值
*
* @param key 配置键
* @param defaultValue 默认值
* @return Double值或默认值
*/
public Double getDouble(String key, Double defaultValue) {
Double value = ConfigValueUtil.getDoubleValue(configMap, key);
return value != null ? value : defaultValue;
}
/**
* 获取高精度小数值
*
* @param key 配置键
* @return BigDecimal值,如果键不存在或转换失败返回null
*/
public BigDecimal getBigDecimal(String key) {
return ConfigValueUtil.getBigDecimalValue(configMap, key);
}
/**
* 获取高精度小数值,如果为null则返回默认值
*
* @param key 配置键
* @param defaultValue 默认值
* @return BigDecimal值或默认值
*/
public BigDecimal getBigDecimal(String key, BigDecimal defaultValue) {
BigDecimal value = ConfigValueUtil.getBigDecimalValue(configMap, key);
return value != null ? value : defaultValue;
}
/**
* 获取布尔值
*
* @param key 配置键
* @return Boolean值,如果键不存在或转换失败返回null
*/
public Boolean getBoolean(String key) {
return ConfigValueUtil.getBooleanValue(configMap, key);
}
/**
* 获取布尔值,如果为null则返回默认值
*
* @param key 配置键
* @param defaultValue 默认值
* @return Boolean值或默认值
*/
public Boolean getBoolean(String key, Boolean defaultValue) {
return ConfigValueUtil.getBooleanValue(configMap, key, defaultValue);
}
/**
* 检查配置键是否存在
*
* @param key 配置键
* @return true如果键存在,false如果不存在
*/
public boolean hasKey(String key) {
return ConfigValueUtil.hasKey(configMap, key);
}
/**
* 检查配置键是否存在且值不为null
*
* @param key 配置键
* @return true如果键存在且值不为null
*/
public boolean hasNonNullValue(String key) {
return ConfigValueUtil.hasNonNullValue(configMap, key);
}
/**
* 获取所有配置键
*
* @return 配置键集合
*/
public Set<String> getAllKeys() {
return new HashSet<>(configMap.keySet());
}
/**
* 获取配置项数量
*
* @return 配置项数量
*/
@Override
public int size() {
return configMap.size();
}
/**
* 检查配置是否为空
*
* @return true如果没有配置项
*/
public boolean isEmpty() {
return configMap.isEmpty();
}
/**
* 获取所有配置的拷贝
*
* @return 配置Map的拷贝
*/
public Map<String, Object> getAllConfigsAsMap() {
return new HashMap<>(configMap);
}
/**
* 根据键前缀过滤配置
*
* @param prefix 键前缀
* @return 匹配前缀的配置Map
*/
public Map<String, Object> getConfigsByPrefix(String prefix) {
if (prefix == null) {
return new HashMap<>();
}
return configMap.entrySet().stream()
.filter(entry -> entry.getKey() != null && entry.getKey().startsWith(prefix))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
}
/**
* 创建新的ScenicConfigManager,包含当前配置的子集
*
* @param keys 要包含的配置键
* @return 包含指定键配置的新管理器
*/
public ScenicConfigManager subset(Set<String> keys) {
Map<String, Object> subsetMap = new HashMap<>();
if (keys != null) {
for (String key : keys) {
if (configMap.containsKey(key)) {
subsetMap.put(key, configMap.get(key));
}
}
}
return new ScenicConfigManager(subsetMap);
}
/**
* 将配置转换为扁平化的Map,键名转换为驼峰形式
*
* @return 扁平化的配置Map,键为驼峰形式
*/
public Map<String, Object> toFlatConfig() {
Map<String, Object> flatConfig = new HashMap<>();
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (key != null) {
String camelCaseKey = toCamelCase(key);
flatConfig.put(camelCaseKey, value);
}
}
return flatConfig;
}
/**
* 将字符串转换为驼峰形式
* 支持下划线、短横线、点号分隔的字符串转换
*
* @param str 原始字符串
* @return 驼峰形式的字符串
*/
private String toCamelCase(String str) {
if (str == null || str.isEmpty()) {
return str;
}
// 支持下划线、短横线、点号作为分隔符
String[] parts = str.split("[_\\-.]");
if (parts.length <= 1) {
return str;
}
StringBuilder camelCase = new StringBuilder();
// 第一部分保持原样(全小写)
camelCase.append(parts[0].toLowerCase());
// 后续部分首字母大写
for (int i = 1; i < parts.length; i++) {
String part = parts[i];
if (!part.isEmpty()) {
camelCase.append(Character.toUpperCase(part.charAt(0)));
if (part.length() > 1) {
camelCase.append(part.substring(1).toLowerCase());
}
}
}
return camelCase.toString();
}
@Override
public String toString() {
return "ScenicConfigManager{" +
"configCount=" + configMap.size() +
", keys=" + configMap.keySet() +
'}';
}
}

View File

@@ -11,6 +11,7 @@ public class KafkaIntegrationProperties {
private boolean enabled = false;
private String bootstrapServers = "100.64.0.12:39092";
private String ztMessageTopic = "zt-message"; // topic for zt-message microservice
private Consumer consumer = new Consumer();
private Producer producer = new Producer();

View File

@@ -0,0 +1,29 @@
package com.ycwl.basic.integration.message.client;
import com.ycwl.basic.integration.common.response.CommonResponse;
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
import com.ycwl.basic.integration.message.dto.MessageListData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "zt-message", contextId = "zt-message", path = "")
public interface MessageClient {
@GetMapping("/messages")
CommonResponse<MessageListData> listMessages(
@RequestParam(name = "page", defaultValue = "1") Integer page,
@RequestParam(name = "pageSize", defaultValue = "20") Integer pageSize,
@RequestParam(name = "channelId", required = false) String channelId,
@RequestParam(name = "title", required = false) String title,
@RequestParam(name = "content", required = false) String content,
@RequestParam(name = "sendBiz", required = false) String sendBiz,
@RequestParam(name = "sentAtStart", required = false) String sentAtStart,
@RequestParam(name = "sentAtEnd", required = false) String sentAtEnd,
@RequestParam(name = "createdAtStart", required = false) String createdAtStart,
@RequestParam(name = "createdAtEnd", required = false) String createdAtEnd
);
@GetMapping("/channels")
CommonResponse<ChannelsResponse> listChannels();
}

View File

@@ -0,0 +1,10 @@
package com.ycwl.basic.integration.message.dto;
import lombok.Data;
import java.util.List;
@Data
public class ChannelsResponse {
private List<String> channels;
}

View File

@@ -0,0 +1,13 @@
package com.ycwl.basic.integration.message.dto;
import lombok.Data;
import java.util.List;
@Data
public class MessageListData {
private List<MessageRecordDTO> list;
private String total; // string to avoid JS precision
private Integer page;
private Integer pageSize;
}

View File

@@ -0,0 +1,25 @@
package com.ycwl.basic.integration.message.dto;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import java.util.Map;
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MessageRecordDTO {
private String id; // string to avoid JS precision
private String channelId;
private String title;
private String content;
private String target;
private Map<String, Object> extraJson;
private String sendReason;
private String sendBiz;
private String status;
private String errorMsg;
private Integer attempts;
private String sentAt; // RFC3339 or yyyy-MM-dd HH:mm:ss (pass-through)
private String createdAt;
private String updatedAt;
}

View File

@@ -0,0 +1,36 @@
package com.ycwl.basic.integration.message.dto;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ZtMessage {
private String messageId; // unique message identifier
private String channelId; // required
private String title; // required
private String content; // required
private String target; // required
private Map<String, Object> extra; // optional
private String sendReason; // optional
private String sendBiz; // optional
public static ZtMessage of(String channelId, String title, String content, String target) {
return ZtMessage.builder()
.channelId(channelId)
.title(title)
.content(content)
.target(target)
.extra(new HashMap<>())
.build();
}
}

View File

@@ -0,0 +1,57 @@
package com.ycwl.basic.integration.message.service;
import com.ycwl.basic.integration.common.exception.IntegrationException;
import com.ycwl.basic.integration.common.response.CommonResponse;
import com.ycwl.basic.integration.common.service.IntegrationFallbackService;
import com.ycwl.basic.integration.message.client.MessageClient;
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
import com.ycwl.basic.integration.message.dto.MessageListData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageIntegrationService {
private final MessageClient client;
private final IntegrationFallbackService fallbackService;
private static final String SERVICE_NAME = "zt-message";
public MessageListData listMessages(Integer page, Integer pageSize,
String channelId, String title, String content, String sendBiz,
String sentAtStart, String sentAtEnd,
String createdAtStart, String createdAtEnd) {
log.debug("查询消息列表 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
CommonResponse<MessageListData> resp = client.listMessages(page, pageSize, channelId, title, content, sendBiz,
sentAtStart, sentAtEnd, createdAtStart, createdAtEnd);
return handleResponse(resp, "查询消息列表失败");
}
public ChannelsResponse listChannels() {
log.debug("查询消息通道列表");
// 相对稳定的数据,使用fallback缓存
return fallbackService.executeWithFallback(
SERVICE_NAME,
"channels",
() -> {
CommonResponse<ChannelsResponse> resp = client.listChannels();
return handleResponse(resp, "查询通道列表失败");
},
ChannelsResponse.class
);
}
private <T> T handleResponse(CommonResponse<T> response, String errorMessage) {
if (response == null || !response.isSuccess()) {
String msg = response != null && response.getMessage() != null
? response.getMessage()
: errorMessage;
Integer code = response != null ? response.getCode() : 5000;
throw new IntegrationException(code, msg, SERVICE_NAME);
}
return response.getData();
}
}

View File

@@ -0,0 +1,52 @@
package com.ycwl.basic.integration.message.service;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* ZT消息生产者空实现服务
* <p>
* 当 kafka.enabled=false 时,该服务会被激活,作为 ZtMessageProducerService 的替代。
* 所有消息发送操作都会被忽略,只记录日志。
* </p>
*
* @see ZtMessageProducerService
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "false", matchIfMissing = true)
public class ZtMessageProducerNoOpService extends ZtMessageProducerService {
/**
* 空构造函数
* 由于父类需要依赖项,但在此实现中不会使用,因此传入 null
*/
public ZtMessageProducerNoOpService() {
super(null, null, null);
}
/**
* 消息发送的空操作实现
* <p>
* 当 Kafka 未启用时,此方法会被调用。
* 它不会实际发送消息,只会记录一条 debug 日志。
* </p>
*
* @param msg 待发送的消息(会被验证基本字段)
*/
@Override
public void send(ZtMessage msg) {
if (msg == null) {
log.debug("[ZT-MESSAGE] Kafka未启用,跳过消息发送(消息为null)");
return;
}
log.debug("[ZT-MESSAGE] Kafka未启用,跳过消息发送: channelId={}, title={}, target={}, messageId={}",
msg.getChannelId(),
msg.getTitle(),
msg.getTarget(),
msg.getMessageId());
}
}

View File

@@ -0,0 +1,70 @@
package com.ycwl.basic.integration.message.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class ZtMessageProducerService {
public static final String DEFAULT_TOPIC = "zt-message";
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final KafkaIntegrationProperties kafkaProps;
public void send(ZtMessage msg) {
validate(msg);
// Generate messageId if not present
if (StringUtils.isBlank(msg.getMessageId())) {
msg.setMessageId(java.util.UUID.randomUUID().toString());
}
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
? kafkaProps.getZtMessageTopic()
: DEFAULT_TOPIC;
String key = msg.getChannelId();
String payload = toJson(msg);
log.info("[ZT-MESSAGE] producing to topic={}, key={}, messageId={}, title={}", topic, key, msg.getMessageId(), msg.getTitle());
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
if (ex != null) {
log.error("[ZT-MESSAGE] produce failed: messageId={}, error={}", msg.getMessageId(), ex.getMessage(), ex);
} else if (metadata != null) {
log.info("[ZT-MESSAGE] produced: messageId={}, partition={}, offset={}", msg.getMessageId(), metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
}
});
}
private void validate(ZtMessage msg) {
if (msg == null) throw new IllegalArgumentException("message is null");
if (StringUtils.isBlank(msg.getChannelId())) throw new IllegalArgumentException("channelId is required");
if (StringUtils.isBlank(msg.getTitle())) throw new IllegalArgumentException("title is required");
if (StringUtils.isBlank(msg.getContent())) throw new IllegalArgumentException("content is required");
if (StringUtils.isBlank(msg.getTarget())) throw new IllegalArgumentException("target is required");
if (msg.getExtra() != null && !(msg.getExtra() instanceof Map)) {
throw new IllegalArgumentException("extra must be a Map");
}
}
private String toJson(ZtMessage msg) {
try {
return objectMapper.writeValueAsString(msg);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("failed to serialize message", e);
}
}
}

View File

@@ -1,5 +1,6 @@
package com.ycwl.basic.mapper;
import com.ycwl.basic.model.pc.printer.entity.MemberPrintEntity;
import com.ycwl.basic.model.pc.printer.entity.PrintTaskEntity;
import com.ycwl.basic.model.pc.printer.entity.PrinterEntity;
import com.ycwl.basic.model.pc.printer.resp.MemberPrintResp;
@@ -35,7 +36,7 @@ public interface PrinterMapper {
int deleteUserPhoto(Long memberId, Long scenicId, Long relationId);
int addUserPhoto(Long memberId, Long scenicId, String url);
int addUserPhoto(MemberPrintEntity entity);
MemberPrintResp getUserPhoto(Long memberId, Long scenicId, Long id);

View File

@@ -1,51 +0,0 @@
package com.ycwl.basic.notify;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.adapters.ServerChanNotifyAdapter;
import com.ycwl.basic.notify.adapters.WxMpSrvNotifyAdapter;
import com.ycwl.basic.notify.enums.NotifyType;
import java.util.HashMap;
import java.util.Map;
public class NotifyFactory {
public static INotifyAdapter get(NotifyType type) {
return switch (type) {
case SERVER_CHAN -> new ServerChanNotifyAdapter();
case WX_MP_SRV -> new WxMpSrvNotifyAdapter();
default -> throw new RuntimeException("不支持的通知类型");
};
}
public static INotifyAdapter get(NotifyType type, Map<String, String> config) {
INotifyAdapter adapter = get(type);
adapter.loadConfig(config);
return adapter;
}
protected static Map<String, INotifyAdapter> namedNotifier = new HashMap<>();
protected static INotifyAdapter defaultNotifier = null;
public static void register(String name, INotifyAdapter adapter) {
namedNotifier.put(name, adapter);
}
public static INotifyAdapter via(String name) {
INotifyAdapter adapter = namedNotifier.get(name);
if (adapter == null) {
throw new RuntimeException("未定义的通知方式:"+name);
}
return adapter;
}
public static INotifyAdapter via() {
if (defaultNotifier == null) {
throw new RuntimeException("未定义默认通知方式");
}
return defaultNotifier;
}
public static void setDefault(String defaultStorage) {
NotifyFactory.defaultNotifier = via(defaultStorage);
}
}

View File

@@ -1,11 +0,0 @@
package com.ycwl.basic.notify.adapters;
import com.ycwl.basic.notify.entity.NotifyContent;
import java.util.Map;
public interface INotifyAdapter {
void loadConfig(Map<String, String> _config);
void sendTo(NotifyContent notifyContent, String to);
}

View File

@@ -1,54 +0,0 @@
package com.ycwl.basic.notify.adapters;
import cn.hutool.http.HttpUtil;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.entity.ServerChanConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ServerChanNotifyAdapter implements INotifyAdapter {
ServerChanConfig config;
@Override
public void loadConfig(Map<String, String> _config) {
ServerChanConfig config = new ServerChanConfig();
config.setKey(_config.get("key"));
config.checkEverythingOK();
this.config = config;
}
@Override
public void sendTo(NotifyContent notifyContent, String to) {
scSend(notifyContent.getTitle(), notifyContent.getContent(), config.getKey());
}
public static String scSend(String title, String content, String key) {
try {
String api;
// 判断 sendkey 是否以 "sctp" 开头,并提取数字部分拼接 URL
if (key.startsWith("sctp")) {
Pattern pattern = Pattern.compile("sctp(\\d+)t");
Matcher matcher = pattern.matcher(key);
if (matcher.find()) {
String num = matcher.group(1);
api = "https://" + num + ".push.ft07.com/send/" + key +".send";
} else {
throw new IllegalArgumentException("Invalid sendkey format for sctp");
}
} else {
api = "https://sctapi.ftqq.com/" + key + ".send";
}
Map<String, Object> body = new HashMap<>();
body.put("title", title);
body.put("desp", content);
return HttpUtil.post(api, body);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}

View File

@@ -1,60 +0,0 @@
package com.ycwl.basic.notify.adapters;
import cn.hutool.http.HttpUtil;
import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.entity.WxMpSrvConfig;
import java.util.Date;
import java.util.Map;
public class WxMpSrvNotifyAdapter implements INotifyAdapter{
private WxMpSrvConfig config;
@Override
public void loadConfig(Map<String, String> _config) {
WxMpSrvConfig config = new WxMpSrvConfig();
config.setAppId(_config.get("appId"));
config.setAppSecret(_config.get("appSecret"));
if (_config.containsKey("state")) {
config.setState(_config.get("state"));
}
config.checkEverythingOK();
this.config = config;
}
@Override
public void sendTo(NotifyContent notifyContent, String openId) {
Map<String, Object> params = notifyContent.getParams();
params.put("touser", openId);
params.put("miniprogram_state", config.getState());
sendServiceNotification(params);
}
private static final String SEND_TEMPLATE_MESSAGE_URL = "https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token=%s";
private static final String ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=%s&secret=%s";
private String ACCESS_TOKEN = "";
private Date expireTime = new Date();
private String getAccessToken() {
if (ACCESS_TOKEN != null && !ACCESS_TOKEN.isEmpty()) {
if (expireTime.getTime() > System.currentTimeMillis()) {
return ACCESS_TOKEN;
}
}
String url = String.format(ACCESS_TOKEN_URL, config.getAppId(), config.getAppSecret());
String response = HttpUtil.get(url);
Map<String, Object> jsonObject = JacksonUtil.parseObject(response, Map.class);
ACCESS_TOKEN = (String) jsonObject.get("access_token");
Integer expiresIn = (Integer) jsonObject.get("expires_in");
expireTime = new Date(System.currentTimeMillis() + (expiresIn != null ? expiresIn : 7200) * 1000);
return ACCESS_TOKEN;
}
public void sendServiceNotification(Map<String, Object> params) {
String url = String.format(SEND_TEMPLATE_MESSAGE_URL, getAccessToken());
String response = HttpUtil.post(url, JacksonUtil.toJSONString(params));
System.out.println(response);
}
}

View File

@@ -1,22 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NotifyContent {
private String title;
private String content;
private Map<String, Object> params = new HashMap<>();
public NotifyContent(String title, String content) {
this.title = title;
this.content = content;
}
}

View File

@@ -1,11 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.Data;
@Data
public class ServerChanConfig {
private String key;
public void checkEverythingOK() {
}
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.entity;
import lombok.Data;
@Data
public class WxMpSrvConfig {
private String appId;
private String appSecret;
private String state = "formal";
private String templateId;
public void checkEverythingOK() {
}
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.enums;
import lombok.Getter;
@Getter
public enum NotifyType {
WX_MP_SRV("WX_MP_SRV"),
SERVER_CHAN("SERVER_CHAN");
private final String type;
NotifyType(String type) {
this.type = type;
}
}

View File

@@ -1,32 +0,0 @@
package com.ycwl.basic.notify.starter;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.starter.config.NotifyConfigItem;
import com.ycwl.basic.notify.starter.config.OverallNotifyConfig;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NotifyAutoConfigurator {
private final OverallNotifyConfig config;
public NotifyAutoConfigurator(OverallNotifyConfig config) {
this.config = config;
if (config != null) {
if (config.getConfigs() != null) {
loadConfig();
}
if (StringUtils.isNotBlank(config.getDefaultUse())) {
NotifyFactory.setDefault(config.getDefaultUse());
}
}
}
private void loadConfig() {
for (NotifyConfigItem item : config.getConfigs()) {
INotifyAdapter adapter = NotifyFactory.get(item.getType());
adapter.loadConfig(item.getConfig());
NotifyFactory.register(item.getName(), adapter);
}
}
}

View File

@@ -1,13 +0,0 @@
package com.ycwl.basic.notify.starter.config;
import com.ycwl.basic.notify.enums.NotifyType;
import lombok.Data;
import java.util.Map;
@Data
public class NotifyConfigItem {
private String name;
private NotifyType type;
private Map<String, String> config;
}

View File

@@ -1,15 +0,0 @@
package com.ycwl.basic.notify.starter.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ConfigurationProperties(prefix = "notify")
@Data
public class OverallNotifyConfig {
private String defaultUse;
private List<NotifyConfigItem> configs;
}

View File

@@ -48,7 +48,7 @@ public interface PrinterService {
PriceObj queryPrice(Long memberId, Long scenicId);
boolean addUserPhotoFromSource(Long memberId, Long scenicId, FromSourceReq req);
List<Integer> addUserPhotoFromSource(Long memberId, Long scenicId, FromSourceReq req);
Map<String, Object> createOrder(Long memberId, Long scenicId, Integer printerId);

View File

@@ -19,6 +19,7 @@ import com.ycwl.basic.pricing.dto.PriceCalculationResult;
import com.ycwl.basic.pricing.dto.ProductItem;
import com.ycwl.basic.pricing.enums.ProductType;
import com.ycwl.basic.pricing.service.IPriceCalculationService;
import com.ycwl.basic.model.pc.printer.entity.MemberPrintEntity;
import com.ycwl.basic.model.pc.printer.entity.PrintTaskEntity;
import com.ycwl.basic.model.pc.printer.entity.PrinterEntity;
import com.ycwl.basic.model.pc.printer.resp.MemberPrintResp;
@@ -44,6 +45,7 @@ import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -190,7 +192,13 @@ public class PrinterServiceImpl implements PrinterService {
@Override
public boolean addUserPhoto(Long memberId, Long scenicId, String url) {
printerMapper.addUserPhoto(memberId, scenicId, url);
MemberPrintEntity entity = new MemberPrintEntity();
entity.setMemberId(memberId);
entity.setScenicId(scenicId);
entity.setOrigUrl(url);
entity.setCropUrl(url);
entity.setStatus(0);
printerMapper.addUserPhoto(entity);
return true;
}
@@ -259,15 +267,34 @@ public class PrinterServiceImpl implements PrinterService {
}
@Override
public boolean addUserPhotoFromSource(Long memberId, Long scenicId, FromSourceReq req) {
public List<Integer> addUserPhotoFromSource(Long memberId, Long scenicId, FromSourceReq req) {
List<Integer> resultIds = new ArrayList<>();
req.getIds().forEach(id -> {
SourceRespVO byId = sourceMapper.getById(id);
if (byId == null) {
resultIds.add(null);
return;
}
printerMapper.addUserPhoto(memberId, scenicId, byId.getUrl());
MemberPrintEntity entity = new MemberPrintEntity();
entity.setMemberId(memberId);
entity.setScenicId(scenicId);
entity.setOrigUrl(byId.getUrl());
entity.setCropUrl(byId.getUrl());
entity.setStatus(0);
try {
int rows = printerMapper.addUserPhoto(entity);
if (rows > 0 && entity.getId() != null) {
resultIds.add(entity.getId());
} else {
resultIds.add(null);
}
} catch (Exception e) {
log.error("添加用户照片失败, memberId={}, scenicId={}, sourceId={}", memberId, scenicId, id, e);
resultIds.add(null);
}
});
return false;
return resultIds;
}
@Override

View File

@@ -5,6 +5,8 @@ import cn.hutool.crypto.digest.MD5;
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
import com.ycwl.basic.integration.common.manager.RenderWorkerConfigManager;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.repository.MemberRelationRepository;
import com.ycwl.basic.repository.SourceRepository;
import com.ycwl.basic.utils.JacksonUtil;
@@ -39,10 +41,6 @@ import com.ycwl.basic.model.task.req.TaskReqVo;
import com.ycwl.basic.model.task.req.TaskSuccessReqVo;
import com.ycwl.basic.model.task.req.WorkerAuthReqVo;
import com.ycwl.basic.model.task.resp.TaskSyncRespVo;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.enums.NotifyType;
import com.ycwl.basic.repository.DeviceRepository;
import com.ycwl.basic.repository.FaceRepository;
import com.ycwl.basic.repository.RenderWorkerRepository;
@@ -128,6 +126,8 @@ public class TaskTaskServiceImpl implements TaskService {
private SourceRepository sourceRepository;
@Autowired
private MemberRelationRepository memberRelationRepository;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
private RenderWorkerEntity getWorker(@NonNull WorkerAuthReqVo req) {
String accessKey = req.getAccessKey();
@@ -653,23 +653,26 @@ public class TaskTaskServiceImpl implements TaskService {
* 生成时间 {{time4.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> timeMap2 = new HashMap<>();
timeMap2.put("value", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm"));
dataParam.put("time4", timeMap2);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频生成通知模板参数:{},用户ID:{}", params, openId);
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), openId);
dataParam.put("thing1", title);
dataParam.put("time4", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm"));
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第一次通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + "/" + item.getVideoId() + ""+configContent);
msg.setTarget(openId);
msg.setExtra(extra);
msg.setSendReason("视频生成通知");
msg.setSendBiz("视频生成");
ztMessageProducerService.send(msg);
}
}

View File

@@ -1,6 +1,8 @@
package com.ycwl.basic.task;
import cn.hutool.core.date.DateUtil;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.integration.scenic.dto.scenic.ScenicV2DTO;
import com.ycwl.basic.mapper.CouponMapper;
import com.ycwl.basic.mapper.MemberMapper;
@@ -12,10 +14,6 @@ import com.ycwl.basic.model.pc.mp.MpConfigEntity;
import com.ycwl.basic.model.pc.scenic.entity.ScenicConfigEntity;
import com.ycwl.basic.model.pc.scenic.entity.ScenicEntity;
import com.ycwl.basic.model.pc.scenic.req.ScenicReqQuery;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.adapters.INotifyAdapter;
import com.ycwl.basic.notify.entity.NotifyContent;
import com.ycwl.basic.notify.enums.NotifyType;
import com.ycwl.basic.repository.ScenicRepository;
import com.ycwl.basic.repository.TemplateRepository;
import com.ycwl.basic.integration.common.manager.ScenicConfigManager;
@@ -31,8 +29,11 @@ import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
@EnableScheduling
@@ -47,24 +48,33 @@ public class DownloadNotificationTasker {
private MemberMapper memberMapper;
@Autowired
private CouponMapper couponMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
@Scheduled(cron = "0 0 21 * * *")
public void sendDownloadNotification() {
log.info("开始执行定时任务");
// 用于记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000), new Date())
.forEach(item -> {
if (item.getIsBuy() == 1) {
return;
}
// 检查该用户是否已经发送过通知,避免重复发送
if (sentMemberIds.contains(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过下载通知,跳过", item.getMemberId());
return;
}
sentMemberIds.add(item.getMemberId());
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
// 发送模板消息
String templateId = scenicRepository.getVideoDownloadTemplateId(item.getScenicId());
if (StringUtils.isBlank(templateId)) {
log.info("模板消息为空");
return;
}
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("second_notification_title");
@@ -86,33 +96,46 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
dataParam.put("thing1", title);
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第二次通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第二次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
}
@Scheduled(cron = "0 0 20 * * *")
public void sendExpireNotification() {
log.info("开始执行定时任务");
// 用于记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(new Date(System.currentTimeMillis() - 2 * 24 * 60 * 60 * 1000), new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000))
.forEach(item -> {
if (item.getIsBuy() == 1) {
return;
}
// 检查该用户是否已经发送过通知,避免重复发送
if (sentMemberIds.contains(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过过期提醒通知,跳过", item.getMemberId());
return;
}
sentMemberIds.add(item.getMemberId());
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(item.getScenicId());
Integer videoStoreDay = scenicConfig.getVideoStoreDay();
if (videoStoreDay == null) {
@@ -124,7 +147,6 @@ public class DownloadNotificationTasker {
log.info("模板消息为空");
return;
}
log.info("发送模板消息");
ScenicEntity scenic = scenicRepository.getScenic(item.getScenicId());
ScenicConfigManager configManager = scenicRepository.getScenicConfigManager(item.getScenicId());
String configTitle = configManager.getString("third_notification_title");
@@ -147,24 +169,27 @@ public class DownloadNotificationTasker {
* 过期时间 {{time2.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> dateMap = new HashMap<>();
Date expireDate = new Date(item.getCreateTime().getTime() + videoStoreDay * 24 * 60 * 60 * 1000);
dateMap.put("value", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm"));
dataParam.put("time2", dateMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
Map<String, Object> dataParam = new HashMap<>();
dataParam.put("thing1", title);
dataParam.put("time2", DateUtil.format(expireDate, "yyyy-MM-dd HH:mm"));
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(第三次通知 - 过期提醒)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("第三次通知");
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
}
@@ -183,27 +208,34 @@ public class DownloadNotificationTasker {
calendar.clear();
scenicList.parallelStream().forEach(scenic -> {
Long scenicId = Long.parseLong(scenic.getId());
ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(scenicId);
ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(scenicId);
if (scenicConfig == null) {
return;
}
if (StringUtils.isEmpty(scenicConfig.getExtraNotificationTime())) {
if (StringUtils.isEmpty(scenicConfig.getString("extra_notification_time"))) {
return;
}
List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getExtraNotificationTime(), ","));
List<String> timeList = Arrays.asList(StringUtils.split(scenicConfig.getString("extra_notification_time"), ","));
if (!timeList.contains(String.valueOf(currentHour))) {
return;
}
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getExtraNotificationTime());
log.info("当前景区{},配置了{}", scenic.getName(), scenicConfig.getString("extra_notification_time"));
// 使用线程安全的Set记录已发送通知的用户ID,避免重复发送
Set<Long> sentMemberIds = ConcurrentHashMap.newKeySet();
videoMapper.listRelationByCreateTime(DateUtil.beginOfDay(new Date()), new Date())
.stream()
.filter(item -> item.getIsBuy() == 0)
.filter(item -> item.getScenicId().equals(scenicId))
.parallel()
.forEach(item -> {
// 检查该用户是否已经发送过通知,避免重复发送
if (!sentMemberIds.add(item.getMemberId())) {
log.debug("用户[memberId={}]已发送过额外下载通知,跳过", item.getMemberId());
return;
}
MemberRespVO member = memberMapper.getById(item.getMemberId());
MpConfigEntity scenicMp = scenicRepository.getScenicMpConfig(member.getScenicId());
// 发送模板消息
String templateId = scenicRepository.getVideoDownloadTemplateId(item.getScenicId());
if (StringUtils.isBlank(templateId)) {
@@ -219,7 +251,6 @@ public class DownloadNotificationTasker {
return;
}
log.info("发送模板消息");
String title = configTitle.replace("【景区】", scenic.getName());
String page;
if (configManager.getBoolean("grouping_enable", false)) {
@@ -231,20 +262,25 @@ public class DownloadNotificationTasker {
* 景区 {{thing1.DATA}}
* 备注 {{thing3.DATA}}
*/
Map<String, Object> params = new HashMap<>();
Map<String, Object> dataParam = new HashMap<>();
Map<String, String> videoMap = new HashMap<>();
videoMap.put("value", title);
dataParam.put("thing1", videoMap);
Map<String, String> remarkMap = new HashMap<>();
remarkMap.put("value", configContent);
dataParam.put("thing3", remarkMap);
params.put("data", dataParam);
params.put("page", page);
params.put("template_id", templateId);
log.info("视频下载通知模板参数:{},用户ID:{}", params, member.getOpenId());
INotifyAdapter adapter = NotifyFactory.get(NotifyType.WX_MP_SRV, scenicMp.toMap());
adapter.sendTo(new NotifyContent(title, page, params), member.getOpenId());
dataParam.put("thing1", title);
dataParam.put("thing3", configContent);
// 构建extra,只包含data和page
Map<String, Object> extra = new HashMap<>();
extra.put("data", dataParam);
extra.put("page", page);
// 使用ZT消息服务发送通知(额外下载通知)
ZtMessage msg = new ZtMessage();
msg.setChannelId(templateId);
msg.setTitle(title);
msg.setContent("" + item.getFaceId() + ""+configContent);
msg.setTarget(member.getOpenId());
msg.setExtra(extra);
msg.setSendReason("景区额外配置:" + scenicConfig.getString("extra_notification_time"));
msg.setSendBiz("定时通知");
ztMessageProducerService.send(msg);
});
});
}

View File

@@ -1,16 +1,20 @@
package com.ycwl.basic.watchdog;
import com.ycwl.basic.integration.message.dto.ZtMessage;
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
import com.ycwl.basic.mapper.TaskMapper;
import com.ycwl.basic.model.pc.task.entity.TaskEntity;
import com.ycwl.basic.notify.NotifyFactory;
import com.ycwl.basic.notify.entity.NotifyContent;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
@Profile("prod")
@@ -19,41 +23,145 @@ public class TaskWatchDog {
@Autowired
private TaskMapper taskMapper;
@Autowired
private ZtMessageProducerService ztMessageProducerService;
// 异常通知计数器
private final Map<String, Integer> notificationCounters = new HashMap<>();
// 配置参数
private static final int MAX_NOTIFICATION_COUNT = 3; // 每种异常最多通知3次
// 异常类型标识
private static final String TASK_BACKLOG = "task_backlog";
private static final String FAILED_TASKS = "failed_tasks";
private static final String LONG_RUNNING_TASK_PREFIX = "long_running_task_"; // 长时间运行任务前缀
@Scheduled(fixedDelay = 1000 * 60L)
public void scanTaskStatus() {
List<TaskEntity> allNotRunningTaskList = taskMapper.selectAllNotRunning();
String title = "任务堆积警告!";
StringBuilder content = new StringBuilder();
if (allNotRunningTaskList.size() > 10) {
content.append("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:").append(allNotRunningTaskList.size());
}
List<TaskEntity> allFailedTaskList = taskMapper.selectAllFailed();
if (allFailedTaskList.size() > 5) {
if (content.length() > 0) {
content.append("\n");
}
content.append("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:").append(allFailedTaskList.size());
}
List<TaskEntity> allRunningTaskList = taskMapper.selectAllRunning();
for (TaskEntity taskEntity : allRunningTaskList) {
// 检查任务积压
checkTaskBacklog(allNotRunningTaskList);
// 检查失败任务
checkFailedTasks(allFailedTaskList);
// 检查长时间运行任务
checkLongRunningTasks(allRunningTaskList);
}
/**
* 检查任务积压
*/
private void checkTaskBacklog(List<TaskEntity> notRunningTasks) {
if (notRunningTasks.size() > 10) {
if (shouldSendNotification(TASK_BACKLOG)) {
String content = String.format("当前任务队列中存在超过10个未运行任务,请及时处理!未运行任务数量:%d", notRunningTasks.size());
sendNotification("任务堆积警告", content, TASK_BACKLOG);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(TASK_BACKLOG);
}
}
/**
* 检查失败任务
*/
private void checkFailedTasks(List<TaskEntity> failedTasks) {
if (failedTasks.size() > 5) {
if (shouldSendNotification(FAILED_TASKS)) {
String content = String.format("当前存在超过5个失败任务(status=3),请及时检查和处理!失败任务数量:%d", failedTasks.size());
sendNotification("任务失败警告", content, FAILED_TASKS);
}
} else {
// 异常已恢复,重置计数器
resetNotificationCounter(FAILED_TASKS);
}
}
/**
* 检查长时间运行任务
*/
private void checkLongRunningTasks(List<TaskEntity> runningTasks) {
Set<String> currentLongRunningTasks = new HashSet<>();
for (TaskEntity taskEntity : runningTasks) {
if (taskEntity.getStartTime() == null) {
continue;
}
// startTime已经过去3分钟了
if (System.currentTimeMillis() - taskEntity.getStartTime().getTime() > 1000 * 60 * 3) {
if (content.length() > 0) {
content.append("\n");
String taskKey = LONG_RUNNING_TASK_PREFIX + taskEntity.getId();
currentLongRunningTasks.add(taskKey);
if (shouldSendNotification(taskKey)) {
String content = String.format("当前【%s】渲染机的【%d】任务已超过3分钟未完成!",
taskEntity.getWorkerId(), taskEntity.getId());
sendNotification("长时间运行任务警告", content, taskKey);
}
content.append("当前【").append(taskEntity.getWorkerId()).append("】渲染机的【").append(taskEntity.getId()).append("】任务已超过3分钟未完成!");
}
}
if (StringUtils.isNotBlank(content)) {
NotifyFactory.via().sendTo(
new NotifyContent(title, content.toString()),
"default_user"
);
// 清理已恢复正常的长时运行任务的计数器
cleanupLongRunningTaskCounters(currentLongRunningTasks);
}
/**
* 清理已恢复正常的长时运行任务的计数器
*/
private void cleanupLongRunningTaskCounters(Set<String> currentLongRunningTasks) {
Set<String> keysToRemove = new HashSet<>();
for (String key : notificationCounters.keySet()) {
if (key.startsWith(LONG_RUNNING_TASK_PREFIX)) {
if (!currentLongRunningTasks.contains(key)) {
keysToRemove.add(key);
}
}
}
// 移除已恢复任务的计数器
for (String key : keysToRemove) {
notificationCounters.remove(key);
}
}
/**
* 判断是否应该发送通知
*/
private boolean shouldSendNotification(String abnormalType) {
int count = notificationCounters.getOrDefault(abnormalType, 0);
return count < MAX_NOTIFICATION_COUNT;
}
/**
* 发送通知并更新计数器
*/
private void sendNotification(String title, String content, String abnormalType) {
ZtMessage ztMessage = ZtMessage.of(
"serverchan",
title,
content,
"system"
);
ztMessage.setSendReason("任务监控");
ztMessage.setSendBiz("系统监控");
ztMessageProducerService.send(ztMessage);
// 更新通知计数器
int currentCount = notificationCounters.getOrDefault(abnormalType, 0);
notificationCounters.put(abnormalType, currentCount + 1);
}
/**
* 重置通知计数器(异常恢复时调用)
*/
private void resetNotificationCounter(String abnormalType) {
notificationCounters.remove(abnormalType);
}
}

View File

@@ -95,7 +95,7 @@
NOW()
)
</insert>
<insert id="addUserPhoto">
<insert id="addUserPhoto" useGeneratedKeys="true" keyProperty="id">
INSERT INTO member_print (
member_id,
scenic_id,
@@ -108,8 +108,8 @@
) VALUES (
#{memberId},
#{scenicId},
#{url},
#{url},
#{origUrl},
#{cropUrl},
1,
0,
NOW(),