package com.ycwl.basic.task; import cn.hutool.core.thread.ThreadFactoryBuilder; import com.ycwl.basic.biz.OrderBiz; import com.ycwl.basic.biz.TaskStatusBiz; import com.ycwl.basic.constant.StorageConstant; import com.ycwl.basic.device.DeviceFactory; import com.ycwl.basic.device.entity.common.FileObject; import com.ycwl.basic.device.operator.IDeviceStorageOperator; import com.ycwl.basic.integration.common.manager.DeviceConfigManager; import com.ycwl.basic.model.pc.face.entity.FaceEntity; import com.ycwl.basic.model.pc.faceSample.entity.FaceSampleEntity; import com.ycwl.basic.model.pc.scenic.entity.ScenicConfigEntity; import com.ycwl.basic.repository.DeviceRepository; import com.ycwl.basic.mapper.FaceSampleMapper; import com.ycwl.basic.mapper.SourceMapper; import com.ycwl.basic.model.mobile.order.IsBuyRespVO; import com.ycwl.basic.model.pc.device.entity.DeviceConfigEntity; import com.ycwl.basic.integration.device.service.DeviceIntegrationService; import com.ycwl.basic.integration.device.dto.device.DeviceV2DTO; import com.ycwl.basic.model.pc.device.entity.DeviceEntity; import com.ycwl.basic.model.pc.source.entity.MemberSourceEntity; import com.ycwl.basic.model.pc.source.entity.SourceEntity; import com.ycwl.basic.repository.FaceRepository; import com.ycwl.basic.repository.ScenicRepository; import com.ycwl.basic.repository.TemplateRepository; import com.ycwl.basic.storage.StorageFactory; import com.ycwl.basic.storage.adapters.IStorageAdapter; import com.ycwl.basic.utils.SnowFlakeUtil; import com.ycwl.basic.utils.VideoReUploader; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @Component @EnableScheduling @Slf4j public class VideoPieceGetter { @Autowired private FaceSampleMapper faceSampleMapper; @Autowired private FaceRepository faceRepository; @Autowired private DeviceRepository deviceRepository; @Autowired private SourceMapper sourceMapper; @Autowired private OrderBiz orderBiz; @Autowired private TemplateRepository templateRepository; @Autowired private TaskStatusBiz taskStatusBiz; @Autowired private VideoReUploader videoReUploader; @Autowired private ScenicRepository scenicRepository; @Autowired private DeviceIntegrationService deviceIntegrationService; @Data public static class Task { public List faceSampleIds = new ArrayList<>(); public Callback callback; public Long memberId; public Long faceId; public Long templateId; public boolean force; public interface Callback { void onInvoke(); } } @Data public static class FfmpegTask { List fileList; BigDecimal duration; BigDecimal offsetStart; String outputFile; } public static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); public static void addTask(Task task) { queue.add(task); } @Scheduled(fixedRate = 200L) public void doTask() { Task task = queue.poll(); if (task == null) { return; } log.info("poll task: {}/{}", task, queue.size()); Thread.ofVirtual().start(() -> { try { runTask(task); } catch (Exception e) { log.error("run task error", e); } }); } private void runTask(Task task) { List templatePlaceholder; if (null != task.getTemplateId()) { templatePlaceholder = templateRepository.getTemplatePlaceholder(task.getTemplateId()); } else { templatePlaceholder = null; } // 临时处理,只有逻辑层自己设置是否正在切片后再做更新 // if (task.faceId != null) { // taskStatusBiz.setFaceCutStatus(task.faceId, 0); // } AtomicBoolean invoke = new AtomicBoolean(false); final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNamePrefix("VPG-" + task.faceId + "-t") .build(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 128, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(128), threadFactory ); List currentUnFinPlaceholder = new ArrayList<>(); List list = faceSampleMapper.listByIds(task.getFaceSampleIds()); Map pairDeviceMap = new ConcurrentHashMap<>(); if (!list.isEmpty()) { Long scenicId = list.getFirst().getScenicId(); List allDeviceByScenicId = deviceRepository.getAllDeviceByScenicId(scenicId); allDeviceByScenicId.forEach(device -> { Long deviceId = device.getId(); DeviceConfigManager deviceConfig = deviceRepository.getDeviceConfigManager(deviceId); Long pairDevice = deviceConfig.getLong("pair_device"); if (pairDevice != null) { pairDeviceMap.putIfAbsent(deviceId, pairDevice); } }); } Map> collection = list.stream() .filter(faceSample -> { if (templatePlaceholder != null) { return templatePlaceholder.contains(faceSample.getDeviceId().toString()); } return true; }) .collect(Collectors.groupingBy(FaceSampleEntity::getDeviceId)); if (templatePlaceholder != null) { IntStream.range(0, templatePlaceholder.size()).forEach(i -> { currentUnFinPlaceholder.add(templatePlaceholder.get(i)); }); } else { collection.keySet().forEach(i -> { currentUnFinPlaceholder.add(i.toString()); }); } collection.values().forEach(faceSampleList -> { executor.execute(() -> { AtomicBoolean isFirst = new AtomicBoolean(true); faceSampleList.forEach(faceSample -> { if (!isFirst.get()) { try { Thread.sleep(1000); } catch (InterruptedException ignore) { } } isFirst.set(false); if (pairDeviceMap.containsValue(faceSample.getDeviceId())) { // 有关联设备! // 找到对应的deviceId pairDeviceMap.entrySet().stream() .filter(entry -> entry.getValue().equals(faceSample.getDeviceId())) .map(Map.Entry::getKey).forEach(pairDeviceId -> { log.info("找到同景区关联设备:{} -> {}", pairDeviceId, faceSample.getDeviceId()); if (pairDeviceId != null) { doCut(pairDeviceId, faceSample.getId(), faceSample.getCreateAt(), task); currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString()); } }); } doCut(faceSample.getDeviceId(), faceSample.getId(), faceSample.getCreateAt(), task); currentUnFinPlaceholder.remove(faceSample.getDeviceId().toString()); if (templatePlaceholder != null) { log.info("当前进度:!{}/{}", currentUnFinPlaceholder.size(), templatePlaceholder.size()); if (currentUnFinPlaceholder.isEmpty()) { if (!invoke.get()) { invoke.set(true); task.getCallback().onInvoke(); } } } }); }); }); try { Thread.sleep(1000L); log.info("executor等待被结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); executor.shutdown(); executor.awaitTermination(5, TimeUnit.MINUTES); log.info("executor已结束![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); executor.close(); } catch (InterruptedException e) { log.info("executor已中断![A:{}/T:{}/F:{}]", executor.getActiveCount(), executor.getTaskCount(), executor.getCompletedTaskCount()); } finally { if (task.faceId != null) { taskStatusBiz.setFaceCutStatus(task.faceId, 1); } } if (null != task.getCallback()) { if (!invoke.get()) { invoke.set(true); task.getCallback().onInvoke(); } } if (task.getFaceId() != null) { FaceEntity face = faceRepository.getFace(task.getFaceId()); if (face != null) { ScenicConfigEntity scenicConfig = scenicRepository.getScenicConfig(face.getScenicId()); if (scenicConfig != null) { // 免费送 List sourceEntities = sourceMapper.listByFaceRelation(face.getScenicId(), face.getId(), 1); if (sourceEntities.stream().noneMatch(item -> Integer.valueOf(1).equals(item.getIsFree()))) { List freeSourceRelationIds = new ArrayList<>(); if (scenicConfig.getVideoFreeNum() != null && scenicConfig.getVideoFreeNum() > 0) { if (scenicConfig.getVideoFreeNum() > sourceEntities.size()) { freeSourceRelationIds.addAll(sourceEntities.stream().map(MemberSourceEntity::getId).toList()); } else { freeSourceRelationIds.addAll(sourceEntities.stream().limit(scenicConfig.getVideoFreeNum()).map(MemberSourceEntity::getId).toList()); } } if (!freeSourceRelationIds.isEmpty()) { sourceMapper.freeRelations(freeSourceRelationIds, 1); } } } } } } private boolean doCut(Long deviceId, Long faceSampleId, Date baseTime, Task task) { // 通过zt-device服务获取设备信息 DeviceV2DTO deviceV2 = deviceIntegrationService.getDevice(deviceId); if (deviceV2 == null) { log.warn("设备不存在,设备ID: {}", deviceId); return false; } DeviceConfigManager config = deviceRepository.getDeviceConfigManager(deviceId); DeviceConfigEntity dConfig = deviceRepository.getDeviceConfig(deviceId); SourceEntity source = sourceMapper.querySameVideo(faceSampleId, deviceV2.getId()); if (source == null || task.force) { BigDecimal cutPre = config.getBigDecimal("cut_pre", BigDecimal.valueOf(5L)); BigDecimal cutPost = config.getBigDecimal("cut_post", BigDecimal.valueOf(5L)); IDeviceStorageOperator pieceGetter = DeviceFactory.getDeviceStorageOperator(deviceV2, dConfig); if (pieceGetter == null) { return false; } BigDecimal duration = cutPre.add(cutPost); List listByDtRange = pieceGetter.getFileListByDtRange( new Date(baseTime.getTime() - cutPre.multiply(BigDecimal.valueOf(1000)).longValue()), new Date(baseTime.getTime() + cutPost.multiply(BigDecimal.valueOf(1000)).longValue()) ); if (listByDtRange.isEmpty()) { log.warn("没有可用的文件"); return false; } log.info("查询到可用的文件: {}", listByDtRange); // 如果完全一致,就不需要裁切 String url; if (listByDtRange.size() == 1 && listByDtRange.getFirst().isExact()) { url = listByDtRange.getFirst().getUrl(); } else { long offset = baseTime.getTime() - cutPre.multiply(BigDecimal.valueOf(1000)).longValue() - listByDtRange.getFirst().getCreateTime().getTime(); FfmpegTask ffmpegTask = new FfmpegTask(); ffmpegTask.setFileList(listByDtRange); ffmpegTask.setDuration(duration); ffmpegTask.setOffsetStart(BigDecimal.valueOf(offset, 3)); File outFile = new File(deviceId.toString() + "_" + faceSampleId + ".mp4"); ffmpegTask.setOutputFile(outFile.getAbsolutePath()); boolean result = startFfmpegTask(ffmpegTask); if (!result) { log.warn("视频裁切失败"); return false; } log.info("视频裁切成功"); IStorageAdapter adapter = StorageFactory.use("assets"); url = adapter.uploadFile("video/mp4", outFile, StorageConstant.VIDEO_PIECE_PATH, outFile.getName()); // 上传成功后删除文件 outFile.delete(); } if (source == null) { SourceEntity imgSource = sourceMapper.findBySampleId(faceSampleId); SourceEntity sourceEntity = new SourceEntity(); sourceEntity.setId(SnowFlakeUtil.getLongId()); sourceEntity.setCreateTime(baseTime); if (imgSource != null) { sourceEntity.setUrl(imgSource.getUrl()); sourceEntity.setPosJson(imgSource.getPosJson()); } if (StringUtils.isNotBlank(config.getString("video_crop"))) { sourceEntity.setPosJson(config.getString("video_crop")); } sourceEntity.setVideoUrl(url); sourceEntity.setFaceSampleId(faceSampleId); sourceEntity.setScenicId(deviceV2.getScenicId()); sourceEntity.setDeviceId(deviceId); sourceEntity.setType(1); if (task.memberId != null && task.faceId != null) { MemberSourceEntity videoSource = new MemberSourceEntity(); videoSource.setMemberId(task.getMemberId()); videoSource.setType(1); videoSource.setFaceId(task.getFaceId()); videoSource.setScenicId(deviceV2.getScenicId()); videoSource.setSourceId(sourceEntity.getId()); IsBuyRespVO isBuy = orderBiz.isBuy(task.getMemberId(), deviceV2.getScenicId(), 1, task.getFaceId()); if (isBuy.isBuy()) { // 如果用户买过 videoSource.setIsBuy(1); } else if (isBuy.isFree()) { // 全免费逻辑 videoSource.setIsBuy(1); } else { videoSource.setIsBuy(0); } sourceMapper.addRelation(videoSource); } sourceMapper.add(sourceEntity); videoReUploader.addTask(sourceEntity.getId()); } else { source.setVideoUrl(url); if (StringUtils.isNotBlank(config.getString("video_crop"))) { source.setPosJson(config.getString("video_crop")); } sourceMapper.update(source); videoReUploader.addTask(source.getId()); } } else { // 有原视频 if (task.memberId != null && task.faceId != null) { int count = sourceMapper.hasRelationTo(task.getMemberId(), source.getId(), 1); if (count <= 0) { // 没有关联 IsBuyRespVO isBuy = orderBiz.isBuy(task.getMemberId(), deviceV2.getScenicId(), 1, task.getFaceId()); MemberSourceEntity videoSource = new MemberSourceEntity(); videoSource.setId(SnowFlakeUtil.getLongId()); videoSource.setScenicId(deviceV2.getScenicId()); videoSource.setFaceId(task.getFaceId()); videoSource.setMemberId(task.getMemberId()); videoSource.setType(1); if (isBuy.isBuy()) { // 如果用户买过 videoSource.setIsBuy(1); } else if (isBuy.isFree()) { // 全免费逻辑 videoSource.setIsBuy(1); } else { videoSource.setIsBuy(0); } videoSource.setSourceId(source.getId()); sourceMapper.addRelation(videoSource); } } } return true; } public boolean startFfmpegTask(FfmpegTask task) { boolean result; if (task.getFileList().size() == 1) { // 单个文件切割,用简单方法 result = runFfmpegForSingleFile(task); } else { // 多个文件切割,用速度快的 result = runFfmpegForMultipleFile1(task); } // 先尝试方法1 if (result) { return true; } log.warn("FFMPEG简易方法失败,尝试复杂方法转码"); // 不行再尝试方法二 return runFfmpegForMultipleFile2(task); } private boolean runFfmpegForMultipleFile1(FfmpegTask task) { // 多文件,方法一:先转换成ts,然后合并切割 // 步骤一:先转换成ts,并行转换 boolean notOk = task.getFileList().stream().map(file -> { try { if (file.isNeedDownload() || (!file.getName().endsWith(".ts"))) { String tmpFile = file.getName() + ".ts"; boolean result = convertMp4ToTs(file, tmpFile); // 因为是并行转换,没法保证顺序,就直接存里面 if (result) { file.setUrl(tmpFile); } else { // 失败了,务必删除临时文件 (new File(tmpFile)).delete(); } return result; } else { return true; } } catch (IOException e) { log.warn("转码出错"); return false; } }).anyMatch(b -> !b); // 转码进程中出现问题 if (notOk) { return false; } // 步骤二:使用concat协议拼接裁切 boolean result; try { result = quickVideoCut( "concat:" + task.getFileList().stream().map(FileObject::getUrl).collect(Collectors.joining("|")), task.getOffsetStart(), task.getDuration(), task.getOutputFile() ); } catch (IOException e) { return false; } // 步骤三:删除临时文件 task.getFileList().stream().map(FileObject::getUrl).forEach(tmpFile -> { File f = new File(tmpFile); if (f.exists() && f.isFile()) { f.delete(); } }); return result; } private boolean runFfmpegForMultipleFile2(FfmpegTask task) { // 多文件,方法二:使用计算资源编码 try { return slowVideoCut(task.getFileList(), task.getOffsetStart(), task.getDuration(), task.getOutputFile()); } catch (IOException e) { return false; } } private boolean runFfmpegForSingleFile(FfmpegTask task) { try { return quickVideoCut(task.getFileList().getFirst().getUrl(), task.getOffsetStart(), task.getDuration(), task.getOutputFile()); } catch (IOException e) { return false; } } /** * 把MP4转换成可以拼接的TS文件 * * @param file MP4文件,或ffmpeg支持的输入 * @param outFileName 输出文件路径 * @return 是否成功 * @throws IOException 奇奇怪怪的报错 */ private boolean convertMp4ToTs(FileObject file, String outFileName) throws IOException { List ffmpegCmd = new ArrayList<>(); ffmpegCmd.add("ffmpeg"); ffmpegCmd.add("-hide_banner"); ffmpegCmd.add("-y"); ffmpegCmd.add("-i"); ffmpegCmd.add(file.getUrl()); ffmpegCmd.add("-c"); ffmpegCmd.add("copy"); ffmpegCmd.add("-bsf:v"); ffmpegCmd.add("h264_mp4toannexb"); ffmpegCmd.add("-f"); ffmpegCmd.add("mpegts"); ffmpegCmd.add(outFileName); return handleFfmpegProcess(ffmpegCmd); } private boolean convertHevcToTs(FileObject file, String outFileName) throws IOException { List ffmpegCmd = new ArrayList<>(); ffmpegCmd.add("ffmpeg"); ffmpegCmd.add("-hide_banner"); ffmpegCmd.add("-y"); ffmpegCmd.add("-i"); ffmpegCmd.add(file.getUrl()); ffmpegCmd.add("-c"); ffmpegCmd.add("copy"); ffmpegCmd.add("-bsf:v"); ffmpegCmd.add("hevc_mp4toannexb"); ffmpegCmd.add("-f"); ffmpegCmd.add("mpegts"); ffmpegCmd.add(outFileName); return handleFfmpegProcess(ffmpegCmd); } /** * 快速切割,不产生转码,速度快,但可能会出现:第一帧数据不是I帧导致前面的数据无法使用 * * @param inputFile 输入文件,ffmpeg支持的协议均可 * @param offset 离输入文件开始的偏移 * @param length 输出文件时长 * @param outputFile 输出文件名称 * @return 是否成功 * @throws IOException 奇奇怪怪的报错 */ private boolean quickVideoCut(String inputFile, BigDecimal offset, BigDecimal length, String outputFile) throws IOException { List ffmpegCmd = new ArrayList<>(); ffmpegCmd.add("ffmpeg"); ffmpegCmd.add("-hide_banner"); ffmpegCmd.add("-y"); ffmpegCmd.add("-i"); ffmpegCmd.add(inputFile); ffmpegCmd.add("-c:v"); ffmpegCmd.add("copy"); ffmpegCmd.add("-an"); ffmpegCmd.add("-ss"); ffmpegCmd.add(offset.toPlainString()); ffmpegCmd.add("-t"); ffmpegCmd.add(length.toPlainString()); ffmpegCmd.add("-f"); ffmpegCmd.add("mp4"); ffmpegCmd.add(outputFile); return handleFfmpegProcess(ffmpegCmd); } /** * 转码切割,兜底逻辑,速度慢,但优势:成功后转码视频绝对可用 * * @param inputFiles 输入文件List,ffmpeg支持的协议均可 * @param offset 离输入文件开始的偏移 * @param length 输出文件时长 * @param outputFile 输出文件名称 * @return 是否成功 * @throws IOException 奇奇怪怪的报错 */ private boolean slowVideoCut(List inputFiles, BigDecimal offset, BigDecimal length, String outputFile) throws IOException { List ffmpegCmd = new ArrayList<>(); ffmpegCmd.add("ffmpeg"); ffmpegCmd.add("-hide_banner"); ffmpegCmd.add("-y"); for (FileObject file : inputFiles) { ffmpegCmd.add("-i"); ffmpegCmd.add(file.getUrl()); } // 使用filter_complex做拼接 ffmpegCmd.add("-filter_complex"); ffmpegCmd.add( IntStream.range(0, inputFiles.size()).mapToObj(i -> "[" + i + ":v]").collect(Collectors.joining("")) + "concat=n="+inputFiles.size()+":v=1[v]" ); ffmpegCmd.add("-map"); ffmpegCmd.add("[v]"); ffmpegCmd.add("-preset:v"); ffmpegCmd.add("fast"); ffmpegCmd.add("-an"); // 没有使用copy,因为使用了filter_complex ffmpegCmd.add("-ss"); ffmpegCmd.add(offset.toPlainString()); ffmpegCmd.add("-t"); ffmpegCmd.add(length.toPlainString()); ffmpegCmd.add("-f"); ffmpegCmd.add("mp4"); ffmpegCmd.add(outputFile); return handleFfmpegProcess(ffmpegCmd); } /** * 运行ffmpeg,并确认ffmpeg是否正常退出 * * @param ffmpegCmd ffmpeg命令 * @return 是否正常退出 */ private static boolean handleFfmpegProcess(List ffmpegCmd) throws IOException { Date _startDt = new Date(); log.info("FFMPEG执行命令:【{}】", String.join(" ", ffmpegCmd)); ProcessBuilder pb = new ProcessBuilder(ffmpegCmd); Process ffmpegProcess = pb.start(); // 如果需要额外分析输出之类 if (log.isTraceEnabled()) { InputStream stderr = ffmpegProcess.getErrorStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(stderr)); String line; while ((line = reader.readLine()) != null) { log.trace(line); } } try { // 最长1分钟 boolean exited = ffmpegProcess.waitFor(1, TimeUnit.MINUTES); if (exited) { int code = ffmpegProcess.exitValue(); Date _endDt = new Date(); log.info("FFMPEG执行命令结束,Code:【{}】,耗费时间:【{}ms】,命令:【{}】", code, _endDt.getTime() - _startDt.getTime(), String.join(" ", ffmpegCmd)); return 0 == code; } else { log.error("FFMPEG执行命令没有在1分钟内退出,命令:【{}】", String.join(" ", ffmpegCmd)); ffmpegProcess.destroy(); return false; } } catch (InterruptedException e) { // TODO: 被中断了 log.warn("FFMPEG执行命令:【{}】,被中断了", String.join(" ", ffmpegCmd)); return false; } } }