Compare commits

..

5 Commits

Author SHA1 Message Date
4f1443a3ca fix(video): 处理空imgSource情况- 添加空值检查以避免保存空source记录
All checks were successful
ZhenTu-BE/pipeline/head This commit looks good
- 记录警告日志当imgSource为空时- 返回false以跳过无效处理流程
2025-09-26 12:39:22 +08:00
aba9fb0a15 feat(printer): 添加用户购买项设置的Redis缓存控制
- 引入RedisTemplate依赖用于缓存控制
- 新增60秒的缓存键避免重复处理用户购买项
- 在setUserIsBuyItem方法中实现缓存检查逻辑- 添加TimeUnit依赖支持缓存过期时间设置
- 定义USER_PHOTO_LIST_TO_PRINTER缓存键前缀
2025-09-26 12:39:17 +08:00
ab3208c9df feat(kafka): 添加手动提交模式支持以增强消息处理可靠性
- 在 KafkaConfig 中新增 manualCommitKafkaListenerContainerFactory 配置
- 启用手动提交模式并设置 AckMode 为 MANUAL_IMMEDIATE
- 修改 FaceProcessingKafkaService 使用新的容器工厂- 添加 Acknowledgment 参数以控制消息提交时机
-仅在人脸样本保存和识别全部成功后才手动确认消息
- 处理失败时不再调用 ack.acknowledge()使消息可重新消费
- 更新 processFaceRecognition 方法返回处理结果状态
- 增强异常处理逻辑,确保失败情况下不提交消息
2025-09-25 18:46:15 +08:00
09e376e089 refactor(kafka): 统一时人脸消息时间类型为Date
- 将FaceProcessingMessage中的LocalDateTime替换为Date类型- 更新消息创建工厂方法以使用Date参数
- 调整Kafka服务中时间转换逻辑以匹配新类型
- 移除LocalDateTime相关的导入和引用- 更新字段注释以反映新的时间类型
2025-09-25 18:09:17 +08:00
dad9ddc17c docs 2025-09-25 16:18:03 +08:00
6 changed files with 73 additions and 16 deletions

View File

@@ -10,6 +10,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*; import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -80,4 +81,21 @@ public class KafkaConfig {
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
return factory; return factory;
} }
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
} }

View File

@@ -6,7 +6,7 @@ import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.time.LocalDateTime; import java.util.Date;
/** /**
* zt-face topic消息结构 * zt-face topic消息结构
@@ -44,7 +44,7 @@ public class FaceProcessingMessage {
* 拍摄时间 * 拍摄时间
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime shotTime; private Date shotTime;
// status字段已移除,由系统内部管理状态 // status字段已移除,由系统内部管理状态
@@ -54,7 +54,7 @@ public class FaceProcessingMessage {
* 消息创建时间 * 消息创建时间
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime; private Date createTime;
/** /**
* 消息来源 * 消息来源
@@ -69,14 +69,14 @@ public class FaceProcessingMessage {
* 创建人脸处理消息的工厂方法(使用外部传入的faceId) * 创建人脸处理消息的工厂方法(使用外部传入的faceId)
*/ */
public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId, public static FaceProcessingMessage create(Long externalFaceId, Long scenicId, Long deviceId,
String faceUrl, LocalDateTime shotTime) { String faceUrl, Date shotTime) {
return FaceProcessingMessage.builder() return FaceProcessingMessage.builder()
.faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识 .faceSampleId(externalFaceId) // 使用外部传入的ID作为唯一标识
.scenicId(scenicId) .scenicId(scenicId)
.deviceId(deviceId) .deviceId(deviceId)
.faceUrl(faceUrl) .faceUrl(faceUrl)
.shotTime(shotTime) .shotTime(shotTime)
.createTime(LocalDateTime.now()) .createTime(new Date())
.source("external-system") .source("external-system")
.build(); .build();
} }

View File

@@ -17,6 +17,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.ZoneId; import java.time.ZoneId;
@@ -43,8 +44,8 @@ public class FaceProcessingKafkaService {
* 消费外部系统发送的人脸处理消息 * 消费外部系统发送的人脸处理消息
* 先保存人脸样本数据,再进行异步人脸识别处理 * 先保存人脸样本数据,再进行异步人脸识别处理
*/ */
@KafkaListener(topics = ZT_FACE_TOPIC) @KafkaListener(topics = ZT_FACE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
public void processFaceMessage(String message) { public void processFaceMessage(String message, Acknowledgment ack) {
try { try {
FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class); FaceProcessingMessage faceMessage = JacksonUtil.parseObject(message, FaceProcessingMessage.class);
log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}", log.info("接收到外部人脸处理消息, scenicId: {}, deviceId: {}, faceUrl: {}",
@@ -62,12 +63,21 @@ public class FaceProcessingKafkaService {
// 然后进行人脸识别处理 // 然后进行人脸识别处理
if (saved) { if (saved) {
processFaceRecognition(faceMessage); boolean processed = processFaceRecognition(faceMessage);
if (processed) {
// 只有在所有处理都成功后才手动提交
ack.acknowledge();
log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId);
} else {
log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId);
}
} else {
log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("处理外部人脸消息失败: {}", e.getMessage(), e); log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e);
// TODO: 考虑错误重试机制或死信队列 // 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费
} }
} }
@@ -88,7 +98,7 @@ public class FaceProcessingKafkaService {
// 转换时间格式 // 转换时间格式
if (faceMessage.getShotTime() != null) { if (faceMessage.getShotTime() != null) {
Date shotTime = Date.from(faceMessage.getShotTime().atZone(ZoneId.systemDefault()).toInstant()); Date shotTime = faceMessage.getShotTime();
faceSample.setCreateAt(shotTime); faceSample.setCreateAt(shotTime);
} else { } else {
faceSample.setCreateAt(new Date()); faceSample.setCreateAt(new Date());
@@ -111,7 +121,7 @@ public class FaceProcessingKafkaService {
* 执行人脸识别处理逻辑 * 执行人脸识别处理逻辑
* 对已保存的人脸样本进行识别处理 * 对已保存的人脸样本进行识别处理
*/ */
private void processFaceRecognition(FaceProcessingMessage message) { private boolean processFaceRecognition(FaceProcessingMessage message) {
Long faceSampleId = message.getFaceSampleId(); Long faceSampleId = message.getFaceSampleId();
Long scenicId = message.getScenicId(); Long scenicId = message.getScenicId();
String faceUrl = message.getFaceUrl(); String faceUrl = message.getFaceUrl();
@@ -124,7 +134,7 @@ public class FaceProcessingKafkaService {
if (faceBodyAdapter == null) { if (faceBodyAdapter == null) {
log.error("人脸识别适配器不存在, scenicId: {}", scenicId); log.error("人脸识别适配器不存在, scenicId: {}", scenicId);
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return; return false;
} }
try { try {
@@ -157,9 +167,11 @@ public class FaceProcessingKafkaService {
DynamicTaskGenerator.addTask(faceSampleId); DynamicTaskGenerator.addTask(faceSampleId);
log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId); log.info("已添加到预订任务队列, faceSampleId: {}", faceSampleId);
} }
return true;
} else { } else {
log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId); log.warn("人脸添加返回空结果, faceSampleId: {}", faceSampleId);
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return false;
} }
} catch (Exception e) { } catch (Exception e) {
@@ -168,6 +180,7 @@ public class FaceProcessingKafkaService {
// 标记人脸样本为处理失败状态 // 标记人脸样本为处理失败状态
updateFaceSampleStatus(faceSampleId, -1); updateFaceSampleStatus(faceSampleId, -1);
return false;
} }
} }

View File

@@ -144,6 +144,17 @@ public enum ProductType {
} }
``` ```
#### 商品价格配置控制字段
`PriceProductConfig` 实体包含以下优惠控制字段:
- `canUseCoupon`: 是否可使用优惠券
- `canUseVoucher`: 是否可使用券码
- `canUseOnePrice`: 是否可使用一口价优惠(新增)
#### 一口价优惠控制机制
- 当购物车中任何商品的 `canUseOnePrice``false` 时,将跳过整个购物车的一口价优惠检测
- 配置优先级:具体商品配置 > 商品类型默认配置 > 系统默认(支持)
- 异常情况下默认支持一口价优惠,确保业务流程不受影响
#### 分层定价 #### 分层定价
支持基于数量的分层定价策略,通过 `PriceTierConfig` 配置不同数量区间的单价。 支持基于数量的分层定价策略,通过 `PriceTierConfig` 配置不同数量区间的单价。
@@ -339,6 +350,7 @@ public interface IDiscountDetectionService {
#### OnePricePurchaseDiscountProvider (优先级: 120) #### OnePricePurchaseDiscountProvider (优先级: 120)
- 处理一口价优惠逻辑(景区级统一价格) - 处理一口价优惠逻辑(景区级统一价格)
- **最高优先级**,优先于所有其他优惠类型 - **最高优先级**,优先于所有其他优惠类型
- 商品级别控制:检查购物车中所有商品的 `canUseOnePrice` 配置,任一商品不支持则跳过检测
- 仅当一口价小于当前金额时产生优惠;是否可与券码/优惠券叠加由配置 `canUseCoupon/canUseVoucher` 决定 - 仅当一口价小于当前金额时产生优惠;是否可与券码/优惠券叠加由配置 `canUseCoupon/canUseVoucher` 决定
#### BundleDiscountProvider (优先级: 100) #### BundleDiscountProvider (优先级: 100)
@@ -370,7 +382,7 @@ public interface IDiscountDetectionService {
特殊情况 特殊情况
- 全场免费券码直接最终价=0停止后续优惠 - 全场免费券码直接最终价=0停止后续优惠
- 一口价可叠加性由配置 canUseCoupon / canUseVoucher 控制 - 一口价可叠加性由配置 canUseCoupon / canUseVoucher 控制商品级别由 canUseOnePrice 控制参与检测
``` ```
#### 扩展支持 #### 扩展支持
@@ -535,7 +547,7 @@ public class PriceCalculationResult {
## 数据库设计 ## 数据库设计
### 核心表结构(摘) ### 核心表结构(摘)
- `price_product_config`: 商品价格基础配置 - `price_product_config`: 商品价格基础配置(包含 `can_use_coupon``can_use_voucher``can_use_one_price` 优惠控制字段)
- `price_tier_config`: 分层定价配置 - `price_tier_config`: 分层定价配置
- `price_bundle_config`: 套餐配置 - `price_bundle_config`: 套餐配置
- `price_coupon_config`: 优惠券配置 - `price_coupon_config`: 优惠券配置

View File

@@ -39,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings; import org.apache.commons.lang3.Strings;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
@@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@@ -377,8 +379,16 @@ public class PrinterServiceImpl implements PrinterService {
printerMapper.updateUserPhotoListToPrinter(memberId, scenicId, printerId); printerMapper.updateUserPhotoListToPrinter(memberId, scenicId, printerId);
} }
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String USER_PHOTO_LIST_TO_PRINTER = "USER_PHOTO_LIST_TO_PRINTER:";
@Override @Override
public void setUserIsBuyItem(Long memberId, Long id, Long orderId) { public void setUserIsBuyItem(Long memberId, Long id, Long orderId) {
if (redisTemplate.opsForValue().get(USER_PHOTO_LIST_TO_PRINTER + memberId + ":" + orderId) != null) {
return;
}
redisTemplate.opsForValue().set(USER_PHOTO_LIST_TO_PRINTER + memberId + ":" + orderId, "1", 60, TimeUnit.SECONDS);
printerMapper.setUserIsBuyItem(memberId, id, orderId); printerMapper.setUserIsBuyItem(memberId, id, orderId);
// 创建打印任务 // 创建打印任务
List<MemberPrintResp> userPhotoListByOrderId = getUserPhotoListByOrderId(orderId); List<MemberPrintResp> userPhotoListByOrderId = getUserPhotoListByOrderId(orderId);

View File

@@ -318,6 +318,10 @@ public class VideoPieceGetter {
} }
if (source == null) { if (source == null) {
SourceEntity imgSource = sourceMapper.findBySampleId(faceSampleId); SourceEntity imgSource = sourceMapper.findBySampleId(faceSampleId);
if (imgSource == null) {
log.warn("imgSource为null,跳过保存source记录, faceSampleId: {}", faceSampleId);
return false;
}
SourceEntity sourceEntity = new SourceEntity(); SourceEntity sourceEntity = new SourceEntity();
sourceEntity.setId(SnowFlakeUtil.getLongId()); sourceEntity.setId(SnowFlakeUtil.getLongId());
sourceEntity.setCreateTime(baseTime); sourceEntity.setCreateTime(baseTime);