diff --git a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java index f6cb1fd7..1eca19c1 100644 --- a/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java +++ b/src/main/java/com/ycwl/basic/integration/kafka/service/FaceProcessingKafkaService.java @@ -10,6 +10,7 @@ import com.ycwl.basic.integration.common.manager.DeviceConfigManager; import com.ycwl.basic.integration.kafka.dto.FaceProcessingMessage; import com.ycwl.basic.integration.kafka.scheduler.AccountFaceSchedulerManager; import com.ycwl.basic.integration.kafka.scheduler.AccountSchedulerContext; +import com.ycwl.basic.mapper.FaceSampleAiCamMapper; import com.ycwl.basic.mapper.FaceSampleMapper; import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity; import com.ycwl.basic.repository.DeviceRepository; @@ -42,8 +43,10 @@ import java.util.Date; public class FaceProcessingKafkaService { private static final String ZT_FACE_TOPIC = "zt-face"; + private static final String ZT_AI_CAM_FACE_TOPIC = "zt-ai-cam-face"; private final FaceSampleMapper faceSampleMapper; + private final FaceSampleAiCamMapper faceSampleAiCamMapper; private final TaskFaceService taskFaceService; private final ScenicService scenicService; private final DeviceRepository deviceRepository; @@ -115,6 +118,67 @@ public class FaceProcessingKafkaService { } } + /** + * 消费AI相机发送的人脸处理消息 (zt-ai-cam-face) + * 逻辑与 zt-face 类似,但写入不同的表,且人脸库分组依据不同 + */ + @KafkaListener(topics = ZT_AI_CAM_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory") + public void processAiCamFaceMessage(String message, Acknowledgment ack) { + Long faceSampleId = null; + try { + FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class); + faceSampleId = faceMessage.getFaceSampleId(); + + log.debug("接收AI相机人脸消息: scenicId={}, deviceId={}, faceSampleId={}", + faceMessage.getScenicId(), faceMessage.getDeviceId(), faceSampleId); + + // ========== 第一步: 同步写入数据库 (FaceSampleAiCam) ========== + boolean saved = saveAiCamFaceSample(faceMessage, faceSampleId); + if (!saved) { + log.error("AI相机数据库写入失败, 不提交识别任务, faceSampleId={}", faceSampleId); + ack.acknowledge(); + return; + } + + log.debug("AI相机数据库写入成功, faceSampleId={}, status=0", faceSampleId); + + // ========== 第二步: 获取账号调度器上下文 ========== + AccountSchedulerContext schedulerCtx = getSchedulerContextForScenic(faceMessage.getScenicId()); + if (schedulerCtx == null) { + log.error("无法获取调度器上下文, faceSampleId={}", faceSampleId); + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + ack.acknowledge(); + return; + } + + // ========== 第三步: 提交到账号专属调度器 ========== + boolean submitted = schedulerCtx.getScheduler().submit(() -> { + processAiCamFaceRecognitionAsync(faceMessage); + }); + + if (submitted) { + log.debug("AI相机任务已提交到调度器, account={}, cloudType={}, faceSampleId={}, schedulerQueue={}", + schedulerCtx.getAccountKey(), + schedulerCtx.getCloudType(), + faceSampleId, + schedulerCtx.getScheduler().getQueueSize()); + } else { + log.error("调度器队列已满, account={}, faceSampleId={}", + schedulerCtx.getAccountKey(), faceSampleId); + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + } + + ack.acknowledge(); + + } catch (Exception e) { + log.error("处理AI相机人脸消息异常, faceSampleId={}", faceSampleId, e); + if (faceSampleId != null) { + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + } + ack.acknowledge(); + } + } + /** * 根据景区获取对应的账号调度器上下文 * 关键: 按 accessKeyId/appId 隔离,而非按云类型 @@ -343,4 +407,105 @@ public class FaceProcessingKafkaService { return false; } } + + /** + * 安全地更新AI相机人脸样本状态 + */ + private void updateAiCamFaceSampleStatusSafely(Long faceSampleId, Integer status) { + if (faceSampleId == null) { + return; + } + try { + faceSampleAiCamMapper.updateStatus(faceSampleId, status); + log.debug("AI相机样本状态更新成功: faceSampleId={}, status={}", faceSampleId, status); + } catch (Exception e) { + log.error("AI相机样本状态更新失败(非致命): faceSampleId={}, status={}", faceSampleId, status, e); + } + } + + /** + * 保存AI相机人脸样本数据到数据库 + */ + private boolean saveAiCamFaceSample(FaceProcessingMessage faceMessage, Long externalFaceId) { + try { + FaceSampleEntity faceSample = new FaceSampleEntity(); + faceSample.setId(externalFaceId); // 使用外部传入的ID + faceSample.setScenicId(faceMessage.getScenicId()); + faceSample.setDeviceId(faceMessage.getDeviceId()); + faceSample.setStatus(0); // 初始状态 + faceSample.setFaceUrl(faceMessage.getFaceUrl()); + + if (faceMessage.getShotTime() != null) { + faceSample.setCreateAt(faceMessage.getShotTime()); + } else { + faceSample.setCreateAt(new Date()); + } + + faceSampleAiCamMapper.add(faceSample); + + return true; + } catch (Exception e) { + log.error("保存AI相机人脸样本数据失败, 外部faceId: {}, scenicId: {}, deviceId: {}", + externalFaceId, faceMessage.getScenicId(), faceMessage.getDeviceId(), e); + return false; + } + } + + /** + * 异步执行AI相机人脸识别处理逻辑 + * 区别: 使用 deviceId 作为人脸库分组 + */ + private void processAiCamFaceRecognitionAsync(FaceProcessingMessage message) { + Long faceSampleId = message.getFaceSampleId(); + Long scenicId = message.getScenicId(); + Long deviceId = message.getDeviceId(); + + try { + updateAiCamFaceSampleStatusSafely(faceSampleId, 1); + log.debug("开始AI相机人脸识别, faceSampleId={}, status=1", faceSampleId); + + IFaceBodyAdapter adapter = scenicService.getScenicFaceBodyAdapter(scenicId); + if (adapter == null) { + log.error("adapter 不存在, scenicId={}, faceSampleId={}", scenicId, faceSampleId); + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + return; + } + + // 使用 "ai-cam-" + deviceId 作为人脸库分组 + String dbName = "ai-cam-" + deviceId; + taskFaceService.assureFaceDb(adapter, dbName); + + String faceUniqueId = faceSampleId.toString(); + // groupName 使用 deviceId + AddFaceResp addFaceResp = adapter.addFace( + dbName, + faceSampleId.toString(), + message.getFaceUrl(), + faceUniqueId + ); + + if (addFaceResp != null) { + faceSampleAiCamMapper.updateScore(faceSampleId, addFaceResp.getScore()); + updateAiCamFaceSampleStatusSafely(faceSampleId, 2); + + log.info("AI相机人脸识别成功, faceSampleId={}, score={}, status=2", + faceSampleId, addFaceResp.getScore()); + + // 预订任务逻辑与原逻辑保持一致 (如果需要) + DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); + if (deviceConfig != null && + Integer.valueOf(1).equals(deviceConfig.getInteger("enable_pre_book"))) { + DynamicTaskGenerator.addTask(faceSampleId); + } + + } else { + log.warn("addFace 返回 null, faceSampleId={}", faceSampleId); + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + } + + } catch (Exception e) { + log.error("AI相机人脸识别异常, faceSampleId={}", faceSampleId, e); + updateAiCamFaceSampleStatusSafely(faceSampleId, -1); + } + } } \ No newline at end of file diff --git a/src/main/java/com/ycwl/basic/mapper/FaceSampleAiCamMapper.java b/src/main/java/com/ycwl/basic/mapper/FaceSampleAiCamMapper.java new file mode 100644 index 00000000..97bceebe --- /dev/null +++ b/src/main/java/com/ycwl/basic/mapper/FaceSampleAiCamMapper.java @@ -0,0 +1,32 @@ +package com.ycwl.basic.mapper; + +import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity; +import com.ycwl.basic.model.pc.faceSample.req.FaceSampleReqQuery; +import com.ycwl.basic.model.pc.faceSample.resp.FaceSampleRespVO; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +/** + * AiCam人脸样本Mapper + */ +@Mapper +public interface FaceSampleAiCamMapper { + List list(FaceSampleReqQuery faceSampleReqQuery); + FaceSampleRespVO getById(Long id); + int add(FaceSampleEntity faceSample); + int deleteById(Long id); + int deleteByIds(@Param("list") List ids); + int update(FaceSampleEntity faceSample); + + List listByIds(List list); + + FaceSampleEntity getEntity(Long faceSampleId); + List listEntityBeforeDate(Long scenicId, Date endDate); + + void updateScore(Long id, Float score); + + void updateStatus(Long id, Integer status); +} diff --git a/src/main/resources/mapper/FaceSampleAiCamMapper.xml b/src/main/resources/mapper/FaceSampleAiCamMapper.xml new file mode 100644 index 00000000..a4831e32 --- /dev/null +++ b/src/main/resources/mapper/FaceSampleAiCamMapper.xml @@ -0,0 +1,111 @@ + + + + + insert into face_sample_ai_cam(id, scenic_id, device_id, face_url, match_sample_ids, first_match_rate, match_result,`status`, create_at) + values (#{id}, #{scenicId}, #{deviceId}, #{faceUrl}, #{matchSampleIds}, #{firstMatchRate}, #{matchResult},#{status},#{createAt}) + + + update face_sample_ai_cam + + + scenic_id = #{scenicId}, + + + device_id = #{deviceId}, + + + face_url = #{faceUrl}, + + + match_sample_ids = #{matchSampleIds}, + + + first_match_rate = #{firstMatchRate}, + + + match_result = #{matchResult}, + + + `status` = #{status}, + + + `score` = #{score}, + + update_at = now(), + + where id = #{id} + + + update face_sample_ai_cam + set score = #{score} + where id = #{id} + + + update face_sample_ai_cam + set `status` = #{status} + where id = #{id} + + + delete from face_sample_ai_cam where id = #{id} + + + + delete from face_sample_ai_cam where id in ( + + #{id} + + ) + + + + + + + +