feat(kafka): 新增ZT-Source Kafka消息处理功能

- 新增ZTSourceMessage实体类用于接收Kafka消息
- 新增ZTSourceConsumerService监听zt-source主题
- 新增ZTSourceDataService处理消息并保存至数据库- 扩展SourceMapper支持从ZT-Source消息新增素材
- 实现照片类型素材的解析、校验与存储逻辑
- 添加Kafka手动ACK确认机制确保消息可靠处理
This commit is contained in:
2025-09-27 22:16:47 +08:00
parent 4b01e4cf82
commit 9bc34fcfdb
5 changed files with 267 additions and 0 deletions

View File

@@ -0,0 +1,69 @@
package com.ycwl.basic.service;
import com.ycwl.basic.dto.ZTSourceMessage;
import com.ycwl.basic.utils.JacksonUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
* ZT-Source Kafka消费者服务
* 监听zt-source topic并处理素材消息
*
* @author system
* @date 2024/12/27
*/
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class ZTSourceConsumerService {
private static final String ZT_SOURCE_TOPIC = "zt-source";
private final ZTSourceDataService ztSourceDataService;
/**
* 监听zt-source topic消息
* 先解析消息并输出业务日志,然后手动确认处理
*
* @param message 消息JSON字符串
* @param ack 手动ACK确认
*/
@KafkaListener(topics = ZT_SOURCE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
public void handleZTSourceMessage(String message, Acknowledgment ack) {
ZTSourceMessage sourceMessage = null;
try {
// 先解析消息
sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class);
// 输出业务相关的日志信息
log.info("接收到ZT-Source消息, sourceId: {}, sourceType: {}, scenicId: {}, deviceId: {}, sourceUrl: {}",
sourceMessage.getSourceId(), sourceMessage.getSourceType(),
sourceMessage.getScenicId(), sourceMessage.getDeviceId(), sourceMessage.getSourceUrl());
// 处理消息
boolean processed = ztSourceDataService.processZTSourceMessage(sourceMessage);
if (processed) {
// 只有在处理成功后才手动提交
ack.acknowledge();
log.info("ZT-Source消息处理成功并已提交, sourceId: {}", sourceMessage.getSourceId());
} else {
log.warn("ZT-Source消息处理被跳过(非照片类型),消息不会被提交, sourceId: {}, sourceType: {}",
sourceMessage.getSourceId(), sourceMessage.getSourceType());
// 对于非照片类型,也提交消息避免重复消费
ack.acknowledge();
}
} catch (Exception e) {
String sourceId = sourceMessage != null ? sourceMessage.getSourceId().toString() : "unknown";
log.error("处理ZT-Source消息失败,消息不会被提交: sourceId={}, error={}", sourceId, e.getMessage(), e);
// 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费
}
}
}