You've already forked FrameTour-BE
feat(integration): 添加 ZT-Message Kafka 生产者集成
- 新增 ZtMessage DTO 类用于消息体 - 实现 ZtMessageProducerService 生产者服务 - 添加示例演示如何发送消息 - 更新配置文件和文档以支持新功能
This commit is contained in:
@@ -42,6 +42,12 @@ public class KafkaConfig {
|
|||||||
@Value("${kafka.producer.buffer-memory:33554432}")
|
@Value("${kafka.producer.buffer-memory:33554432}")
|
||||||
private Integer bufferMemory;
|
private Integer bufferMemory;
|
||||||
|
|
||||||
|
@Value("${kafka.producer.enable-idempotence:true}")
|
||||||
|
private boolean enableIdempotence;
|
||||||
|
|
||||||
|
@Value("${kafka.producer.compression-type:snappy}")
|
||||||
|
private String compressionType;
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ProducerFactory<String, String> producerFactory() {
|
public ProducerFactory<String, String> producerFactory() {
|
||||||
Map<String, Object> configProps = new HashMap<>();
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
@@ -53,6 +59,8 @@ public class KafkaConfig {
|
|||||||
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||||
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
|
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
|
||||||
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
||||||
|
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
|
||||||
|
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
|
||||||
|
|
||||||
return new DefaultKafkaProducerFactory<>(configProps);
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
}
|
}
|
||||||
|
@@ -24,6 +24,7 @@ Currently implemented:
|
|||||||
- **Scenic Integration** (`com.ycwl.basic.integration.scenic`): ZT-Scenic microservice integration
|
- **Scenic Integration** (`com.ycwl.basic.integration.scenic`): ZT-Scenic microservice integration
|
||||||
- **Device Integration** (`com.ycwl.basic.integration.device`): ZT-Device microservice integration
|
- **Device Integration** (`com.ycwl.basic.integration.device`): ZT-Device microservice integration
|
||||||
- **Render Worker Integration** (`com.ycwl.basic.integration.render`): ZT-Render-Worker microservice integration
|
- **Render Worker Integration** (`com.ycwl.basic.integration.render`): ZT-Render-Worker microservice integration
|
||||||
|
- **Message Integration** (`com.ycwl.basic.integration.message`): ZT-Message Kafka producer integration
|
||||||
|
|
||||||
### Integration Pattern
|
### Integration Pattern
|
||||||
|
|
||||||
@@ -1167,6 +1168,58 @@ fallbackService.clearAllFallbackCache("zt-render-worker");
|
|||||||
- **Active (isActive=1)**: Worker is available for tasks
|
- **Active (isActive=1)**: Worker is available for tasks
|
||||||
- **Inactive (isActive=0)**: Worker is disabled
|
- **Inactive (isActive=0)**: Worker is disabled
|
||||||
|
|
||||||
|
## ZT-Message Integration (Kafka Producer)
|
||||||
|
|
||||||
|
### Overview
|
||||||
|
The zt-message microservice accepts messages via Kafka on topic `zt-message`. This integration provides a simple producer service to publish notification messages as per docs/kafka-java-producer.md.
|
||||||
|
|
||||||
|
- Topic: `zt-message`
|
||||||
|
- Key: Use `channelId` for partitioning stability
|
||||||
|
- Value: UTF-8 JSON with fields: `channelId` (required), `title` (required), `content` (required), `target` (required), `extra` (object, optional), `sendReason` (optional), `sendBiz` (optional)
|
||||||
|
|
||||||
|
### Components
|
||||||
|
- `com.ycwl.basic.integration.message.dto.ZtMessage`: DTO for message body
|
||||||
|
- `com.ycwl.basic.integration.message.service.ZtMessageProducerService`: Producer service using Spring Kafka
|
||||||
|
- Example: `com.ycwl.basic.integration.message.example.ZtMessageProducerExample`
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
```yaml
|
||||||
|
kafka:
|
||||||
|
enabled: true # enable Kafka integration
|
||||||
|
bootstrap-servers: 127.0.0.1:9092 # adjust per environment
|
||||||
|
zt-message-topic: zt-message # topic name (default already zt-message)
|
||||||
|
producer:
|
||||||
|
acks: all
|
||||||
|
enable-idempotence: true
|
||||||
|
retries: 5
|
||||||
|
linger-ms: 10
|
||||||
|
batch-size: 32768
|
||||||
|
compression-type: snappy
|
||||||
|
```
|
||||||
|
|
||||||
|
### Usage
|
||||||
|
```java
|
||||||
|
@Autowired
|
||||||
|
private ZtMessageProducerService producer;
|
||||||
|
|
||||||
|
public void sendWelcome() {
|
||||||
|
ZtMessage msg = ZtMessage.of("dummy", "欢迎", "注册成功", "user-001");
|
||||||
|
Map<String, Object> extra = new HashMap<>();
|
||||||
|
extra.put("k", "v");
|
||||||
|
msg.setExtra(extra);
|
||||||
|
msg.setSendReason("REGISTER");
|
||||||
|
msg.setSendBiz("USER");
|
||||||
|
|
||||||
|
producer.send(msg); // key uses channelId, value is JSON
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Notes
|
||||||
|
- Required fields must be non-empty: `channelId`, `title`, `content`, `target`
|
||||||
|
- Keep message body small (< 100 KB)
|
||||||
|
- Use string for 64-bit integers in `extra` to avoid JS precision loss
|
||||||
|
- Service logs the partition/offset upon success, errors on failure
|
||||||
|
|
||||||
## Common Development Tasks
|
## Common Development Tasks
|
||||||
|
|
||||||
### Running Integration Tests
|
### Running Integration Tests
|
||||||
|
@@ -11,6 +11,7 @@ public class KafkaIntegrationProperties {
|
|||||||
|
|
||||||
private boolean enabled = false;
|
private boolean enabled = false;
|
||||||
private String bootstrapServers = "100.64.0.12:39092";
|
private String bootstrapServers = "100.64.0.12:39092";
|
||||||
|
private String ztMessageTopic = "zt-message"; // topic for zt-message microservice
|
||||||
private Consumer consumer = new Consumer();
|
private Consumer consumer = new Consumer();
|
||||||
private Producer producer = new Producer();
|
private Producer producer = new Producer();
|
||||||
|
|
||||||
|
@@ -0,0 +1,35 @@
|
|||||||
|
package com.ycwl.basic.integration.message.dto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
public class ZtMessage {
|
||||||
|
private String channelId; // required
|
||||||
|
private String title; // required
|
||||||
|
private String content; // required
|
||||||
|
private String target; // required
|
||||||
|
private Map<String, Object> extra; // optional
|
||||||
|
private String sendReason; // optional
|
||||||
|
private String sendBiz; // optional
|
||||||
|
|
||||||
|
public static ZtMessage of(String channelId, String title, String content, String target) {
|
||||||
|
return ZtMessage.builder()
|
||||||
|
.channelId(channelId)
|
||||||
|
.title(title)
|
||||||
|
.content(content)
|
||||||
|
.target(target)
|
||||||
|
.extra(new HashMap<>())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,33 @@
|
|||||||
|
package com.ycwl.basic.integration.message.example;
|
||||||
|
|
||||||
|
import com.ycwl.basic.integration.message.dto.ZtMessage;
|
||||||
|
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@ConditionalOnProperty(name = "kafka.example.zt-message.enabled", havingValue = "true", matchIfMissing = false)
|
||||||
|
public class ZtMessageProducerExample {
|
||||||
|
|
||||||
|
private final ZtMessageProducerService producer;
|
||||||
|
|
||||||
|
public void demo() {
|
||||||
|
log.info("=== ZT-Message Producer Example ===");
|
||||||
|
|
||||||
|
ZtMessage msg = ZtMessage.of("dummy", "标题", "内容", "user-001");
|
||||||
|
var extra = new HashMap<String, Object>();
|
||||||
|
extra.put("k", "v");
|
||||||
|
msg.setExtra(extra);
|
||||||
|
msg.setSendReason("REGISTER");
|
||||||
|
msg.setSendBiz("USER");
|
||||||
|
|
||||||
|
producer.send(msg);
|
||||||
|
log.info("ZT-Message produced");
|
||||||
|
}
|
||||||
|
}
|
@@ -0,0 +1,64 @@
|
|||||||
|
package com.ycwl.basic.integration.message.service;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
|
||||||
|
import com.ycwl.basic.integration.message.dto.ZtMessage;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||||
|
public class ZtMessageProducerService {
|
||||||
|
|
||||||
|
public static final String DEFAULT_TOPIC = "zt-message";
|
||||||
|
|
||||||
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
private final KafkaIntegrationProperties kafkaProps;
|
||||||
|
|
||||||
|
public void send(ZtMessage msg) {
|
||||||
|
validate(msg);
|
||||||
|
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
|
||||||
|
? kafkaProps.getZtMessageTopic()
|
||||||
|
: DEFAULT_TOPIC;
|
||||||
|
String key = msg.getChannelId();
|
||||||
|
String payload = toJson(msg);
|
||||||
|
|
||||||
|
log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle());
|
||||||
|
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
|
||||||
|
if (ex != null) {
|
||||||
|
log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex);
|
||||||
|
} else if (metadata != null) {
|
||||||
|
log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validate(ZtMessage msg) {
|
||||||
|
if (msg == null) throw new IllegalArgumentException("message is null");
|
||||||
|
if (StringUtils.isBlank(msg.getChannelId())) throw new IllegalArgumentException("channelId is required");
|
||||||
|
if (StringUtils.isBlank(msg.getTitle())) throw new IllegalArgumentException("title is required");
|
||||||
|
if (StringUtils.isBlank(msg.getContent())) throw new IllegalArgumentException("content is required");
|
||||||
|
if (StringUtils.isBlank(msg.getTarget())) throw new IllegalArgumentException("target is required");
|
||||||
|
if (msg.getExtra() != null && !(msg.getExtra() instanceof Map)) {
|
||||||
|
throw new IllegalArgumentException("extra must be a Map");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String toJson(ZtMessage msg) {
|
||||||
|
try {
|
||||||
|
return objectMapper.writeValueAsString(msg);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new IllegalArgumentException("failed to serialize message", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user