package com.ycwl.basic.service; import com.ycwl.basic.dto.ZTSourceMessage; import com.ycwl.basic.utils.JacksonUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; /** * ZT-Source Kafka消费者服务 * 监听zt-source topic并处理素材消息 * * @author system * @date 2024/12/27 */ @Slf4j @Service @RequiredArgsConstructor @ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") public class ZTSourceConsumerService { private static final String ZT_SOURCE_TOPIC = "zt-source"; private final ZTSourceDataService ztSourceDataService; /** * 监听zt-source topic消息 * 先解析消息并输出业务日志,然后手动确认处理 * * @param message 消息JSON字符串 * @param ack 手动ACK确认 */ @KafkaListener(topics = ZT_SOURCE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory") public void handleZTSourceMessage(String message, Acknowledgment ack) { ZTSourceMessage sourceMessage = null; try { // 先解析消息 sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class); // 输出业务相关的日志信息 log.debug("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}", sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId()); // 处理消息 boolean processed = ztSourceDataService.processZTSourceMessage(sourceMessage); if (processed) { // 只有在处理成功后才手动提交 ack.acknowledge(); log.info("ZT-Source消息处理成功并已提交, sourceId: {}", sourceMessage.getSourceId()); } else { log.warn("ZT-Source消息处理被跳过(非照片类型),消息不会被提交, sourceId: {}, sourceType: {}", sourceMessage.getSourceId(), sourceMessage.getSourceType()); // 对于非照片类型,也提交消息避免重复消费 ack.acknowledge(); } } catch (Exception e) { String sourceId = sourceMessage != null ? sourceMessage.getSourceId().toString() : "unknown"; log.error("处理ZT-Source消息失败,消息不会被提交: sourceId={}, error={}", sourceId, e.getMessage(), e); // 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费 } } }