Files
FrameTour-BE/src/main/java/com/ycwl/basic/service/custom/CustomUploadTaskService.java
Jerry Yan f2ac6aaea0 refactor(scenic): 重构景区相关接口和缓存机制
- 移除 ScenicMapper 接口,将相关方法移至 ScenicRepository
- 修改景区列表查询逻辑,使用 ScenicRepository 的 list 方法
- 优化景区详情获取方式,使用 ScenicRepository 的 getScenicBasic 方法
- 重构缓存机制,增加对景区基本信息的缓存
- 优化 AppScenicService 和 ScenicService接口,使用 ScenicV2DTO 替代 ScenicRespV
2025-08-27 16:37:57 +08:00

338 lines
14 KiB
Java

package com.ycwl.basic.service.custom;
import com.aliyun.credentials.Client;
import com.aliyun.credentials.models.Config;
import com.aliyun.mts20140618.models.QuerySmarttagJobRequest;
import com.aliyun.mts20140618.models.QuerySmarttagJobResponse;
import com.aliyun.mts20140618.models.QuerySmarttagJobResponseBody;
import com.aliyun.mts20140618.models.SubmitSmarttagJobRequest;
import com.aliyun.mts20140618.models.SubmitSmarttagJobResponse;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.ycwl.basic.facebody.adapter.IFaceBodyAdapter;
import com.ycwl.basic.facebody.entity.AddFaceResp;
import com.ycwl.basic.mapper.DeviceMapper;
import com.ycwl.basic.mapper.CustomUploadTaskMapper;
import com.ycwl.basic.mapper.FaceSampleMapper;
import com.ycwl.basic.mapper.SourceMapper;
import com.ycwl.basic.model.pc.device.entity.DeviceEntity;
import com.ycwl.basic.model.custom.entity.CustomUploadTaskEntity;
import com.ycwl.basic.model.custom.entity.FaceData;
import com.ycwl.basic.model.custom.req.CreateUploadTaskReq;
import com.ycwl.basic.model.custom.resp.CreateUploadTaskResp;
import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity;
import com.ycwl.basic.model.pc.source.entity.SourceEntity;
import com.ycwl.basic.service.pc.ScenicService;
import com.ycwl.basic.storage.StorageFactory;
import com.ycwl.basic.storage.adapters.IStorageAdapter;
import com.ycwl.basic.utils.JacksonUtil;
import com.ycwl.basic.utils.SnowFlakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.ycwl.basic.constant.StorageConstant.VIID_FACE;
@Slf4j
@Service
public class CustomUploadTaskService {
@Autowired
private CustomUploadTaskMapper customUploadTaskMapper;
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private ScenicService scenicService;
@Autowired
private FaceSampleMapper faceSampleMapper;
@Autowired
private SourceMapper sourceMapper;
private static final ThreadPoolExecutor executor;
static {
executor = new ThreadPoolExecutor(16, 32, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
}
public CreateUploadTaskResp createUploadTask(CreateUploadTaskReq req) {
if (StringUtils.isBlank(req.getAccessKey())) {
throw new RuntimeException("设备访问密钥不能为空");
}
if (StringUtils.isBlank(req.getFileName())) {
throw new RuntimeException("文件名不能为空");
}
if (StringUtils.isBlank(req.getType())) {
throw new RuntimeException("上传类型不能为空");
}
// 验证设备访问权限
DeviceEntity device = validateDeviceAccess(req.getAccessKey(), req.getType());
long taskId = SnowFlakeUtil.getLongId();
String savePath = generateSavePath(device.getScenicId(), req.getFileName());
CustomUploadTaskEntity task = new CustomUploadTaskEntity();
task.setId(taskId);
task.setScenicId(device.getScenicId());
task.setDeviceId(device.getId());
task.setSavePath(savePath);
task.setStatus("PENDING");
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
customUploadTaskMapper.insert(task);
String uploadUrl = generateUploadUrl(savePath);
CreateUploadTaskResp resp = new CreateUploadTaskResp();
resp.setTaskId(taskId);
resp.setUploadUrl(uploadUrl);
resp.setSavePath(savePath);
return resp;
}
public void completeUpload(String accessKey, Long taskId) {
// 验证设备访问权限
DeviceEntity device = validateDeviceAccess(accessKey, null);
CustomUploadTaskEntity task = customUploadTaskMapper.selectById(taskId);
if (task == null) {
throw new RuntimeException("任务不存在");
}
// 验证任务属于该设备的景区
if (!device.getScenicId().equals(task.getScenicId())) {
throw new RuntimeException("无权限操作该任务");
}
task.setStatus("UPLOADING");
task.setUpdateTime(new Date());
customUploadTaskMapper.updateById(task);
try {
String jobId = submitSmarttagJob(task.getSavePath());
task.setJobId(jobId);
task.setStatus("COMPLETED");
task.setUpdateTime(new Date());
customUploadTaskMapper.updateById(task);
log.info("人脸识别任务提交成功,taskId: {}, jobId: {}", taskId, jobId);
} catch (Exception e) {
task.setStatus("FAILED");
task.setErrorMsg("人脸识别任务提交失败: " + e.getMessage());
task.setUpdateTime(new Date());
customUploadTaskMapper.updateById(task);
log.error("人脸识别任务提交失败,taskId: {}", taskId, e);
throw new RuntimeException("人脸识别任务提交失败");
}
}
public void markTaskFailed(String accessKey, Long taskId, String errorMsg) {
// 验证设备访问权限
DeviceEntity device = validateDeviceAccess(accessKey, null);
CustomUploadTaskEntity task = customUploadTaskMapper.selectById(taskId);
if (task == null) {
throw new RuntimeException("任务不存在");
}
// 验证任务属于该设备的景区
if (!device.getScenicId().equals(task.getScenicId())) {
throw new RuntimeException("无权限操作该任务");
}
task.setStatus("FAILED");
task.setErrorMsg(errorMsg);
task.setUpdateTime(new Date());
customUploadTaskMapper.updateById(task);
log.info("任务标记为失败,taskId: {}, errorMsg: {}", taskId, errorMsg);
}
public void handleAliyunCallback(String jobId, String status) {
LambdaQueryWrapper<CustomUploadTaskEntity> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(CustomUploadTaskEntity::getJobId, jobId);
CustomUploadTaskEntity task = customUploadTaskMapper.selectOne(wrapper);
if (task == null) {
log.warn("未找到对应的上传任务,jobId: {}", jobId);
return;
}
log.info("收到阿里云回调,jobId: {}, status: {}, taskId: {}", jobId, status, task.getId());
handleAliyunMpsJobComplete(jobId);
}
private String generateSavePath(Long scenicId, String fileName) {
String timestamp = String.valueOf(System.currentTimeMillis());
String extension = "";
if (fileName.contains(".")) {
extension = fileName.substring(fileName.lastIndexOf("."));
}
return String.format("custom-device/%d/%s%s", scenicId, timestamp, extension);
}
private String generateUploadUrl(String savePath) {
try {
IStorageAdapter adapter = StorageFactory.use();
Date expireDate = new Date(System.currentTimeMillis() + 3600 * 1000);
return adapter.getUrlForUpload(expireDate, null, savePath);
} catch (Exception e) {
log.error("生成上传URL失败,savePath: {}", savePath, e);
throw new RuntimeException("生成上传URL失败");
}
}
private String submitSmarttagJob(String inputPath) {
try {
log.info("提交人脸识别任务,inputPath: {}", inputPath);
String jobId = createAliyunMtsTask(inputPath);
executor.execute(() -> loopQueryJobStatus(jobId));
return jobId;
} catch (Exception e) {
log.error("提交人脸识别任务失败,inputPath: {}", inputPath, e);
throw new RuntimeException("提交人脸识别任务失败: " + e.getMessage());
}
}
private DeviceEntity validateDeviceAccess(String accessKey, String type) {
if (StringUtils.isBlank(accessKey)) {
throw new RuntimeException("设备访问密钥不能为空");
}
DeviceEntity device = deviceMapper.getByDeviceNo(accessKey);
if (device == null || device.getStatus() != 1) {
throw new RuntimeException("无效的设备访问密钥或设备已被禁用");
}
return device;
}
private String createAliyunMtsTask(String inputPath) {
try {
// 创建任务
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
.setAccessKeyId("LTAI5tCWgYJz9kZvvh2KVhkH")
.setAccessKeySecret("RObb3EZ1YsmR63ul1gh7tnIfT1foOc");
config.endpoint = "mts.cn-shanghai.aliyuncs.com";
com.aliyun.mts20140618.Client client = new com.aliyun.mts20140618.Client(config);
SubmitSmarttagJobRequest request = new SubmitSmarttagJobRequest();
request.setPipelineId("d791f854652e466bad66301e5c97b7bb");
request.setTemplateId("a004e08402bd496a8a9adbb2ba920973");
request.setTitle(String.valueOf(System.currentTimeMillis()));
request.setNotifyUrl("https://zhentuai.com/extern/custom-device/aliyun/mps/callback");
request.setInput(String.format("oss://frametour-assets/user-assets/%s", inputPath));
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
SubmitSmarttagJobResponse submitSmarttagJobResponse = client.submitSmarttagJobWithOptions(request, runtime);
return submitSmarttagJobResponse.getBody().jobId;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void loopQueryJobStatus(String jobId) {
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
return;
}
handleAliyunMpsJobComplete(jobId);
}
public void handleAliyunMpsJobComplete(String jobId) {
CustomUploadTaskEntity task = customUploadTaskMapper.getByJobId(jobId);
try {
// 查询任务
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
.setAccessKeyId("LTAI5tCWgYJz9kZvvh2KVhkH")
.setAccessKeySecret("RObb3EZ1YsmR63ul1gh7tnIfT1foOc");
config.endpoint = "mts.cn-shanghai.aliyuncs.com";
com.aliyun.mts20140618.Client client = new com.aliyun.mts20140618.Client(config);
QuerySmarttagJobRequest request = new QuerySmarttagJobRequest();
request.setJobId(jobId);
QuerySmarttagJobResponse response = client.querySmarttagJob(request);
if (Strings.CI.equals(response.getBody().jobStatus, "Fail")) {
log.error("智能标签任务失败");
return;
}
if (!Strings.CI.equals(response.getBody().jobStatus, "Success")) {
log.info("jobId:{} 智能标签任务等待查询!", jobId);
executor.execute(() -> loopQueryJobStatus(jobId));
return;
}
List<QuerySmarttagJobResponseBody.QuerySmarttagJobResponseBodyResultsResult> result = response.getBody().results.getResult();
QuerySmarttagJobResponseBody.QuerySmarttagJobResponseBodyResultsResult first = result.stream()
.filter(r -> "VideoLabel".equals(r.getType()))
.findFirst()
.orElse(null);
if (first == null) {
return;
}
List<FaceData> persons = JacksonUtil.getArray(first.getData(), "persons", FaceData.class);
if (persons == null || persons.isEmpty()) {
return;
}
IFaceBodyAdapter faceBodyAdapter = scenicService.getScenicFaceBodyAdapter(task.getScenicId());
IStorageAdapter storageAdapter = StorageFactory.use();
persons.stream()
.filter(p -> p.getRatio() > 0.1)
.map(FaceData::getOccurrences)
.filter(Objects::nonNull)
.forEach(occurrences -> {
if (occurrences.isEmpty()) {
return;
}
Long newFaceSampleId = SnowFlakeUtil.getLongId();
String faceUrl = occurrences.getFirst().getFaceUrl();
FaceSampleEntity faceSample = new FaceSampleEntity();
faceSample.setId(newFaceSampleId);
faceSample.setScenicId(task.getScenicId());
faceSample.setDeviceId(task.getDeviceId());
faceSample.setStatus(1);
faceSample.setCreateAt(task.getCreateTime());
faceSample.setFaceUrl(faceUrl);
faceSampleMapper.add(faceSample);
faceBodyAdapter.assureFaceDb(task.getScenicId().toString());
AddFaceResp addFaceResp = faceBodyAdapter.addFace(task.getScenicId().toString(), newFaceSampleId.toString(), faceUrl, newFaceSampleId.toString());
if (addFaceResp != null) {
faceSample.setScore(addFaceResp.getScore());
faceSampleMapper.updateScore(faceSample.getId(), addFaceResp.getScore());
}
SourceEntity source = new SourceEntity();
source.setId(SnowFlakeUtil.getLongId());
source.setDeviceId(task.getDeviceId());
source.setScenicId(task.getScenicId());
source.setFaceSampleId(newFaceSampleId);
source.setCreateTime(task.getCreateTime());
source.setType(1);
source.setUrl(faceUrl);
source.setVideoUrl(storageAdapter.getUrl(task.getSavePath()));
sourceMapper.add(source);
});
System.out.println(JacksonUtil.toJson(persons));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}