Compare commits

...

4 Commits

Author SHA1 Message Date
841c89af04 refactor(task): 移除视频生成通知的Redis缓存逻辑
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 删除了检查Redis中memberId是否在3分钟内已发送过通知的代码
- 移除了发送成功后设置Redis缓存的逻辑
- 简化了通知发送流程,直接发送消息而不进行缓存控制- 更新了日志记录,移除了缓存相关的调试信息- 保留了核心的通知发送功能和必要的业务逻辑
2025-10-24 16:22:20 +08:00
bed3a4e3c9 feat(printer): 实现任务状态的原子性更新与同步锁机制
- 添加 compareAndSetTaskStatus 方法以支持基于期望状态的任务更新
- 引入 ReentrantLock 实现任务同步处理,防止并发冲突
- 在 XML 映射文件中定义 compareAndSetTaskStatus 的 SQL 更新语句
- 定义任务状态常量:TASK_STATUS_PENDING 和 TASK_STATUS_PROCESSING
- 优化任务获取逻辑,确保任务状态在处理前正确更新为 PROCESSING
2025-10-23 21:24:58 +08:00
3f8b911e6f feat(face): 增加自定义人脸匹配次数限制与记录功能
- 新增常量 FACE_CUSTOM_MATCH_COUNT_PFX 用于记录自定义匹配次数
- 在人脸识别逻辑中增加对自定义匹配次数的读取与限制判断
- 实现 recordCustomMatchCount 方法用于记录自定义匹配调用次数
- 优化原有识别次数获取逻辑,避免重复代码
- 增加 Redis 过期时间设置,确保计数数据自动清理
2025-10-23 18:17:07 +08:00
ea4adcdeb7 feat(kafka): 移除Kafka集成服务
- 删除了KafkaIntegrationService类及其相关逻辑
- 移除了Kafka消息发送和连接测试功能
- 清理了未实现的预留接口方法
- 移除了相关的配置属性获取方法
2025-10-23 16:11:57 +08:00
7 changed files with 94 additions and 81 deletions

View File

@@ -5,5 +5,6 @@ public class FaceConstant {
public static final String USER_FACE_DB_NAME="userFace";
public static final String FACE_USER_URL_PFX="face:user:url:";
public static final String FACE_RECOGNITION_COUNT_PFX="face:recognition:count:";
public static final String FACE_CUSTOM_MATCH_COUNT_PFX="face:custom:match:count:";
public static final String FACE_LOW_THRESHOLD_PFX="face:low:threshold:";
}

View File

@@ -1,51 +0,0 @@
package com.ycwl.basic.integration.kafka.service;
import com.ycwl.basic.integration.kafka.config.KafkaIntegrationProperties;
import com.ycwl.basic.integration.kafka.dto.KafkaMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
public class KafkaIntegrationService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaIntegrationProperties kafkaProperties;
/**
* 测试Kafka连接
*/
public boolean testConnection() {
try {
log.info("Testing Kafka connection to: {}", kafkaProperties.getBootstrapServers());
// 尝试获取元数据以测试连接
var metadata = kafkaTemplate.getProducerFactory().createProducer().partitionsFor("test-topic");
log.info("Kafka connection test successful");
return true;
} catch (Exception e) {
log.error("Kafka connection test failed", e);
return false;
}
}
/**
* 发送消息(预留接口,暂不实现具体逻辑)
*/
public void sendMessage(String topic, String key, KafkaMessage<?> message) {
log.info("Kafka message sending is not implemented yet. Topic: {}, Key: {}", topic, key);
// TODO: 后续实现具体的消息发送逻辑
}
/**
* 获取Kafka配置信息
*/
public KafkaIntegrationProperties getKafkaProperties() {
return kafkaProperties;
}
}

View File

@@ -28,6 +28,10 @@ public interface PrinterMapper {
int updateTaskStatus(@Param("id") Integer id, @Param("status") Integer status);
int compareAndSetTaskStatus(@Param("id") Integer id,
@Param("expectStatus") Integer expectStatus,
@Param("newStatus") Integer newStatus);
PrintTaskEntity getTaskById(Integer id);
List<PrinterResp> listByScenicId(@Param("scenicId") Long scenicId);
@@ -51,4 +55,4 @@ public interface PrinterMapper {
void updateUserPhotoListToPrinter(Long memberId, Long scenicId, Integer printerId);
List<MemberPrintResp> listRelationByOrderId(Long orderId);
}
}

View File

@@ -83,6 +83,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.ycwl.basic.constant.FaceConstant.FACE_CUSTOM_MATCH_COUNT_PFX;
import static com.ycwl.basic.constant.FaceConstant.FACE_LOW_THRESHOLD_PFX;
import static com.ycwl.basic.constant.FaceConstant.FACE_RECOGNITION_COUNT_PFX;
import static com.ycwl.basic.constant.FaceConstant.USER_FACE_DB_NAME;
@@ -987,6 +988,36 @@ public class FaceServiceImpl implements FaceService {
return false;
}
ScenicConfigManager scenicConfig = scenicRepository.getScenicConfigManager(face.getScenicId());
String recognitionKey = FACE_RECOGNITION_COUNT_PFX + faceId;
String recognitionCountStr = redisTemplate.opsForValue().get(recognitionKey);
long recognitionCount = 0L;
if (recognitionCountStr != null) {
try {
recognitionCount = Long.parseLong(recognitionCountStr);
} catch (NumberFormatException e) {
log.warn("识别次数解析失败,faceId={}, count={}", faceId, recognitionCountStr);
}
}
String customMatchKey = FACE_CUSTOM_MATCH_COUNT_PFX + faceId;
String customMatchCountStr = redisTemplate.opsForValue().get(customMatchKey);
long customMatchCount = 0L;
if (customMatchCountStr != null) {
try {
customMatchCount = Long.parseLong(customMatchCountStr);
} catch (NumberFormatException e) {
log.warn("自定义匹配次数解析失败,faceId={}, count={}", faceId, customMatchCountStr);
}
}
Integer faceSelectMaxCount = scenicConfig.getInteger("face_select_max_count");
if (faceSelectMaxCount != null && faceSelectMaxCount > 0 && customMatchCount > faceSelectMaxCount) {
log.debug("自定义人脸匹配次数超过限制:faceId={}, customMatchCount={}, limit={}",
faceId, customMatchCount, faceSelectMaxCount);
return false;
}
Integer maxTourTime = scenicConfig.getInteger("tour_time");
Integer minTourTime = scenicConfig.getInteger("tour_min_time");
boolean tourMatch = false;
@@ -1015,16 +1046,6 @@ public class FaceServiceImpl implements FaceService {
}
}
}
String countKey = FACE_RECOGNITION_COUNT_PFX + faceId;
String countStr = redisTemplate.opsForValue().get(countKey);
long recognitionCount = 0L;
if (countStr != null) {
try {
recognitionCount = Long.parseLong(countStr);
} catch (NumberFormatException e) {
log.warn("识别次数解析失败,faceId={}, count={}", faceId, countStr);
}
}
int ruleMatched = 0;
if (recognitionCount > 1) {
ruleMatched++;
@@ -1086,6 +1107,9 @@ public class FaceServiceImpl implements FaceService {
}
log.debug("开始自定义人脸匹配:faceId={}, faceSampleIds={}", faceId, faceSampleIds);
// 记录自定义匹配调用次数,便于监控调用频率
recordCustomMatchCount(faceId);
try {
// 1. 获取基础数据
@@ -1234,6 +1258,28 @@ public class FaceServiceImpl implements FaceService {
return mergedResult;
}
/**
* 记录自定义人脸匹配次数到Redis
*
* @param faceId 人脸ID
*/
private void recordCustomMatchCount(Long faceId) {
if (faceId == null) {
return;
}
try {
String redisKey = FACE_CUSTOM_MATCH_COUNT_PFX + faceId;
Long count = redisTemplate.opsForValue().increment(redisKey);
redisTemplate.expire(redisKey, 2, TimeUnit.DAYS);
log.debug("自定义人脸匹配计数更新:faceId={}, count={}", faceId, count);
} catch (Exception e) {
log.error("记录自定义人脸匹配次数失败:faceId={}", faceId, e);
}
}
/**
* 记录人脸识别次数到Redis
*

View File

@@ -69,6 +69,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
@@ -151,11 +153,26 @@ public class PrinterServiceImpl implements PrinterService {
printer.setPrinters(printersStr);
printerMapper.update(printer);
}
PrintTaskResp task = printerMapper.findTaskByPrinterId(printer.getId());
if (task == null) {
return Collections.emptyList();
syncTaskLock.lock();
try {
while (true) {
PrintTaskResp task = printerMapper.findTaskByPrinterId(printer.getId());
if (task == null) {
return Collections.emptyList();
}
int updatedRows = printerMapper.compareAndSetTaskStatus(
task.getId(),
TASK_STATUS_PENDING,
TASK_STATUS_PROCESSING
);
if (updatedRows == 1) {
task.setStatus(TASK_STATUS_PROCESSING);
return Collections.singletonList(task);
}
}
} finally {
syncTaskLock.unlock();
}
return Collections.singletonList(task);
}
@Override
@@ -427,6 +444,9 @@ public class PrinterServiceImpl implements PrinterService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String USER_PHOTO_LIST_TO_PRINTER = "USER_PHOTO_LIST_TO_PRINTER:";
private static final int TASK_STATUS_PENDING = 0;
private static final int TASK_STATUS_PROCESSING = 3;
private final Lock syncTaskLock = new ReentrantLock();
@Override
public void setUserIsBuyItem(Long memberId, Long id, Long orderId) {
@@ -550,4 +570,4 @@ public class PrinterServiceImpl implements PrinterService {
printTaskMapper.insertTask(task);
});
}
}
}

View File

@@ -626,15 +626,6 @@ public class TaskTaskServiceImpl implements TaskService {
@Override
public void sendVideoGeneratedServiceNotification(Long taskId, Long memberId) {
// 检查Redis中该memberId是否在3分钟内已发送过通知
String notificationCacheKey = String.format(VIDEO_NOTIFICATION_CACHE_KEY, memberId);
String cachedValue = redisTemplate.opsForValue().get(notificationCacheKey);
if (cachedValue != null) {
log.info("memberId:{} 在3分钟内已发送过通知,跳过本次发送", memberId);
return;
}
MemberVideoEntity item = videoMapper.queryRelationByMemberTask(memberId, taskId);
MemberRespVO member = memberMapper.getById(memberId);
String openId = member.getOpenId();
@@ -694,10 +685,7 @@ public class TaskTaskServiceImpl implements TaskService {
msg.setSendReason("视频生成通知");
msg.setSendBiz("视频生成");
ztMessageProducerService.send(msg);
// 发送成功后,设置Redis缓存,2分钟过期
redisTemplate.opsForValue().set(notificationCacheKey, String.valueOf(System.currentTimeMillis()), NOTIFICATION_CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES);
log.debug("memberId:{} 通知发送成功,已设置{}分钟缓存", memberId, NOTIFICATION_CACHE_EXPIRE_MINUTES);
log.info("memberId:{} 视频生成通知发送成功", memberId);
}
}

View File

@@ -152,6 +152,11 @@
<update id="updateTaskStatus">
UPDATE print_task SET status = #{status}, update_time = NOW() WHERE id = #{id}
</update>
<update id="compareAndSetTaskStatus">
UPDATE print_task
SET status = #{newStatus}, update_time = NOW()
WHERE id = #{id} AND status = #{expectStatus}
</update>
<update id="setPhotoCropped">
UPDATE member_print SET crop_url = #{url}, update_time = NOW(), print_url = null WHERE id = #{id}
</update>
@@ -172,4 +177,4 @@
<delete id="deleteUserPhoto">
DELETE FROM member_print WHERE member_id = #{memberId} AND scenic_id = #{scenicId} AND id = #{relationId} LIMIT 1;
</delete>
</mapper>
</mapper>