feat(message):为消息添加唯一标识符支持

- 在 ZtMessage DTO 中新增 messageId 字段
- 发送消息前自动生成 UUID 作为默认 messageId
- 更新 Kafka 生产者日志,包含 messageId 以便追踪
- 增强错误日志记录,附带 messageId 提升调试效率
This commit is contained in:
2025-10-14 18:27:15 +08:00
parent 3cb12c13c2
commit f54d40d026
2 changed files with 10 additions and 3 deletions

View File

@@ -15,6 +15,7 @@ import java.util.Map;
@AllArgsConstructor @AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
public class ZtMessage { public class ZtMessage {
private String messageId; // unique message identifier
private String channelId; // required private String channelId; // required
private String title; // required private String title; // required
private String content; // required private String content; // required

View File

@@ -27,18 +27,24 @@ public class ZtMessageProducerService {
public void send(ZtMessage msg) { public void send(ZtMessage msg) {
validate(msg); validate(msg);
// Generate messageId if not present
if (StringUtils.isBlank(msg.getMessageId())) {
msg.setMessageId(java.util.UUID.randomUUID().toString());
}
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic()) String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
? kafkaProps.getZtMessageTopic() ? kafkaProps.getZtMessageTopic()
: DEFAULT_TOPIC; : DEFAULT_TOPIC;
String key = msg.getChannelId(); String key = msg.getChannelId();
String payload = toJson(msg); String payload = toJson(msg);
log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle()); log.info("[ZT-MESSAGE] producing to topic={}, key={}, messageId={}, title={}", topic, key, msg.getMessageId(), msg.getTitle());
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> { kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
if (ex != null) { if (ex != null) {
log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex); log.error("[ZT-MESSAGE] produce failed: messageId={}, error={}", msg.getMessageId(), ex.getMessage(), ex);
} else if (metadata != null) { } else if (metadata != null) {
log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset()); log.info("[ZT-MESSAGE] produced: messageId={}, partition={}, offset={}", msg.getMessageId(), metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
} }
}); });
} }