You've already forked FrameTour-BE
Merge branch 'message-microservice'
# Conflicts: # src/main/java/com/ycwl/basic/integration/CLAUDE.md
This commit is contained in:
@@ -43,6 +43,12 @@ public class KafkaConfig {
|
||||
@Value("${kafka.producer.buffer-memory:33554432}")
|
||||
private Integer bufferMemory;
|
||||
|
||||
@Value("${kafka.producer.enable-idempotence:true}")
|
||||
private boolean enableIdempotence;
|
||||
|
||||
@Value("${kafka.producer.compression-type:snappy}")
|
||||
private String compressionType;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
@@ -54,6 +60,8 @@ public class KafkaConfig {
|
||||
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
|
||||
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
|
||||
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
|
||||
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
|
||||
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
|
||||
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
@@ -0,0 +1,60 @@
|
||||
package com.ycwl.basic.controller.pc;
|
||||
|
||||
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
|
||||
import com.ycwl.basic.integration.message.dto.MessageListData;
|
||||
import com.ycwl.basic.integration.message.service.MessageIntegrationService;
|
||||
import com.ycwl.basic.utils.ApiResponse;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/api/message/v1")
|
||||
@RequiredArgsConstructor
|
||||
public class MessageController {
|
||||
|
||||
private final MessageIntegrationService messageService;
|
||||
|
||||
@GetMapping("/messages")
|
||||
public ApiResponse<MessageListData> listMessages(
|
||||
@RequestParam(defaultValue = "1") Integer page,
|
||||
@RequestParam(defaultValue = "20") Integer pageSize,
|
||||
@RequestParam(required = false) String channelId,
|
||||
@RequestParam(required = false) String title,
|
||||
@RequestParam(required = false) String content,
|
||||
@RequestParam(required = false) String sendBiz,
|
||||
@RequestParam(required = false) String sentAtStart,
|
||||
@RequestParam(required = false) String sentAtEnd,
|
||||
@RequestParam(required = false) String createdAtStart,
|
||||
@RequestParam(required = false) String createdAtEnd
|
||||
) {
|
||||
log.info("PC|消息列表查询 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
|
||||
if (pageSize > 100) {
|
||||
pageSize = 100;
|
||||
}
|
||||
try {
|
||||
MessageListData data = messageService.listMessages(page, pageSize, channelId, title, content, sendBiz,
|
||||
sentAtStart, sentAtEnd, createdAtStart, createdAtEnd);
|
||||
return ApiResponse.success(data);
|
||||
} catch (Exception e) {
|
||||
log.error("PC|消息列表查询失败", e);
|
||||
return ApiResponse.fail("消息列表查询失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@GetMapping("/channels")
|
||||
public ApiResponse<ChannelsResponse> listChannels() {
|
||||
log.info("PC|获取消息通道列表");
|
||||
try {
|
||||
ChannelsResponse data = messageService.listChannels();
|
||||
return ApiResponse.success(data);
|
||||
} catch (Exception e) {
|
||||
log.error("PC|获取消息通道列表失败", e);
|
||||
return ApiResponse.fail("获取消息通道列表失败: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
@@ -25,6 +25,7 @@ Currently implemented:
|
||||
- **Device Integration** (`com.ycwl.basic.integration.device`): ZT-Device microservice integration
|
||||
- **Render Worker Integration** (`com.ycwl.basic.integration.render`): ZT-Render-Worker microservice integration
|
||||
- **Questionnaire Integration** (`com.ycwl.basic.integration.questionnaire`): ZT-Questionnaire microservice integration
|
||||
- **Message Integration** (`com.ycwl.basic.integration.message`): ZT-Message Kafka producer integration
|
||||
|
||||
### Integration Pattern
|
||||
|
||||
@@ -34,8 +35,7 @@ service/
|
||||
├── client/ # Feign clients for HTTP calls
|
||||
├── config/ # Service-specific configuration
|
||||
├── dto/ # Data transfer objects
|
||||
├── service/ # Service layer with business logic
|
||||
└── example/ # Usage examples
|
||||
└── service/ # Service layer with business logic
|
||||
```
|
||||
|
||||
## Integration Fallback Mechanism
|
||||
@@ -792,13 +792,6 @@ mvn test -Dtest=DefaultConfigIntegrationServiceTest
|
||||
|
||||
# Run all device integration tests (including default configs)
|
||||
mvn test -Dtest="com.ycwl.basic.integration.device.*Test"
|
||||
|
||||
# Enable example runner in application-dev.yml
|
||||
integration:
|
||||
device:
|
||||
example:
|
||||
default-config:
|
||||
enabled: true
|
||||
```
|
||||
|
||||
### Common Configuration Keys
|
||||
@@ -820,8 +813,7 @@ com.ycwl.basic.integration.{service-name}/
|
||||
├── client/
|
||||
├── config/
|
||||
├── dto/
|
||||
├── service/
|
||||
└── example/
|
||||
└── service/
|
||||
```
|
||||
|
||||
### 2. Add Configuration Properties
|
||||
@@ -1168,6 +1160,57 @@ fallbackService.clearAllFallbackCache("zt-render-worker");
|
||||
- **Active (isActive=1)**: Worker is available for tasks
|
||||
- **Inactive (isActive=0)**: Worker is disabled
|
||||
|
||||
## ZT-Message Integration (Kafka Producer)
|
||||
|
||||
### Overview
|
||||
The zt-message microservice accepts messages via Kafka on topic `zt-message`. This integration provides a simple producer service to publish notification messages.
|
||||
|
||||
- Topic: `zt-message`
|
||||
- Key: Use `channelId` for partitioning stability
|
||||
- Value: UTF-8 JSON with fields: `channelId` (required), `title` (required), `content` (required), `target` (required), `extra` (object, optional), `sendReason` (optional), `sendBiz` (optional)
|
||||
|
||||
### Components
|
||||
- `com.ycwl.basic.integration.message.dto.ZtMessage`: DTO for message body
|
||||
- `com.ycwl.basic.integration.message.service.ZtMessageProducerService`: Producer service using Spring Kafka
|
||||
|
||||
### Configuration
|
||||
```yaml
|
||||
kafka:
|
||||
enabled: true # enable Kafka integration
|
||||
bootstrap-servers: 127.0.0.1:9092 # adjust per environment
|
||||
zt-message-topic: zt-message # topic name (default already zt-message)
|
||||
producer:
|
||||
acks: all
|
||||
enable-idempotence: true
|
||||
retries: 5
|
||||
linger-ms: 10
|
||||
batch-size: 32768
|
||||
compression-type: snappy
|
||||
```
|
||||
|
||||
### Usage
|
||||
```java
|
||||
@Autowired
|
||||
private ZtMessageProducerService producer;
|
||||
|
||||
public void sendWelcome() {
|
||||
ZtMessage msg = ZtMessage.of("dummy", "欢迎", "注册成功", "user-001");
|
||||
Map<String, Object> extra = new HashMap<>();
|
||||
extra.put("k", "v");
|
||||
msg.setExtra(extra);
|
||||
msg.setSendReason("REGISTER");
|
||||
msg.setSendBiz("USER");
|
||||
|
||||
producer.send(msg); // key uses channelId, value is JSON
|
||||
}
|
||||
```
|
||||
|
||||
### Notes
|
||||
- Required fields must be non-empty: `channelId`, `title`, `content`, `target`
|
||||
- Keep message body small (< 100 KB)
|
||||
- Use string for 64-bit integers in `extra` to avoid JS precision loss
|
||||
- Service logs the partition/offset upon success, errors on failure
|
||||
|
||||
## Common Development Tasks
|
||||
|
||||
### Running Integration Tests
|
||||
|
@@ -11,6 +11,7 @@ public class KafkaIntegrationProperties {
|
||||
|
||||
private boolean enabled = false;
|
||||
private String bootstrapServers = "100.64.0.12:39092";
|
||||
private String ztMessageTopic = "zt-message"; // topic for zt-message microservice
|
||||
private Consumer consumer = new Consumer();
|
||||
private Producer producer = new Producer();
|
||||
|
||||
|
@@ -0,0 +1,29 @@
|
||||
package com.ycwl.basic.integration.message.client;
|
||||
|
||||
import com.ycwl.basic.integration.common.response.CommonResponse;
|
||||
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
|
||||
import com.ycwl.basic.integration.message.dto.MessageListData;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
|
||||
@FeignClient(name = "zt-message", contextId = "zt-message", path = "")
|
||||
public interface MessageClient {
|
||||
|
||||
@GetMapping("/messages")
|
||||
CommonResponse<MessageListData> listMessages(
|
||||
@RequestParam(name = "page", defaultValue = "1") Integer page,
|
||||
@RequestParam(name = "pageSize", defaultValue = "20") Integer pageSize,
|
||||
@RequestParam(name = "channelId", required = false) String channelId,
|
||||
@RequestParam(name = "title", required = false) String title,
|
||||
@RequestParam(name = "content", required = false) String content,
|
||||
@RequestParam(name = "sendBiz", required = false) String sendBiz,
|
||||
@RequestParam(name = "sentAtStart", required = false) String sentAtStart,
|
||||
@RequestParam(name = "sentAtEnd", required = false) String sentAtEnd,
|
||||
@RequestParam(name = "createdAtStart", required = false) String createdAtStart,
|
||||
@RequestParam(name = "createdAtEnd", required = false) String createdAtEnd
|
||||
);
|
||||
|
||||
@GetMapping("/channels")
|
||||
CommonResponse<ChannelsResponse> listChannels();
|
||||
}
|
@@ -0,0 +1,10 @@
|
||||
package com.ycwl.basic.integration.message.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class ChannelsResponse {
|
||||
private List<String> channels;
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
package com.ycwl.basic.integration.message.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class MessageListData {
|
||||
private List<MessageRecordDTO> list;
|
||||
private String total; // string to avoid JS precision
|
||||
private Integer page;
|
||||
private Integer pageSize;
|
||||
}
|
@@ -0,0 +1,25 @@
|
||||
package com.ycwl.basic.integration.message.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class MessageRecordDTO {
|
||||
private String id; // string to avoid JS precision
|
||||
private String channelId;
|
||||
private String title;
|
||||
private String content;
|
||||
private String target;
|
||||
private Map<String, Object> extraJson;
|
||||
private String sendReason;
|
||||
private String sendBiz;
|
||||
private String status;
|
||||
private String errorMsg;
|
||||
private Integer attempts;
|
||||
private String sentAt; // RFC3339 or yyyy-MM-dd HH:mm:ss (pass-through)
|
||||
private String createdAt;
|
||||
private String updatedAt;
|
||||
}
|
@@ -0,0 +1,35 @@
|
||||
package com.ycwl.basic.integration.message.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class ZtMessage {
|
||||
private String channelId; // required
|
||||
private String title; // required
|
||||
private String content; // required
|
||||
private String target; // required
|
||||
private Map<String, Object> extra; // optional
|
||||
private String sendReason; // optional
|
||||
private String sendBiz; // optional
|
||||
|
||||
public static ZtMessage of(String channelId, String title, String content, String target) {
|
||||
return ZtMessage.builder()
|
||||
.channelId(channelId)
|
||||
.title(title)
|
||||
.content(content)
|
||||
.target(target)
|
||||
.extra(new HashMap<>())
|
||||
.build();
|
||||
}
|
||||
}
|
@@ -0,0 +1,33 @@
|
||||
package com.ycwl.basic.integration.message.example;
|
||||
|
||||
import com.ycwl.basic.integration.message.dto.ZtMessage;
|
||||
import com.ycwl.basic.integration.message.service.ZtMessageProducerService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "kafka.example.zt-message.enabled", havingValue = "true", matchIfMissing = false)
|
||||
public class ZtMessageProducerExample {
|
||||
|
||||
private final ZtMessageProducerService producer;
|
||||
|
||||
public void demo() {
|
||||
log.info("=== ZT-Message Producer Example ===");
|
||||
|
||||
ZtMessage msg = ZtMessage.of("dummy", "标题", "内容", "user-001");
|
||||
var extra = new HashMap<String, Object>();
|
||||
extra.put("k", "v");
|
||||
msg.setExtra(extra);
|
||||
msg.setSendReason("REGISTER");
|
||||
msg.setSendBiz("USER");
|
||||
|
||||
producer.send(msg);
|
||||
log.info("ZT-Message produced");
|
||||
}
|
||||
}
|
@@ -0,0 +1,57 @@
|
||||
package com.ycwl.basic.integration.message.service;
|
||||
|
||||
import com.ycwl.basic.integration.common.exception.IntegrationException;
|
||||
import com.ycwl.basic.integration.common.response.CommonResponse;
|
||||
import com.ycwl.basic.integration.common.service.IntegrationFallbackService;
|
||||
import com.ycwl.basic.integration.message.client.MessageClient;
|
||||
import com.ycwl.basic.integration.message.dto.ChannelsResponse;
|
||||
import com.ycwl.basic.integration.message.dto.MessageListData;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class MessageIntegrationService {
|
||||
|
||||
private final MessageClient client;
|
||||
private final IntegrationFallbackService fallbackService;
|
||||
|
||||
private static final String SERVICE_NAME = "zt-message";
|
||||
|
||||
public MessageListData listMessages(Integer page, Integer pageSize,
|
||||
String channelId, String title, String content, String sendBiz,
|
||||
String sentAtStart, String sentAtEnd,
|
||||
String createdAtStart, String createdAtEnd) {
|
||||
log.debug("查询消息列表 page={}, pageSize={}, channelId={}, title={}, sendBiz={}", page, pageSize, channelId, title, sendBiz);
|
||||
CommonResponse<MessageListData> resp = client.listMessages(page, pageSize, channelId, title, content, sendBiz,
|
||||
sentAtStart, sentAtEnd, createdAtStart, createdAtEnd);
|
||||
return handleResponse(resp, "查询消息列表失败");
|
||||
}
|
||||
|
||||
public ChannelsResponse listChannels() {
|
||||
log.debug("查询消息通道列表");
|
||||
// 相对稳定的数据,使用fallback缓存
|
||||
return fallbackService.executeWithFallback(
|
||||
SERVICE_NAME,
|
||||
"channels",
|
||||
() -> {
|
||||
CommonResponse<ChannelsResponse> resp = client.listChannels();
|
||||
return handleResponse(resp, "查询通道列表失败");
|
||||
},
|
||||
ChannelsResponse.class
|
||||
);
|
||||
}
|
||||
|
||||
private <T> T handleResponse(CommonResponse<T> response, String errorMessage) {
|
||||
if (response == null || !response.isSuccess()) {
|
||||
String msg = response != null && response.getMessage() != null
|
||||
? response.getMessage()
|
||||
: errorMessage;
|
||||
Integer code = response != null ? response.getCode() : 5000;
|
||||
throw new IntegrationException(code, msg, SERVICE_NAME);
|
||||
}
|
||||
return response.getData();
|
||||
}
|
||||
}
|
@@ -0,0 +1,64 @@
|
||||
package com.ycwl.basic.integration.message.service;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
|
||||
import com.ycwl.basic.integration.message.dto.ZtMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||
public class ZtMessageProducerService {
|
||||
|
||||
public static final String DEFAULT_TOPIC = "zt-message";
|
||||
|
||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final KafkaIntegrationProperties kafkaProps;
|
||||
|
||||
public void send(ZtMessage msg) {
|
||||
validate(msg);
|
||||
String topic = kafkaProps != null && StringUtils.isNotBlank(kafkaProps.getZtMessageTopic())
|
||||
? kafkaProps.getZtMessageTopic()
|
||||
: DEFAULT_TOPIC;
|
||||
String key = msg.getChannelId();
|
||||
String payload = toJson(msg);
|
||||
|
||||
log.info("[ZT-MESSAGE] producing to topic={}, key={}, title={}", topic, key, msg.getTitle());
|
||||
kafkaTemplate.send(topic, key, payload).whenComplete((metadata, ex) -> {
|
||||
if (ex != null) {
|
||||
log.error("[ZT-MESSAGE] produce failed: {}", ex.getMessage(), ex);
|
||||
} else if (metadata != null) {
|
||||
log.info("[ZT-MESSAGE] produced: partition={}, offset={}", metadata.getRecordMetadata().partition(), metadata.getRecordMetadata().offset());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void validate(ZtMessage msg) {
|
||||
if (msg == null) throw new IllegalArgumentException("message is null");
|
||||
if (StringUtils.isBlank(msg.getChannelId())) throw new IllegalArgumentException("channelId is required");
|
||||
if (StringUtils.isBlank(msg.getTitle())) throw new IllegalArgumentException("title is required");
|
||||
if (StringUtils.isBlank(msg.getContent())) throw new IllegalArgumentException("content is required");
|
||||
if (StringUtils.isBlank(msg.getTarget())) throw new IllegalArgumentException("target is required");
|
||||
if (msg.getExtra() != null && !(msg.getExtra() instanceof Map)) {
|
||||
throw new IllegalArgumentException("extra must be a Map");
|
||||
}
|
||||
}
|
||||
|
||||
private String toJson(ZtMessage msg) {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(msg);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException("failed to serialize message", e);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user