feat(integration): 添加 Kafka 消息系统集成

- 新增 Kafka 配置和连接测试功能- 实现人脸处理消息的消费逻辑
- 添加消息发送预留接口
- 优化人脸样本保存和处理流程
This commit is contained in:
2025-09-12 06:38:44 +08:00
parent 39bd18497c
commit dc2154c020
9 changed files with 557 additions and 0 deletions

View File

@@ -0,0 +1,83 @@
package com.ycwl.basic.integration.kafka.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* zt-face topic消息结构
* 用于人脸处理任务的异步消息传递
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FaceProcessingMessage {
/**
* 人脸样本ID(外部系统传入)
*/
private Long faceSampleId;
/**
* 景区ID
*/
private Long scenicId;
/**
* 设备ID
*/
private Long deviceId;
/**
* 人脸图片URL
*/
private String faceUrl;
// 不再需要faceUniqueId,直接使用faceSampleId作为唯一标识
/**
* 拍摄时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime shotTime;
// status字段已移除,由系统内部管理状态
// deviceConfig字段已移除,由系统内部通过deviceRepository查询
/**
* 消息创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
/**
* 消息来源
*/
private String source;
// retryCount字段已移除,重试机制由系统内部管理
// DeviceConfig内部类已移除,不再需要在消息中传递设备配置
/**
* 创建人脸处理消息的工厂方法(使用外部传入的faceId)
*/
public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId,
String faceUrl, LocalDateTime shotTime) {
return FaceProcessingMessage.builder()
.faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识
.scenicId(scenicId)
.deviceId(deviceId)
.faceUrl(faceUrl)
.shotTime(shotTime)
.createTime(LocalDateTime.now())
.source("external-system")
.build();
}
}

View File

@@ -0,0 +1,38 @@
package com.ycwl.basic.integration.kafka.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMessage<T> {
private String messageId;
private String topic;
private String eventType;
private T payload;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime timestamp;
private String source;
private String version;
public static <T> KafkaMessage<T> of(String topic, String eventType, T payload) {
return KafkaMessage.<T>builder()
.topic(topic)
.eventType(eventType)
.payload(payload)
.timestamp(LocalDateTime.now())
.source("liuying-microservice")
.version("1.0")
.build();
}
}