From f54d40d0260ee9f327728fb45d119392a7ec974b Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 14 Oct 2025 18:27:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(message):=E4=B8=BA=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=94=AF=E4=B8=80=E6=A0=87=E8=AF=86=E7=AC=A6?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 ZtMessage DTO 中新增 messageId 字段 - 发送消息前自动生成 UUID 作为默认 messageId - 更新 Kafka 生产者日志,包含 messageId 以便追踪 - 增强错误日志记录,附带 messageId 提升调试效率 --- .../basic/integration/message/dto/ZtMessage.java | 1 + .../message/service/ZtMessageProducerService.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java b/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java index 2222a343..78f8fc8b 100644 --- a/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java +++ b/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java @@ -15,6 +15,7 @@ import java.util.Map; @AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) public class ZtMessage { + private String messageId; // unique message identifier private String channelId; // required private String title; // required private String content; // required diff --git a/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java b/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java index 2386a9d5..b7dd2b9c 100644 --- a/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java +++ b/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java @@ -27,18 +27,24 @@ public class ZtMessageProducerService { public void send(ZtMessage msg) { validate(msg); + + // Generate messageId if not present + if (StringUtils.isBlank(msg.getMessageId())) { + msg.setMessageId(java.util.UUID.randomUUID().toString()); + } + String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic()) ? kafkaProps.getZtMessageTopic() : DEFAULT_TOPIC; String key = msg.getChannelId(); String payload = toJson(msg); - log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle()); + log.info("[ZT-MESSAGE] producing to topic={}, key={}, messageId={}, title={}", topic, key, msg.getMessageId(), msg.getTitle()); kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> { if (ex != null) { - log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex); + log.error("[ZT-MESSAGE] produce failed: messageId={}, error={}", msg.getMessageId(), ex.getMessage(), ex); } else if (metadata != null) { - log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset()); + log.info("[ZT-MESSAGE] produced: messageId={}, partition={}, offset={}", msg.getMessageId(), metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset()); } }); }