feat(kafka): 新增AI相机人脸处理消息消费逻辑

- 新增AI相机专用Kafka主题(zt-ai-cam-face)监听
- 新增FaceSampleAiCamMapper及对应XML映射文件
- 实现AI相机人脸数据入库及状态更新逻辑
- 实现基于设备ID的人脸库分组策略
- 添加异步人脸识别处理及评分更新功能
- 增加预订单任务触发机制
- 补充安全的状态更新与异常处理机制
This commit is contained in:
2025-12-05 11:26:38 +08:00
parent 7a19f18962
commit e27ed7d971
3 changed files with 308 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage;
import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager;
import com.ycwl.basic.integration.kafka.scheduler.AccountSchedulerContext;
import com.ycwl.basic.mapper.FaceSampleAiCamMapper;
import com.ycwl.basic.mapper.FaceSampleMapper;
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
import com.ycwl.basic.repository.DeviceRepository;
@@ -42,8 +43,10 @@ import java.util.Date;
public class FaceProcessingKafkaService {
private static final String ZT_FACE_TOPIC = "zt-face";
private static final String ZT_AI_CAM_FACE_TOPIC = "zt-ai-cam-face";
private final FaceSampleMapper faceSampleMapper;
private final FaceSampleAiCamMapper faceSampleAiCamMapper;
private final TaskFaceService taskFaceService;
private final ScenicService scenicService;
private final DeviceRepository deviceRepository;
@@ -115,6 +118,67 @@ public class FaceProcessingKafkaService {
}
}
/**
* 消费AI相机发送的人脸处理消息 (zt-ai-cam-face)
* 逻辑与 zt-face 类似,但写入不同的表,且人脸库分组依据不同
*/
@KafkaListener(topics = ZT_AI_CAM_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
public void processAiCamFaceMessage(String message, Acknowledgment ack) {
Long faceSampleId = null;
try {
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
faceSampleId = faceMessage.getFaceSampleId();
log.debug("接收AI相机人脸消息: scenicId={}, deviceId={}, faceSampleId={}",
faceMessage.getScenicId(), faceMessage.getDeviceId(), faceSampleId);
// ========== 第一步: 同步写入数据库 (FaceSampleAiCam) ==========
boolean saved = saveAiCamFaceSample(faceMessage, faceSampleId);
if (!saved) {
log.error("AI相机数据库写入失败, 不提交识别任务, faceSampleId={}", faceSampleId);
ack.acknowledge();
return;
}
log.debug("AI相机数据库写入成功, faceSampleId={}, status=0", faceSampleId);
// ========== 第二步: 获取账号调度器上下文 ==========
AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceMessage.getScenicId());
if (schedulerCtx == null) {
log.error("无法获取调度器上下文, faceSampleId={}", faceSampleId);
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
ack.acknowledge();
return;
}
// ========== 第三步: 提交到账号专属调度器 ==========
boolean submitted = schedulerCtx.getScheduler().submit(() -> {
processAiCamFaceRecognitionAsync(faceMessage);
});
if (submitted) {
log.debug("AI相机任务已提交到调度器, account={}, cloudType={}, faceSampleId={}, schedulerQueue={}",
schedulerCtx.getAccountKey(),
schedulerCtx.getCloudType(),
faceSampleId,
schedulerCtx.getScheduler().getQueueSize());
} else {
log.error("调度器队列已满, account={}, faceSampleId={}",
schedulerCtx.getAccountKey(), faceSampleId);
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
}
ack.acknowledge();
} catch (Exception e) {
log.error("处理AI相机人脸消息异常, faceSampleId={}", faceSampleId, e);
if (faceSampleId != null) {
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
}
ack.acknowledge();
}
}
/**
* 根据景区获取对应的账号调度器上下文
* 关键: 按 accessKeyId/appId 隔离,而非按云类型
@@ -343,4 +407,105 @@ public class FaceProcessingKafkaService {
return false;
}
}
/**
* 安全地更新AI相机人脸样本状态
*/
private void updateAiCamFaceSampleStatusSafely(Long faceSampleId, Integer status) {
if (faceSampleId == null) {
return;
}
try {
faceSampleAiCamMapper.updateStatus(faceSampleId, status);
log.debug("AI相机样本状态更新成功: faceSampleId={}, status={}", faceSampleId, status);
} catch (Exception e) {
log.error("AI相机样本状态更新失败(非致命): faceSampleId={}, status={}", faceSampleId, status, e);
}
}
/**
* 保存AI相机人脸样本数据到数据库
*/
private boolean saveAiCamFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) {
try {
FaceSampleEntity faceSample = new FaceSampleEntity();
faceSample.setId(externalFaceId); // 使用外部传入的ID
faceSample.setScenicId(faceMessage.getScenicId());
faceSample.setDeviceId(faceMessage.getDeviceId());
faceSample.setStatus(0); // 初始状态
faceSample.setFaceUrl(faceMessage.getFaceUrl());
if (faceMessage.getShotTime() != null) {
faceSample.setCreateAt(faceMessage.getShotTime());
} else {
faceSample.setCreateAt(new Date());
}
faceSampleAiCamMapper.add(faceSample);
return true;
} catch (Exception e) {
log.error("保存AI相机人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}",
externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e);
return false;
}
}
/**
* 异步执行AI相机人脸识别处理逻辑
* 区别: 使用 deviceId 作为人脸库分组
*/
private void processAiCamFaceRecognitionAsync(FaceProcessingMessage message) {
Long faceSampleId = message.getFaceSampleId();
Long scenicId = message.getScenicId();
Long deviceId = message.getDeviceId();
try {
updateAiCamFaceSampleStatusSafely(faceSampleId, 1);
log.debug("开始AI相机人脸识别, faceSampleId={}, status=1", faceSampleId);
IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId);
if (adapter == null) {
log.error("adapter 不存在, scenicId={}, faceSampleId={}", scenicId, faceSampleId);
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
return;
}
// 使用 "ai-cam-" + deviceId 作为人脸库分组
String dbName = "ai-cam-" + deviceId;
taskFaceService.assureFaceDb(adapter, dbName);
String faceUniqueId = faceSampleId.toString();
// groupName 使用 deviceId
AddFaceResp addFaceResp = adapter.addFace(
dbName,
faceSampleId.toString(),
message.getFaceUrl(),
faceUniqueId
);
if (addFaceResp != null) {
faceSampleAiCamMapper.updateScore(faceSampleId, addFaceResp.getScore());
updateAiCamFaceSampleStatusSafely(faceSampleId, 2);
log.info("AI相机人脸识别成功, faceSampleId={}, score={}, status=2",
faceSampleId, addFaceResp.getScore());
// 预订任务逻辑与原逻辑保持一致 (如果需要)
DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId);
if (deviceConfig != null &&
Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) {
DynamicTaskGenerator.addTask(faceSampleId);
}
} else {
log.warn("addFace 返回 null, faceSampleId={}", faceSampleId);
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
}
} catch (Exception e) {
log.error("AI相机人脸识别异常, faceSampleId={}", faceSampleId, e);
updateAiCamFaceSampleStatusSafely(faceSampleId, -1);
}
}
}

View File

@@ -0,0 +1,32 @@
package com.ycwl.basic.mapper;
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
import com.ycwl.basic.model.pc.faceSample.req.FaceSampleReqQuery;
import com.ycwl.basic.model.pc.faceSample.resp.FaceSampleRespVO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
/**
* AiCam人脸样本Mapper
*/
@Mapper
public interface FaceSampleAiCamMapper {
List<FaceSampleRespVO> list(FaceSampleReqQuery faceSampleReqQuery);
FaceSampleRespVO getById(Long id);
int add(FaceSampleEntity faceSample);
int deleteById(Long id);
int deleteByIds(@Param("list") List<Long> ids);
int update(FaceSampleEntity faceSample);
List<FaceSampleEntity> listByIds(List<Long> list);
FaceSampleEntity getEntity(Long faceSampleId);
List<FaceSampleEntity> listEntityBeforeDate(Long scenicId, Date endDate);
void updateScore(Long id, Float score);
void updateStatus(Long id, Integer status);
}

View File

@@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ycwl.basic.mapper.FaceSampleAiCamMapper">
<insert id="add">
insert into face_sample_ai_cam(id, scenic_id, device_id, face_url, match_sample_ids, first_match_rate, match_result,`status`, create_at)
values (#{id}, #{scenicId}, #{deviceId}, #{faceUrl}, #{matchSampleIds}, #{firstMatchRate}, #{matchResult},#{status},#{createAt})
</insert>
<update id="update">
update face_sample_ai_cam
<set>
<if test="scenicId!= null ">
scenic_id = #{scenicId},
</if>
<if test="deviceId!= null ">
device_id = #{deviceId},
</if>
<if test="faceUrl!= null and faceUrl!= ''">
face_url = #{faceUrl},
</if>
<if test="matchSampleIds!= null and matchSampleIds!= ''">
match_sample_ids = #{matchSampleIds},
</if>
<if test="firstMatchRate!= null ">
first_match_rate = #{firstMatchRate},
</if>
<if test="matchResult!= null and matchResult!= ''">
match_result = #{matchResult},
</if>
<if test="status!= null ">
`status` = #{status},
</if>
<if test="score!= null ">
`score` = #{score},
</if>
update_at = now(),
</set>
where id = #{id}
</update>
<update id="updateScore">
update face_sample_ai_cam
set score = #{score}
where id = #{id}
</update>
<update id="updateStatus">
update face_sample_ai_cam
set `status` = #{status}
where id = #{id}
</update>
<delete id="deleteById">
delete from face_sample_ai_cam where id = #{id}
</delete>
<delete id="deleteByIds">
<if test="list!= null and list.size() > 0">
delete from face_sample_ai_cam where id in (
<foreach collection="list" item="id" separator=",">
#{id}
</foreach>
)
</if>
</delete>
<select id="list" resultType="com.ycwl.basic.model.pc.faceSample.resp.FaceSampleRespVO">
select f.id, f.scenic_id, device_id, face_url, f.score, match_sample_ids, first_match_rate, match_result, f.`status`, f.create_at
from face_sample_ai_cam f
<where>
<if test="scenicId!= null and scenicId!= ''">
and f.scenic_id = #{scenicId}
</if>
<if test="deviceId!= null and deviceId!= ''">
and device_id = #{deviceId}
</if>
<if test="matchSampleIds!= null and matchSampleIds!= ''">
and match_sample_ids like concat('%', #{matchSampleIds}, '%')
</if>
<if test="startTime!=null">
and f.create_at >= #{startTime}
</if>
<if test="endTime!=null">
and f.create_at &lt;= #{endTime}
</if>
<if test="status!= null ">
and f.`status` = #{status}
</if>
</where>
ORDER BY f.create_at desc
</select>
<select id="getById" resultType="com.ycwl.basic.model.pc.faceSample.resp.FaceSampleRespVO">
select id, scenic_id, device_id, face_url, match_sample_ids, first_match_rate, match_result,`status`, create_at
from face_sample_ai_cam
where id = #{id}
</select>
<select id="listByIds" resultType="com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity">
select *
from face_sample_ai_cam
where id in (
<foreach collection="list" item="id" separator=",">
#{id}
</foreach>
)
order by create_at desc
</select>
<select id="getEntity" resultType="com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity">
select *
from face_sample_ai_cam
where id = #{id}
</select>
<select id="listEntityBeforeDate" resultType="com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity">
select *
from face_sample_ai_cam
where scenic_id = #{scenicId} and create_at &lt;= #{endDate}
</select>
</mapper>