You've already forked FrameTour-BE
Compare commits
10 Commits
2836326518
...
f33ce8e7a7
Author | SHA1 | Date | |
---|---|---|---|
f33ce8e7a7 | |||
de65fa1dd8 | |||
132a539bb6 | |||
9f66544a29 | |||
f4a16b5b09 | |||
9bc34fcfdb | |||
4b01e4cf82 | |||
f885f734ad | |||
ddbc2a0edb | |||
da89067c48 |
@@ -166,7 +166,7 @@ public class PriceBiz {
|
|||||||
allContentsPurchased = false;
|
allContentsPurchased = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
boolean hasPurchasedTemplate = orderBiz.checkUserBuyItem(userId, 0, videoEntities.getFirst().getVideoId());
|
boolean hasPurchasedTemplate = orderBiz.checkUserBuyItem(userId, -1, videoEntities.getFirst().getVideoId());
|
||||||
if (!hasPurchasedTemplate) {
|
if (!hasPurchasedTemplate) {
|
||||||
allContentsPurchased = false;
|
allContentsPurchased = false;
|
||||||
break;
|
break;
|
||||||
|
@@ -92,6 +92,7 @@ public class AppScenicController {
|
|||||||
resp.setEnableVoucher(scenicConfig.getBoolean("voucher_enable", false)); // compactible
|
resp.setEnableVoucher(scenicConfig.getBoolean("voucher_enable", false)); // compactible
|
||||||
resp.setGroupingEnable(scenicConfig.getBoolean("grouping_enable", false));
|
resp.setGroupingEnable(scenicConfig.getBoolean("grouping_enable", false));
|
||||||
resp.setShowPhotoWhenWaiting(scenicConfig.getBoolean("show_photo_when_waiting", false));
|
resp.setShowPhotoWhenWaiting(scenicConfig.getBoolean("show_photo_when_waiting", false));
|
||||||
|
resp.setWatermarkUrl(scenicConfig.getString("watermark_url"));
|
||||||
return ApiResponse.success(resp);
|
return ApiResponse.success(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -101,9 +101,11 @@ public class ViidController {
|
|||||||
.setNamePrefix("VIID-" + scenicId + "-t")
|
.setNamePrefix("VIID-" + scenicId + "-t")
|
||||||
.build();
|
.build();
|
||||||
return new ThreadPoolExecutor(
|
return new ThreadPoolExecutor(
|
||||||
4, 1024, 0L, TimeUnit.MILLISECONDS,
|
8, 32, 10L, TimeUnit.SECONDS, // 核心2个线程,最大20个线程,空闲60秒回收
|
||||||
new ArrayBlockingQueue<>(1024),
|
new ArrayBlockingQueue<>(1024), // 队列大小从1024降至100
|
||||||
threadFactory);
|
threadFactory,
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时由调用线程执行,提供背压控制
|
||||||
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
65
src/main/java/com/ycwl/basic/dto/ZTSourceMessage.java
Normal file
65
src/main/java/com/ycwl/basic/dto/ZTSourceMessage.java
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package com.ycwl.basic.dto;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonFormat;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZT-Source Kafka消息实体
|
||||||
|
* 用于接收素材数据(照片和视频片段)
|
||||||
|
*
|
||||||
|
* @author system
|
||||||
|
* @date 2024/12/27
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class ZTSourceMessage {
|
||||||
|
@JsonProperty("sourceId")
|
||||||
|
private Long sourceId;
|
||||||
|
|
||||||
|
@JsonProperty("sourceType")
|
||||||
|
private Integer sourceType;
|
||||||
|
|
||||||
|
@JsonProperty("scenicId")
|
||||||
|
private Long scenicId;
|
||||||
|
|
||||||
|
@JsonProperty("deviceId")
|
||||||
|
private Long deviceId;
|
||||||
|
|
||||||
|
@JsonProperty("shootTime")
|
||||||
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
|
||||||
|
private Date shootTime;
|
||||||
|
|
||||||
|
@JsonProperty("thumbnailUrl")
|
||||||
|
private String thumbnailUrl;
|
||||||
|
|
||||||
|
@JsonProperty("sourceUrl")
|
||||||
|
private String sourceUrl;
|
||||||
|
|
||||||
|
@JsonProperty("resolution")
|
||||||
|
private String resolution;
|
||||||
|
|
||||||
|
@JsonProperty("faceSampleId")
|
||||||
|
private Long faceSampleId;
|
||||||
|
|
||||||
|
@JsonProperty("posJson")
|
||||||
|
private String posJson;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断是否为视频片段
|
||||||
|
*/
|
||||||
|
public boolean isVideo() {
|
||||||
|
return sourceType != null && sourceType == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断是否为照片
|
||||||
|
*/
|
||||||
|
public boolean isPhoto() {
|
||||||
|
return sourceType != null && sourceType == 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@@ -55,6 +55,8 @@ public class FaceProcessingKafkaService {
|
|||||||
Long externalFaceId = faceMessage.getFaceSampleId();
|
Long externalFaceId = faceMessage.getFaceSampleId();
|
||||||
if (externalFaceId == null) {
|
if (externalFaceId == null) {
|
||||||
log.error("外部消息中未包含faceSampleId");
|
log.error("外部消息中未包含faceSampleId");
|
||||||
|
// 即使消息格式错误,也消费消息避免重复处理
|
||||||
|
ack.acknowledge();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,21 +65,29 @@ public class FaceProcessingKafkaService {
|
|||||||
|
|
||||||
// 然后进行人脸识别处理
|
// 然后进行人脸识别处理
|
||||||
if (saved) {
|
if (saved) {
|
||||||
boolean processed = processFaceRecognition(faceMessage);
|
try {
|
||||||
if (processed) {
|
boolean processed = processFaceRecognition(faceMessage);
|
||||||
// 只有在所有处理都成功后才手动提交
|
if (processed) {
|
||||||
ack.acknowledge();
|
log.info("人脸识别处理成功, faceSampleId: {}", externalFaceId);
|
||||||
log.info("消息处理成功并已提交, faceSampleId: {}", externalFaceId);
|
} else {
|
||||||
} else {
|
log.warn("人脸识别处理失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
||||||
log.warn("人脸识别处理失败,消息不会被提交, faceSampleId: {}", externalFaceId);
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("人脸识别处理异常,但消息仍将被消费, faceSampleId: {}, error: {}", externalFaceId, e.getMessage(), e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.warn("人脸样本保存失败,消息不会被提交, faceSampleId: {}", externalFaceId);
|
log.warn("人脸样本保存失败,但消息仍将被消费, faceSampleId: {}", externalFaceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 无论处理是否成功,都消费消息
|
||||||
|
ack.acknowledge();
|
||||||
|
log.info("消息已消费, faceSampleId: {}", externalFaceId);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理外部人脸消息失败,消息不会被提交: {}", e.getMessage(), e);
|
log.error("处理外部人脸消息失败: {}", e.getMessage(), e);
|
||||||
// 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费
|
// 即使发生异常也消费消息,避免消息堆积
|
||||||
|
ack.acknowledge();
|
||||||
|
log.info("异常情况下消息已消费,避免重复处理");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -98,4 +98,11 @@ public interface SourceMapper {
|
|||||||
* @return type=2的source列表
|
* @return type=2的source列表
|
||||||
*/
|
*/
|
||||||
List<SourceEntity> listImageSourcesByFaceId(Long faceId);
|
List<SourceEntity> listImageSourcesByFaceId(Long faceId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从ZT-Source消息添加素材
|
||||||
|
* @param source 素材实体
|
||||||
|
* @return 影响行数
|
||||||
|
*/
|
||||||
|
int addFromZTSource(SourceEntity source);
|
||||||
}
|
}
|
||||||
|
@@ -49,4 +49,5 @@ public class ScenicConfigResp {
|
|||||||
private Boolean enableVoucher;
|
private Boolean enableVoucher;
|
||||||
private Boolean groupingEnable;
|
private Boolean groupingEnable;
|
||||||
private Boolean showPhotoWhenWaiting;
|
private Boolean showPhotoWhenWaiting;
|
||||||
|
private String watermarkUrl;
|
||||||
}
|
}
|
||||||
|
@@ -0,0 +1,68 @@
|
|||||||
|
package com.ycwl.basic.service;
|
||||||
|
|
||||||
|
import com.ycwl.basic.dto.ZTSourceMessage;
|
||||||
|
import com.ycwl.basic.utils.JacksonUtil;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZT-Source Kafka消费者服务
|
||||||
|
* 监听zt-source topic并处理素材消息
|
||||||
|
*
|
||||||
|
* @author system
|
||||||
|
* @date 2024/12/27
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true")
|
||||||
|
public class ZTSourceConsumerService {
|
||||||
|
|
||||||
|
private static final String ZT_SOURCE_TOPIC = "zt-source";
|
||||||
|
|
||||||
|
private final ZTSourceDataService ztSourceDataService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监听zt-source topic消息
|
||||||
|
* 先解析消息并输出业务日志,然后手动确认处理
|
||||||
|
*
|
||||||
|
* @param message 消息JSON字符串
|
||||||
|
* @param ack 手动ACK确认
|
||||||
|
*/
|
||||||
|
@KafkaListener(topics = ZT_SOURCE_TOPIC, containerFactory = "manualCommitKafkaListenerContainerFactory")
|
||||||
|
public void handleZTSourceMessage(String message, Acknowledgment ack) {
|
||||||
|
ZTSourceMessage sourceMessage = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 先解析消息
|
||||||
|
sourceMessage = JacksonUtil.parseObject(message, ZTSourceMessage.class);
|
||||||
|
|
||||||
|
// 输出业务相关的日志信息
|
||||||
|
log.info("接收到ZT-Source消息, sourceId: {}, deviceId: {}, faceSampleId: {}",
|
||||||
|
sourceMessage.getSourceId(), sourceMessage.getDeviceId(), sourceMessage.getFaceSampleId());
|
||||||
|
|
||||||
|
// 处理消息
|
||||||
|
boolean processed = ztSourceDataService.processZTSourceMessage(sourceMessage);
|
||||||
|
|
||||||
|
if (processed) {
|
||||||
|
// 只有在处理成功后才手动提交
|
||||||
|
ack.acknowledge();
|
||||||
|
log.info("ZT-Source消息处理成功并已提交, sourceId: {}", sourceMessage.getSourceId());
|
||||||
|
} else {
|
||||||
|
log.warn("ZT-Source消息处理被跳过(非照片类型),消息不会被提交, sourceId: {}, sourceType: {}",
|
||||||
|
sourceMessage.getSourceId(), sourceMessage.getSourceType());
|
||||||
|
// 对于非照片类型,也提交消息避免重复消费
|
||||||
|
ack.acknowledge();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
String sourceId = sourceMessage != null ? sourceMessage.getSourceId().toString() : "unknown";
|
||||||
|
log.error("处理ZT-Source消息失败,消息不会被提交: sourceId={}, error={}", sourceId, e.getMessage(), e);
|
||||||
|
// 不调用ack.acknowledge(),消息保持未提交状态,可以重新消费
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
138
src/main/java/com/ycwl/basic/service/ZTSourceDataService.java
Normal file
138
src/main/java/com/ycwl/basic/service/ZTSourceDataService.java
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
package com.ycwl.basic.service;
|
||||||
|
|
||||||
|
import com.ycwl.basic.dto.ZTSourceMessage;
|
||||||
|
import com.ycwl.basic.integration.common.manager.DeviceConfigManager;
|
||||||
|
import com.ycwl.basic.mapper.SourceMapper;
|
||||||
|
import com.ycwl.basic.model.pc.source.entity.SourceEntity;
|
||||||
|
import com.ycwl.basic.repository.DeviceRepository;
|
||||||
|
import com.ycwl.basic.utils.SnowFlakeUtil;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.logging.log4j.util.Strings;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZT-Source数据处理服务
|
||||||
|
* 负责将ZT-Source消息转换并保存到数据库
|
||||||
|
*
|
||||||
|
* @author system
|
||||||
|
* @date 2024/12/27
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ZTSourceDataService {
|
||||||
|
|
||||||
|
private final SourceMapper sourceMapper;
|
||||||
|
private final DeviceRepository deviceRepository;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理ZT-Source消息,仅处理照片类型(sourceType=2)
|
||||||
|
*
|
||||||
|
* @param message ZT-Source消息
|
||||||
|
* @return 是否处理成功
|
||||||
|
*/
|
||||||
|
public boolean processZTSourceMessage(ZTSourceMessage message) {
|
||||||
|
try {
|
||||||
|
// 仅处理照片类型的消息
|
||||||
|
if (!message.isPhoto()) {
|
||||||
|
log.debug("跳过非照片类型消息: sourceId={}, sourceType={}",
|
||||||
|
message.getSourceId(), message.getSourceType());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查必要字段
|
||||||
|
if (!validateMessage(message)) {
|
||||||
|
log.warn("消息校验失败: {}", message);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 转换为SourceEntity
|
||||||
|
SourceEntity sourceEntity = convertToSourceEntity(message);
|
||||||
|
DeviceConfigManager configManager = deviceRepository.getDeviceConfigManager(sourceEntity.getDeviceId());
|
||||||
|
if (configManager != null) {
|
||||||
|
if (Strings.isNotBlank(configManager.getString("crop_config"))) {
|
||||||
|
sourceEntity.setUrl(message.getThumbnailUrl());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保存到数据库
|
||||||
|
int result = sourceMapper.addFromZTSource(sourceEntity);
|
||||||
|
|
||||||
|
if (result > 0) {
|
||||||
|
log.info("成功保存ZT-Source照片素材: sourceId={}, entityId={}, scenicId={}, deviceId={}",
|
||||||
|
message.getSourceId(), sourceEntity.getId(), message.getScenicId(), message.getDeviceId());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
log.error("保存ZT-Source照片素材失败: sourceId={}", message.getSourceId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理ZT-Source消息异常: sourceId={}, error={}",
|
||||||
|
message.getSourceId(), e.getMessage(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 校验消息必要字段
|
||||||
|
*/
|
||||||
|
private boolean validateMessage(ZTSourceMessage message) {
|
||||||
|
if (message.getScenicId() == null) {
|
||||||
|
log.warn("scenicId不能为空: sourceId={}", message.getSourceId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.getDeviceId() == null) {
|
||||||
|
log.warn("deviceId不能为空: sourceId={}", message.getSourceId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.getSourceUrl() == null || message.getSourceUrl().trim().isEmpty()) {
|
||||||
|
log.warn("sourceUrl不能为空: sourceId={}", message.getSourceId());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 将ZTSourceMessage转换为SourceEntity
|
||||||
|
*/
|
||||||
|
private SourceEntity convertToSourceEntity(ZTSourceMessage message) {
|
||||||
|
SourceEntity entity = new SourceEntity();
|
||||||
|
|
||||||
|
// 生成ID
|
||||||
|
entity.setId(SnowFlakeUtil.getLongId());
|
||||||
|
|
||||||
|
// 基本字段映射
|
||||||
|
entity.setScenicId(message.getScenicId());
|
||||||
|
entity.setDeviceId(message.getDeviceId());
|
||||||
|
entity.setUrl(message.getSourceUrl()); // 使用sourceUrl,不使用缩略图
|
||||||
|
entity.setType(2); // 照片类型
|
||||||
|
|
||||||
|
// 人脸样本ID处理
|
||||||
|
entity.setFaceSampleId(message.getFaceSampleId());
|
||||||
|
|
||||||
|
// 位置信息JSON处理
|
||||||
|
entity.setPosJson(message.getPosJson());
|
||||||
|
|
||||||
|
// 时间处理
|
||||||
|
Date shootTime = message.getShootTime();
|
||||||
|
if (shootTime != null) {
|
||||||
|
entity.setCreateTime(shootTime);
|
||||||
|
} else {
|
||||||
|
entity.setCreateTime(new Date());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("转换ZTSourceMessage到SourceEntity: sourceId={} -> entityId={}",
|
||||||
|
message.getSourceId(), entity.getId());
|
||||||
|
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
}
|
@@ -759,6 +759,9 @@ public class FaceServiceImpl implements FaceService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
boolean buy = orderBiz.checkUserBuyItem(userId, contentPageVO.getGoodsType(), contentPageVO.getContentId());
|
boolean buy = orderBiz.checkUserBuyItem(userId, contentPageVO.getGoodsType(), contentPageVO.getContentId());
|
||||||
|
if (!buy) {
|
||||||
|
buy = orderBiz.checkUserBuyItem(userId, -1, contentPageVO.getTemplateId());
|
||||||
|
}
|
||||||
if (buy) {
|
if (buy) {
|
||||||
contentPageVO.setIsBuy(1);
|
contentPageVO.setIsBuy(1);
|
||||||
} else {
|
} else {
|
||||||
|
@@ -49,6 +49,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@@ -251,11 +252,18 @@ public class TaskFaceServiceImpl implements TaskFaceService {
|
|||||||
.anyMatch(record -> record.getScore() > _lowThreshold);
|
.anyMatch(record -> record.getScore() > _lowThreshold);
|
||||||
respVo.setLowThreshold(isLowThreshold);
|
respVo.setLowThreshold(isLowThreshold);
|
||||||
allFaceSampleIds = records.stream()
|
allFaceSampleIds = records.stream()
|
||||||
|
.sorted(Comparator.comparing(SearchFaceResultItem::getScore).reversed())
|
||||||
.map(SearchFaceResultItem::getExtData)
|
.map(SearchFaceResultItem::getExtData)
|
||||||
.filter(StringUtils::isNumeric)
|
.filter(StringUtils::isNumeric)
|
||||||
.map(Long::valueOf)
|
.map(Long::valueOf)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
List<FaceSampleEntity> allFaceSampleList = faceSampleMapper.listByIds(allFaceSampleIds);
|
List<FaceSampleEntity> allFaceSampleList = faceSampleMapper.listByIds(allFaceSampleIds);
|
||||||
|
// 按照allFaceSampleIds的顺序对allFaceSampleList进行排序
|
||||||
|
Map<Long, Integer> idIndexMap = new HashMap<>();
|
||||||
|
for (int i = 0; i < allFaceSampleIds.size(); i++) {
|
||||||
|
idIndexMap.put(allFaceSampleIds.get(i), i);
|
||||||
|
}
|
||||||
|
allFaceSampleList.sort(Comparator.comparing(sample -> idIndexMap.get(sample.getId())));
|
||||||
acceptFaceSampleIds = applySampleFilters(acceptFaceSampleIds, allFaceSampleList, scenicConfig);
|
acceptFaceSampleIds = applySampleFilters(acceptFaceSampleIds, allFaceSampleList, scenicConfig);
|
||||||
List<MatchLocalRecord> collect = new ArrayList<>();
|
List<MatchLocalRecord> collect = new ArrayList<>();
|
||||||
for (SearchFaceResultItem item : records) {
|
for (SearchFaceResultItem item : records) {
|
||||||
@@ -515,9 +523,8 @@ public class TaskFaceServiceImpl implements TaskFaceService {
|
|||||||
log.debug("设备照片限制:设备ID={}, 无限制,保留{}张照片",
|
log.debug("设备照片限制:设备ID={}, 无限制,保留{}张照片",
|
||||||
deviceId, deviceSampleIds.size());
|
deviceId, deviceSampleIds.size());
|
||||||
} else {
|
} else {
|
||||||
// 按创建时间倒序排序,取前N张
|
// 取前N张
|
||||||
List<FaceSampleEntity> limitedSamples = deviceSamples.stream()
|
List<FaceSampleEntity> limitedSamples = deviceSamples.stream()
|
||||||
.sorted(Comparator.comparing(FaceSampleEntity::getCreateAt).reversed())
|
|
||||||
.limit(limitPhoto)
|
.limit(limitPhoto)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
@@ -53,6 +53,7 @@ import java.util.concurrent.ThreadFactory;
|
|||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
@@ -145,7 +146,7 @@ public class VideoPieceGetter {
|
|||||||
new ArrayBlockingQueue<>(128),
|
new ArrayBlockingQueue<>(128),
|
||||||
threadFactory
|
threadFactory
|
||||||
);
|
);
|
||||||
List<String> currentUnFinPlaceholder = new ArrayList<>();
|
Map<String, AtomicInteger> currentUnFinPlaceholder = new ConcurrentHashMap<>();
|
||||||
List<FaceSampleEntity> list = faceSampleMapper.listByIds(task.getFaceSampleIds());
|
List<FaceSampleEntity> list = faceSampleMapper.listByIds(task.getFaceSampleIds());
|
||||||
Map<Long, Long> pairDeviceMap = new ConcurrentHashMap<>();
|
Map<Long, Long> pairDeviceMap = new ConcurrentHashMap<>();
|
||||||
if (!list.isEmpty()) {
|
if (!list.isEmpty()) {
|
||||||
@@ -169,12 +170,12 @@ public class VideoPieceGetter {
|
|||||||
})
|
})
|
||||||
.collect(Collectors.groupingBy(FaceSampleEntity::getDeviceId));
|
.collect(Collectors.groupingBy(FaceSampleEntity::getDeviceId));
|
||||||
if (templatePlaceholder != null) {
|
if (templatePlaceholder != null) {
|
||||||
IntStream.range(0, templatePlaceholder.size()).forEach(i -> {
|
templatePlaceholder.forEach(deviceId -> {
|
||||||
currentUnFinPlaceholder.add(templatePlaceholder.get(i));
|
currentUnFinPlaceholder.computeIfAbsent(deviceId, k -> new AtomicInteger(0)).incrementAndGet();
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
collection.keySet().forEach(i -> {
|
collection.keySet().forEach(deviceId -> {
|
||||||
currentUnFinPlaceholder.add(i.toString());
|
currentUnFinPlaceholder.put(deviceId.toString(), new AtomicInteger(1));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
collection.values().forEach(faceSampleList -> {
|
collection.values().forEach(faceSampleList -> {
|
||||||
@@ -188,26 +189,45 @@ public class VideoPieceGetter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
isFirst.set(false);
|
isFirst.set(false);
|
||||||
|
// 处理关联设备:如果当前设备是某个主设备的配对设备,也处理主设备
|
||||||
if (pairDeviceMap.containsValue(faceSample.getDeviceId())) {
|
if (pairDeviceMap.containsValue(faceSample.getDeviceId())) {
|
||||||
// 有关联设备!
|
|
||||||
// 找到对应的deviceId
|
|
||||||
pairDeviceMap.entrySet().stream()
|
pairDeviceMap.entrySet().stream()
|
||||||
.filter(entry -> entry.getValue().equals(faceSample.getDeviceId()))
|
.filter(entry -> entry.getValue().equals(faceSample.getDeviceId()))
|
||||||
.map(Map.Entry::getKey).forEach(pairDeviceId -> {
|
.map(Map.Entry::getKey).forEach(pairDeviceId -> {
|
||||||
log.info("找到同景区关联设备:{} -> {}", pairDeviceId, faceSample.getDeviceId());
|
log.info("找到同景区关联设备:{} -> {}", pairDeviceId, faceSample.getDeviceId());
|
||||||
if (pairDeviceId != null) {
|
if (pairDeviceId != null) {
|
||||||
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
|
doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task);
|
||||||
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
|
// 让主设备的计数器 -1
|
||||||
|
AtomicInteger pairCount = currentUnFinPlaceholder.get(pairDeviceId.toString());
|
||||||
|
if (pairCount != null && pairCount.decrementAndGet() <= 0) {
|
||||||
|
currentUnFinPlaceholder.remove(pairDeviceId.toString());
|
||||||
|
log.info("设备 {} 的placeholder已满足", pairDeviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 处理当前设备
|
||||||
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
|
doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task);
|
||||||
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
|
AtomicInteger count = currentUnFinPlaceholder.get(faceSample.getDeviceId().toString());
|
||||||
|
if (count != null && count.decrementAndGet() <= 0) {
|
||||||
|
currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString());
|
||||||
|
log.info("设备 {} 的placeholder已满足", faceSample.getDeviceId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果有templateId,检查是否所有placeholder都已满足
|
||||||
if (templatePlaceholder != null) {
|
if (templatePlaceholder != null) {
|
||||||
log.info("当前进度:!{}/{}", currentUnFinPlaceholder.size(), templatePlaceholder.size());
|
int totalPlaceholderCount = templatePlaceholder.size();
|
||||||
|
int remainingCount = currentUnFinPlaceholder.values().stream()
|
||||||
|
.mapToInt(AtomicInteger::get)
|
||||||
|
.sum();
|
||||||
|
log.info("当前进度:已完成 {}/{},剩余 {} 个placeholder未满足",
|
||||||
|
totalPlaceholderCount - remainingCount, totalPlaceholderCount, remainingCount);
|
||||||
|
|
||||||
if (currentUnFinPlaceholder.isEmpty()) {
|
if (currentUnFinPlaceholder.isEmpty()) {
|
||||||
if (!invoke.get()) {
|
if (!invoke.get()) {
|
||||||
invoke.set(true);
|
invoke.set(true);
|
||||||
|
log.info("所有placeholder已满足,提前调用callback");
|
||||||
task.getCallback().onInvoke();
|
task.getCallback().onInvoke();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -28,33 +28,53 @@ public class ImageUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static MultipartFile cropImage(MultipartFile file, int x, int y, int w, int h) throws IOException {
|
public static MultipartFile cropImage(MultipartFile file, int x, int y, int w, int h) throws IOException {
|
||||||
BufferedImage image = ImageIO.read(file.getInputStream());
|
BufferedImage image = null;
|
||||||
log.info("图片宽高:{}", image.getWidth() + "x" + image.getHeight());
|
BufferedImage targetImage = null;
|
||||||
log.info("图片裁切:{}@{}", w + "x" + h, x + "," + y);
|
|
||||||
if (image.getWidth() < w) {
|
|
||||||
w = image.getWidth();
|
|
||||||
}
|
|
||||||
if (image.getHeight() < h) {
|
|
||||||
h = image.getHeight();
|
|
||||||
}
|
|
||||||
int targetX = x;
|
|
||||||
if (x < 0) {
|
|
||||||
targetX = 0;
|
|
||||||
} else if ((x + w) > image.getWidth()) {
|
|
||||||
targetX = image.getWidth() - w;
|
|
||||||
}
|
|
||||||
int targetY = y;
|
|
||||||
if (y < 0) {
|
|
||||||
targetY = 0;
|
|
||||||
} else if ((y + h) > image.getHeight()) {
|
|
||||||
targetY = image.getHeight() - h;
|
|
||||||
}
|
|
||||||
log.info("图片实际裁切:{}@{}", w + "x" + h, targetX + "," + targetY);
|
|
||||||
BufferedImage targetImage = image.getSubimage(targetX, targetY, w, h);
|
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
ImageIO.write(targetImage, "jpg", baos);
|
try {
|
||||||
baos.close();
|
image = ImageIO.read(file.getInputStream());
|
||||||
return new Base64DecodedMultipartFile(baos.toByteArray(), "image/jpeg");
|
log.info("图片宽高:{}", image.getWidth() + "x" + image.getHeight());
|
||||||
|
log.info("图片裁切:{}@{}", w + "x" + h, x + "," + y);
|
||||||
|
if (image.getWidth() < w) {
|
||||||
|
w = image.getWidth();
|
||||||
|
}
|
||||||
|
if (image.getHeight() < h) {
|
||||||
|
h = image.getHeight();
|
||||||
|
}
|
||||||
|
int targetX = x;
|
||||||
|
if (x < 0) {
|
||||||
|
targetX = 0;
|
||||||
|
} else if ((x + w) > image.getWidth()) {
|
||||||
|
targetX = image.getWidth() - w;
|
||||||
|
}
|
||||||
|
int targetY = y;
|
||||||
|
if (y < 0) {
|
||||||
|
targetY = 0;
|
||||||
|
} else if ((y + h) > image.getHeight()) {
|
||||||
|
targetY = image.getHeight() - h;
|
||||||
|
}
|
||||||
|
log.info("图片实际裁切:{}@{}", w + "x" + h, targetX + "," + targetY);
|
||||||
|
targetImage = image.getSubimage(targetX, targetY, w, h);
|
||||||
|
ImageIO.write(targetImage, "jpg", baos);
|
||||||
|
return new Base64DecodedMultipartFile(baos.toByteArray(), "image/jpeg");
|
||||||
|
} finally {
|
||||||
|
// 修复内存泄漏:显式释放图片资源
|
||||||
|
if (image != null) {
|
||||||
|
image.flush();
|
||||||
|
image = null;
|
||||||
|
}
|
||||||
|
if (targetImage != null) {
|
||||||
|
targetImage.flush();
|
||||||
|
targetImage = null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
baos.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.warn("关闭ByteArrayOutputStream失败", e);
|
||||||
|
}
|
||||||
|
// 建议JVM进行垃圾回收
|
||||||
|
System.gc();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Base64DecodedMultipartFile implements MultipartFile {
|
public static class Base64DecodedMultipartFile implements MultipartFile {
|
||||||
|
@@ -5,6 +5,10 @@
|
|||||||
insert into source(id, scenic_id, device_id, url, video_url, `type`, face_sample_id, pos_json, create_time)
|
insert into source(id, scenic_id, device_id, url, video_url, `type`, face_sample_id, pos_json, create_time)
|
||||||
values (#{id}, #{scenicId}, #{deviceId}, #{url}, #{videoUrl}, #{type}, #{faceSampleId}, #{posJson}, #{createTime})
|
values (#{id}, #{scenicId}, #{deviceId}, #{url}, #{videoUrl}, #{type}, #{faceSampleId}, #{posJson}, #{createTime})
|
||||||
</insert>
|
</insert>
|
||||||
|
<insert id="addFromZTSource">
|
||||||
|
insert into source(id, scenic_id, device_id, url, `type`, face_sample_id, pos_json, create_time)
|
||||||
|
values (#{id}, #{scenicId}, #{deviceId}, #{url}, #{type}, #{faceSampleId}, #{posJson}, #{createTime})
|
||||||
|
</insert>
|
||||||
<insert id="addRelation">
|
<insert id="addRelation">
|
||||||
replace member_source(scenic_id, face_id, member_id, source_id, is_buy, type, order_id<if test="isFree">, is_free</if>)
|
replace member_source(scenic_id, face_id, member_id, source_id, is_buy, type, order_id<if test="isFree">, is_free</if>)
|
||||||
values (#{scenicId}, #{faceId}, #{memberId}, #{sourceId}, #{isBuy}, #{type}, #{orderId}<if test="isFree">, #{isFree}</if>)
|
values (#{scenicId}, #{faceId}, #{memberId}, #{sourceId}, #{isBuy}, #{type}, #{orderId}<if test="isFree">, #{isFree}</if>)
|
||||||
|
Reference in New Issue
Block a user