diff --git a/src/main/java/com/ycwl/basic/config/KafkaConfig.java b/src/main/java/com/ycwl/basic/config/KafkaConfig.java index 7dec01eb..446291ff 100644 --- a/src/main/java/com/ycwl/basic/config/KafkaConfig.java +++ b/src/main/java/com/ycwl/basic/config/KafkaConfig.java @@ -42,6 +42,12 @@ public class KafkaConfig { @Value("${kafka.producer.buffer-memory:33554432}") private Integer bufferMemory; + @Value("${kafka.producer.enable-idempotence:true}") + private boolean enableIdempotence; + + @Value("${kafka.producer.compression-type:snappy}") + private String compressionType; + @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); @@ -53,6 +59,8 @@ public class KafkaConfig { configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence); + configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); return new DefaultKafkaProducerFactory<>(configProps); } diff --git a/src/main/java/com/ycwl/basic/integration/CLAUDE.md b/src/main/java/com/ycwl/basic/integration/CLAUDE.md index 0047ced1..d13092b2 100644 --- a/src/main/java/com/ycwl/basic/integration/CLAUDE.md +++ b/src/main/java/com/ycwl/basic/integration/CLAUDE.md @@ -24,6 +24,7 @@ Currently implemented: - **Scenic Integration** (`com.ycwl.basic.integration.scenic`): ZT-Scenic microservice integration - **Device Integration** (`com.ycwl.basic.integration.device`): ZT-Device microservice integration - **Render Worker Integration** (`com.ycwl.basic.integration.render`): ZT-Render-Worker microservice integration +- **Message Integration** (`com.ycwl.basic.integration.message`): ZT-Message Kafka producer integration ### Integration Pattern @@ -1167,6 +1168,58 @@ fallbackService.clearAllFallbackCache("zt-render-worker"); - **Active (isActive=1)**: Worker is available for tasks - **Inactive (isActive=0)**: Worker is disabled +## ZT-Message Integration (Kafka Producer) + +### Overview +The zt-message microservice accepts messages via Kafka on topic `zt-message`. This integration provides a simple producer service to publish notification messages as per docs/kafka-java-producer.md. + +- Topic: `zt-message` +- Key: Use `channelId` for partitioning stability +- Value: UTF-8 JSON with fields: `channelId` (required), `title` (required), `content` (required), `target` (required), `extra` (object, optional), `sendReason` (optional), `sendBiz` (optional) + +### Components +- `com.ycwl.basic.integration.message.dto.ZtMessage`: DTO for message body +- `com.ycwl.basic.integration.message.service.ZtMessageProducerService`: Producer service using Spring Kafka +- Example: `com.ycwl.basic.integration.message.example.ZtMessageProducerExample` + +### Configuration +```yaml +kafka: + enabled: true # enable Kafka integration + bootstrap-servers: 127.0.0.1:9092 # adjust per environment + zt-message-topic: zt-message # topic name (default already zt-message) + producer: + acks: all + enable-idempotence: true + retries: 5 + linger-ms: 10 + batch-size: 32768 + compression-type: snappy +``` + +### Usage +```java +@Autowired +private ZtMessageProducerService producer; + +public void sendWelcome() { + ZtMessage msg = ZtMessage.of("dummy", "欢迎", "注册成功", "user-001"); + Map extra = new HashMap<>(); + extra.put("k", "v"); + msg.setExtra(extra); + msg.setSendReason("REGISTER"); + msg.setSendBiz("USER"); + + producer.send(msg); // key uses channelId, value is JSON +} +``` + +### Notes +- Required fields must be non-empty: `channelId`, `title`, `content`, `target` +- Keep message body small (< 100 KB) +- Use string for 64-bit integers in `extra` to avoid JS precision loss +- Service logs the partition/offset upon success, errors on failure + ## Common Development Tasks ### Running Integration Tests diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java index cf4842aa..6457e0f3 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java @@ -11,6 +11,7 @@ public class KafkaIntegrationProperties { private boolean enabled = false; private String bootstrapServers = "100.64.0.12:39092"; + private String ztMessageTopic = "zt-message"; // topic for zt-message microservice private Consumer consumer = new Consumer(); private Producer producer = new Producer(); diff --git a/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java b/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java new file mode 100644 index 00000000..2222a343 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/message/dto/ZtMessage.java @@ -0,0 +1,35 @@ +package com.ycwl.basic.integration.message.dto; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ZtMessage { + private String channelId; // required + private String title; // required + private String content; // required + private String target; // required + private Map extra; // optional + private String sendReason; // optional + private String sendBiz; // optional + + public static ZtMessage of(String channelId, String title, String content, String target) { + return ZtMessage.builder() + .channelId(channelId) + .title(title) + .content(content) + .target(target) + .extra(new HashMap<>()) + .build(); + } +} \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/integration/message/example/ZtMessageProducerExample.java b/src/main/java/com/ycwl/basic/integration/message/example/ZtMessageProducerExample.java new file mode 100644 index 00000000..352f13d8 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/message/example/ZtMessageProducerExample.java @@ -0,0 +1,33 @@ +package com.ycwl.basic.integration.message.example; + +import com.ycwl.basic.integration.message.dto.ZtMessage; +import com.ycwl.basic.integration.message.service.ZtMessageProducerService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.HashMap; + +@Slf4j +@Component +@RequiredArgsConstructor +@ConditionalOnProperty(name = "kafka.example.zt-message.enabled", havingValue = "true", matchIfMissing = false) +public class ZtMessageProducerExample { + + private final ZtMessageProducerService producer; + + public void demo() { + log.info("=== ZT-Message Producer Example ==="); + + ZtMessage msg = ZtMessage.of("dummy", "标题", "内容", "user-001"); + var extra = new HashMap(); + extra.put("k", "v"); + msg.setExtra(extra); + msg.setSendReason("REGISTER"); + msg.setSendBiz("USER"); + + producer.send(msg); + log.info("ZT-Message produced"); + } +} diff --git a/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java b/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java new file mode 100644 index 00000000..2386a9d5 --- /dev/null +++ b/src/main/java/com/ycwl/basic/integration/message/service/ZtMessageProducerService.java @@ -0,0 +1,64 @@ +package com.ycwl.basic.integration.message.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties; +import com.ycwl.basic.integration.message.dto.ZtMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.Map; + +@Slf4j +@Service +@RequiredArgsConstructor +@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true") +public class ZtMessageProducerService { + + public static final String DEFAULT_TOPIC = "zt-message"; + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + private final KafkaIntegrationProperties kafkaProps; + + public void send(ZtMessage msg) { + validate(msg); + String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic()) + ? kafkaProps.getZtMessageTopic() + : DEFAULT_TOPIC; + String key = msg.getChannelId(); + String payload = toJson(msg); + + log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle()); + kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> { + if (ex != null) { + log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex); + } else if (metadata != null) { + log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset()); + } + }); + } + + private void validate(ZtMessage msg) { + if (msg == null) throw new IllegalArgumentException("message is null"); + if (StringUtils.isBlank(msg.getChannelId())) throw new IllegalArgumentException("channelId is required"); + if (StringUtils.isBlank(msg.getTitle())) throw new IllegalArgumentException("title is required"); + if (StringUtils.isBlank(msg.getContent())) throw new IllegalArgumentException("content is required"); + if (StringUtils.isBlank(msg.getTarget())) throw new IllegalArgumentException("target is required"); + if (msg.getExtra() != null && !(msg.getExtra() instanceof Map)) { + throw new IllegalArgumentException("extra must be a Map"); + } + } + + private String toJson(ZtMessage msg) { + try { + return objectMapper.writeValueAsString(msg); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("failed to serialize message", e); + } + } +}