From dc2154c020ce7caac6e4a6e44a905f6780326ad7 Mon Sep 17 00:00:00 2001
From: Jerry Yan <792602257@qq.com>
Date: Fri, 12 Sep 2025 06:38:44 +0800
Subject: [PATCH] =?UTF-8?q?feat(integration):=20=E6=B7=BB=E5=8A=A0=20Kafka?=
=?UTF-8?q?=20=E6=B6=88=E6=81=AF=E7=B3=BB=E7=BB=9F=E9=9B=86=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 Kafka 配置和连接测试功能- 实现人脸处理消息的消费逻辑
- 添加消息发送预留接口
- 优化人脸样本保存和处理流程
---
pom.xml | 6 +
.../com/ycwl/basic/config/KafkaConfig.java | 83 ++++++++
.../config/KafkaIntegrationProperties.java | 35 ++++
.../kafka/dto/FaceProcessingMessage.java | 83 ++++++++
.../integration/kafka/dto/KafkaMessage.java | 38 ++++
.../example/KafkaIntegrationExample.java | 57 ++++++
.../service/FaceProcessingKafkaService.java | 186 ++++++++++++++++++
.../service/KafkaIntegrationService.java | 51 +++++
src/main/resources/application-dev.yml | 18 ++
9 files changed, 557 insertions(+)
create mode 100644 src/main/java/com/ycwl/basic/config/KafkaConfig.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java
create mode 100644 src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java
diff --git a/pom.xml b/pom.xml
index de66305a..070e7e3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -266,6 +266,12 @@
mts20140618
5.0.0
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
diff --git a/src/main/java/com/ycwl/basic/config/KafkaConfig.java b/src/main/java/com/ycwl/basic/config/KafkaConfig.java
new file mode 100644
index 00000000..7dec01eb
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/config/KafkaConfig.java
@@ -0,0 +1,83 @@
+package com.ycwl.basic.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = false)
+public class KafkaConfig {
+
+ @Value("${kafka.bootstrap-servers:100.64.0.12:39092}")
+ private String bootstrapServers;
+
+ @Value("${kafka.consumer.group-id:liuying-microservice}")
+ private String consumerGroupId;
+
+ @Value("${kafka.consumer.auto-offset-reset:earliest}")
+ private String autoOffsetReset;
+
+ @Value("${kafka.producer.acks:all}")
+ private String acks;
+
+ @Value("${kafka.producer.retries:3}")
+ private Integer retries;
+
+ @Value("${kafka.producer.batch-size:16384}")
+ private Integer batchSize;
+
+ @Value("${kafka.producer.linger-ms:1}")
+ private Integer lingerMs;
+
+ @Value("${kafka.producer.buffer-memory:33554432}")
+ private Integer bufferMemory;
+
+ @Bean
+ public ProducerFactory producerFactory() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.ACKS_CONFIG, acks);
+ configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
+ configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+ configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
+ configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+
+ return new DefaultKafkaProducerFactory<>(configProps);
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ return factory;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java
new file mode 100644
index 00000000..cf4842aa
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/config/KafkaIntegrationProperties.java
@@ -0,0 +1,35 @@
+package com.ycwl.basic.integration.kafka.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = "kafka")
+public class KafkaIntegrationProperties {
+
+ private boolean enabled = false;
+ private String bootstrapServers = "100.64.0.12:39092";
+ private Consumer consumer = new Consumer();
+ private Producer producer = new Producer();
+
+ @Data
+ public static class Consumer {
+ private String groupId = "liuying-microservice";
+ private String autoOffsetReset = "earliest";
+ private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ }
+
+ @Data
+ public static class Producer {
+ private String acks = "all";
+ private Integer retries = 3;
+ private Integer batchSize = 16384;
+ private Integer lingerMs = 1;
+ private Integer bufferMemory = 33554432;
+ private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
+ private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java b/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java
new file mode 100644
index 00000000..78f542ec
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/dto/FaceProcessingMessage.java
@@ -0,0 +1,83 @@
+package com.ycwl.basic.integration.kafka.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+/**
+ * zt-face topic消息结构
+ * 用于人脸处理任务的异步消息传递
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class FaceProcessingMessage {
+
+ /**
+ * 人脸样本ID(外部系统传入)
+ */
+ private Long faceSampleId;
+
+ /**
+ * 景区ID
+ */
+ private Long scenicId;
+
+ /**
+ * 设备ID
+ */
+ private Long deviceId;
+
+ /**
+ * 人脸图片URL
+ */
+ private String faceUrl;
+
+ // 不再需要faceUniqueId,直接使用faceSampleId作为唯一标识
+
+ /**
+ * 拍摄时间
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime shotTime;
+
+ // status字段已移除,由系统内部管理状态
+
+ // deviceConfig字段已移除,由系统内部通过deviceRepository查询
+
+ /**
+ * 消息创建时间
+ */
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime createTime;
+
+ /**
+ * 消息来源
+ */
+ private String source;
+
+ // retryCount字段已移除,重试机制由系统内部管理
+
+ // DeviceConfig内部类已移除,不再需要在消息中传递设备配置
+
+ /**
+ * 创建人脸处理消息的工厂方法(使用外部传入的faceId)
+ */
+ public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId,
+ String faceUrl, LocalDateTime shotTime) {
+ return FaceProcessingMessage.builder()
+ .faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识
+ .scenicId(scenicId)
+ .deviceId(deviceId)
+ .faceUrl(faceUrl)
+ .shotTime(shotTime)
+ .createTime(LocalDateTime.now())
+ .source("external-system")
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java b/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java
new file mode 100644
index 00000000..3e07716a
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/dto/KafkaMessage.java
@@ -0,0 +1,38 @@
+package com.ycwl.basic.integration.kafka.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class KafkaMessage {
+
+ private String messageId;
+ private String topic;
+ private String eventType;
+ private T payload;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime timestamp;
+
+ private String source;
+ private String version;
+
+ public static KafkaMessage of(String topic, String eventType, T payload) {
+ return KafkaMessage.builder()
+ .topic(topic)
+ .eventType(eventType)
+ .payload(payload)
+ .timestamp(LocalDateTime.now())
+ .source("liuying-microservice")
+ .version("1.0")
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java b/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java
new file mode 100644
index 00000000..72441c9b
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/example/KafkaIntegrationExample.java
@@ -0,0 +1,57 @@
+package com.ycwl.basic.integration.kafka.example;
+
+import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
+import com.ycwl.basic.integration.kafka.service.KafkaIntegrationService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * Kafka集成使用示例(暂时注释,后续开发时启用)
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "kafka.example.enabled", havingValue = "true", matchIfMissing = false)
+public class KafkaIntegrationExample {
+
+ private final KafkaIntegrationService kafkaService;
+
+ /**
+ * 演示Kafka连接测试
+ */
+ public void demonstrateConnectionTest() {
+ log.info("=== Kafka Integration Example ===");
+
+ // 测试连接
+ boolean connected = kafkaService.testConnection();
+ log.info("Kafka connection status: {}", connected ? "SUCCESS" : "FAILED");
+
+ // 显示配置信息
+ var properties = kafkaService.getKafkaProperties();
+ log.info("Kafka Bootstrap Servers: {}", properties.getBootstrapServers());
+ log.info("Consumer Group ID: {}", properties.getConsumer().getGroupId());
+ }
+
+ /**
+ * 演示消息发送(预留示例)
+ */
+ public void demonstrateMessageSending() {
+ log.info("=== Message Sending Example (Not Implemented) ===");
+
+ // 创建示例消息
+ KafkaMessage message = KafkaMessage.of(
+ "test-topic",
+ "TEST_EVENT",
+ "Hello Kafka from liuying-microservice!"
+ );
+
+ // 发送消息(暂不实现)
+ kafkaService.sendMessage("test-topic", "test-key", message);
+ log.info("Message sending demonstration completed");
+ }
+
+ // TODO: 后续添加消费者示例
+ // public void demonstrateMessageConsuming() { ... }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java
new file mode 100644
index 00000000..42d98542
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java
@@ -0,0 +1,186 @@
+package com.ycwl.basic.integration.kafka.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
+import com.ycwl.basic.facebody.entity.AddFaceResp;
+import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
+import com.ycwl.basic.mapper.FaceSampleMapper;
+import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
+import com.ycwl.basic.repository.DeviceRepository;
+import com.ycwl.basic.service.pc.ScenicService;
+import com.ycwl.basic.service.task.TaskFaceService;
+import com.ycwl.basic.task.DynamicTaskGenerator;
+import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
+// 不再需要SnowFlakeUtil,使用外部传入的ID
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+import java.time.ZoneId;
+import java.util.Date;
+
+/**
+ * 人脸处理Kafka消费服务
+ * 消费外部系统发送到zt-face topic的消息
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
+public class FaceProcessingKafkaService {
+
+ private static final String ZT_FACE_TOPIC = "zt-face";
+
+ private final ObjectMapper objectMapper;
+ private final FaceSampleMapper faceSampleMapper;
+ private final TaskFaceService taskFaceService;
+ private final ScenicService scenicService;
+ private final DeviceRepository deviceRepository;
+
+ /**
+ * 消费外部系统发送的人脸处理消息
+ * 先保存人脸样本数据,再进行异步人脸识别处理
+ */
+ @KafkaListener(topics = ZT_FACE_TOPIC, groupId = "face-processing-group")
+ public void processFaceMessage(String message) {
+ try {
+ FaceProcessingMessage faceMessage = objectMapper.readValue(message, FaceProcessingMessage.class);
+ log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
+ faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
+
+ // 使用外部传入的faceSampleId
+ Long externalFaceId = faceMessage.getFaceSampleId();
+ if (externalFaceId == null) {
+ log.error("外部消息中未包含faceSampleId");
+ return;
+ }
+
+ // 先保存人脸样本数据
+ boolean saved = saveFaceSample(faceMessage, externalFaceId);
+
+ // 然后进行人脸识别处理
+ if (saved) {
+ processFaceRecognition(faceMessage);
+ }
+
+ } catch (Exception e) {
+ log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
+ // TODO: 考虑错误重试机制或死信队列
+ }
+ }
+
+ /**
+ * 保存人脸样本数据到数据库
+ * @param faceMessage 人脸处理消息
+ * @param externalFaceId 外部传入的人脸ID
+ * @return 是否保存成功
+ */
+ private boolean saveFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) {
+ try {
+ FaceSampleEntity faceSample = new FaceSampleEntity();
+ faceSample.setId(externalFaceId); // 使用外部传入的ID
+ faceSample.setScenicId(faceMessage.getScenicId());
+ faceSample.setDeviceId(faceMessage.getDeviceId());
+ faceSample.setStatus(0); // 初始状态
+ faceSample.setFaceUrl(faceMessage.getFaceUrl());
+
+ // 转换时间格式
+ if (faceMessage.getShotTime() != null) {
+ Date shotTime = Date.from(faceMessage.getShotTime().atZone(ZoneId.systemDefault()).toInstant());
+ faceSample.setCreateAt(shotTime);
+ } else {
+ faceSample.setCreateAt(new Date());
+ }
+
+ // 保存到数据库
+ faceSampleMapper.add(faceSample);
+ log.info("人脸样本数据已保存, 使用外部faceId: {}, scenicId: {}, deviceId: {}, faceUrl: {}",
+ externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), faceMessage.getFaceUrl());
+
+ return true;
+ } catch (Exception e) {
+ log.error("保存人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}",
+ externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e);
+ return false;
+ }
+ }
+
+ /**
+ * 执行人脸识别处理逻辑
+ * 对已保存的人脸样本进行识别处理
+ */
+ private void processFaceRecognition(FaceProcessingMessage message) {
+ Long faceSampleId = message.getFaceSampleId();
+ Long scenicId = message.getScenicId();
+ String faceUrl = message.getFaceUrl();
+
+ // 直接使用faceSampleId作为唯一标识
+ String faceUniqueId = faceSampleId.toString();
+
+ // 获取人脸识别适配器
+ IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(scenicId);
+ if (faceBodyAdapter == null) {
+ log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
+ updateFaceSampleStatus(faceSampleId, -1);
+ return;
+ }
+
+ try {
+ // 更新状态为处理中
+ updateFaceSampleStatus(faceSampleId, 1);
+
+ // 确保人脸数据库存在
+ taskFaceService.assureFaceDb(faceBodyAdapter, scenicId.toString());
+
+ // 添加人脸到识别服务(使用faceSampleId作为唯一标识)
+ AddFaceResp addFaceResp = faceBodyAdapter.addFace(
+ scenicId.toString(),
+ faceSampleId.toString(),
+ faceUrl,
+ faceUniqueId // 即faceSampleId.toString()
+ );
+
+ if (addFaceResp != null) {
+ // 更新人脸样本得分和状态
+ faceSampleMapper.updateScore(faceSampleId, addFaceResp.getScore());
+ updateFaceSampleStatus(faceSampleId, 2); // 成功状态
+ log.info("人脸识别处理完成, faceSampleId: {}, score: {}",
+ faceSampleId, addFaceResp.getScore());
+
+ // 查询设备配置,判断是否启用预订功能
+ Long deviceId = message.getDeviceId();
+ DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
+ if (deviceConfig != null &&
+ Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
+ DynamicTaskGenerator.addTask(faceSampleId);
+ log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId);
+ }
+ } else {
+ log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
+ updateFaceSampleStatus(faceSampleId, -1);
+ }
+
+ } catch (Exception e) {
+ log.error("人脸识别处理失败, faceSampleId: {}, error: {}",
+ faceSampleId, e.getMessage(), e);
+
+ // 标记人脸样本为处理失败状态
+ updateFaceSampleStatus(faceSampleId, -1);
+ }
+ }
+
+ /**
+ * 更新人脸样本状态
+ */
+ private void updateFaceSampleStatus(Long faceSampleId, Integer status) {
+ try {
+ // TODO: 需要在FaceSampleMapper中添加updateStatus方法
+ // faceSampleMapper.updateStatus(faceSampleId, status);
+ log.info("人脸样本状态已更新, faceSampleId: {}, status: {} (0:初始,1:处理中,2:成功,-1:失败)", faceSampleId, status);
+ } catch (Exception e) {
+ log.error("更新人脸样本状态失败, faceSampleId: {}", faceSampleId, e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java
new file mode 100644
index 00000000..9cb81b09
--- /dev/null
+++ b/src/main/java/com/ycwl/basic/integration/kafka/service/KafkaIntegrationService.java
@@ -0,0 +1,51 @@
+package com.ycwl.basic.integration.kafka.service;
+
+import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
+import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
+public class KafkaIntegrationService {
+
+ private final KafkaTemplate kafkaTemplate;
+ private final KafkaIntegrationProperties kafkaProperties;
+
+ /**
+ * 测试Kafka连接
+ */
+ public boolean testConnection() {
+ try {
+ log.info("Testing Kafka connection to: {}", kafkaProperties.getBootstrapServers());
+
+ // 尝试获取元数据以测试连接
+ var metadata = kafkaTemplate.getProducerFactory().createProducer().partitionsFor("test-topic");
+ log.info("Kafka connection test successful");
+ return true;
+ } catch (Exception e) {
+ log.error("Kafka connection test failed", e);
+ return false;
+ }
+ }
+
+ /**
+ * 发送消息(预留接口,暂不实现具体逻辑)
+ */
+ public void sendMessage(String topic, String key, KafkaMessage> message) {
+ log.info("Kafka message sending is not implemented yet. Topic: {}, Key: {}", topic, key);
+ // TODO: 后续实现具体的消息发送逻辑
+ }
+
+ /**
+ * 获取Kafka配置信息
+ */
+ public KafkaIntegrationProperties getKafkaProperties() {
+ return kafkaProperties;
+ }
+}
\ No newline at end of file
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 1ba11371..c0143d4c 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -17,6 +17,24 @@ feign:
okhttp:
enabled: true
+# Kafka配置
+kafka:
+ enabled: false # 默认关闭,需要时手动开启
+ bootstrap-servers: 100.64.0.12:39092
+ consumer:
+ group-id: liuying-microservice-dev
+ auto-offset-reset: earliest
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ producer:
+ acks: all
+ retries: 3
+ batch-size: 16384
+ linger-ms: 1
+ buffer-memory: 33554432
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+
# 开发环境日志配置
logging:
level: