You've already forked VptPassiveAdapter
474 lines
14 KiB
Go
474 lines
14 KiB
Go
package util
|
|
|
|
import (
|
|
"ZhenTuLocalPassiveAdapter/dto"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"log"
|
|
"math/rand"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const FfmpegExec = "ffmpeg"
|
|
const minimalSize = 8192
|
|
const minimalLengthRatio = 0.5
|
|
|
|
func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
|
|
subCtx, span := tracer.Start(ctx, "RunFfmpegTask")
|
|
defer span.End()
|
|
var result bool
|
|
if len(task.Files) == 1 {
|
|
// 单个文件切割,用简单方法
|
|
result = runFfmpegForSingleFile(subCtx, task)
|
|
} else {
|
|
// 多个文件切割,用速度快的
|
|
result = runFfmpegForMultipleFile1(subCtx, task)
|
|
}
|
|
// 先尝试方法1
|
|
if !result {
|
|
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
|
// 不行再尝试方法二
|
|
result = runFfmpegForMultipleFile2(subCtx, task)
|
|
}
|
|
duration, err := GetVideoDuration(subCtx, task.OutputFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "获取视频时长失败")
|
|
return false
|
|
}
|
|
if duration < (minimalLengthRatio * float64(task.Length)) {
|
|
span.SetStatus(codes.Error, "视频长度不达标")
|
|
return false
|
|
}
|
|
if result {
|
|
span.SetStatus(codes.Ok, "成功")
|
|
return true
|
|
}
|
|
span.SetStatus(codes.Error, "失败")
|
|
return result
|
|
}
|
|
|
|
func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
|
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
|
|
defer span.End()
|
|
// 多文件,方法一:先转换成ts,然后合并切割
|
|
// 步骤一:先转换成ts,并行转换
|
|
var wg sync.WaitGroup
|
|
var mu sync.Mutex
|
|
var notOk bool
|
|
var taskClone = dto.FfmpegTask{
|
|
Files: task.Files,
|
|
OutputFile: task.OutputFile,
|
|
Offset: task.Offset,
|
|
Length: task.Length,
|
|
}
|
|
|
|
for i := range taskClone.Files {
|
|
wg.Add(1)
|
|
go func(file *dto.File) {
|
|
defer wg.Done()
|
|
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
|
|
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
|
|
if err != nil {
|
|
log.Printf("转码出错: %v", err)
|
|
mu.Lock()
|
|
notOk = true
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
if result {
|
|
mu.Lock()
|
|
file.Url = tmpFile
|
|
mu.Unlock()
|
|
} else {
|
|
// 失败了,务必删除临时文件
|
|
os.Remove(tmpFile)
|
|
}
|
|
}(&taskClone.Files[i])
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
if notOk {
|
|
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
|
return false
|
|
}
|
|
|
|
// 步骤二:使用concat协议拼接裁切
|
|
result, err := QuickConcatVideoCut(subCtx, taskClone.Files, taskClone.Offset, taskClone.Length, taskClone.OutputFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
|
|
return false
|
|
}
|
|
|
|
// 步骤三:删除临时文件
|
|
for _, file := range taskClone.Files {
|
|
if err := os.Remove(file.Url); err != nil {
|
|
log.Printf("删除临时文件失败: %v", err)
|
|
}
|
|
}
|
|
if result {
|
|
outfile, err := os.Stat(taskClone.OutputFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "文件不存在")
|
|
result = false
|
|
} else {
|
|
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
|
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
|
if outfile.Size() < minimalSize {
|
|
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
|
span.SetStatus(codes.Error, "文件大小过小")
|
|
result = false
|
|
}
|
|
}
|
|
}
|
|
if result {
|
|
span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功")
|
|
} else {
|
|
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
|
|
}
|
|
return result
|
|
}
|
|
|
|
func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
|
|
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
|
defer span.End()
|
|
// 多文件,方法二:使用计算资源编码
|
|
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if result {
|
|
outfile, err := os.Stat(task.OutputFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "文件不存在")
|
|
result = false
|
|
} else {
|
|
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
|
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
|
if outfile.Size() < minimalSize {
|
|
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
|
span.SetStatus(codes.Error, "文件大小过小")
|
|
result = false
|
|
}
|
|
}
|
|
}
|
|
if result {
|
|
span.SetStatus(codes.Ok, "FFMPEG多文件转码成功")
|
|
} else {
|
|
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
|
}
|
|
return result
|
|
}
|
|
|
|
func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
|
subCtx, span := tracer.Start(ctx, "runFfmpegForSingleFile")
|
|
defer span.End()
|
|
result, err := QuickVideoCut(subCtx, task.Files[0].Url, task.Offset, task.Length, task.OutputFile)
|
|
if err != nil {
|
|
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
|
return false
|
|
}
|
|
stat, err := os.Stat(task.OutputFile)
|
|
if err != nil {
|
|
span.SetStatus(codes.Error, "文件不存在")
|
|
log.Printf("文件不存在:%s", task.OutputFile)
|
|
return false
|
|
}
|
|
span.SetAttributes(attribute.String("file.name", task.OutputFile))
|
|
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
|
|
if result {
|
|
outfile, err := os.Stat(task.OutputFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "文件不存在")
|
|
result = false
|
|
} else {
|
|
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
|
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
|
if outfile.Size() < minimalSize {
|
|
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
|
span.SetStatus(codes.Error, "文件大小过小")
|
|
result = false
|
|
}
|
|
}
|
|
}
|
|
if result {
|
|
span.SetStatus(codes.Ok, "FFMPEG单个文件裁切成功")
|
|
} else {
|
|
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
|
}
|
|
return result
|
|
}
|
|
|
|
func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) {
|
|
_, span := tracer.Start(ctx, "CheckFileCoverageAndConstructTask")
|
|
defer span.End()
|
|
if fileList == nil || len(fileList) == 0 {
|
|
span.SetStatus(codes.Error, "无法根据要求找到对应录制片段")
|
|
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
|
|
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
|
|
}
|
|
// 按照 Create 的值升序排序
|
|
sort.Slice(fileList, func(i, j int) bool {
|
|
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
|
|
})
|
|
|
|
// 如果片段在中间断开时间过长
|
|
if len(fileList) > 1 {
|
|
var lastFile *dto.File
|
|
for _, file := range fileList {
|
|
if lastFile == nil {
|
|
lastFile = &file
|
|
continue
|
|
}
|
|
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
|
|
// 片段断开
|
|
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
|
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
|
|
return nil, fmt.Errorf("片段断开")
|
|
}
|
|
lastFile = &file
|
|
}
|
|
}
|
|
|
|
// 通过文件列表构造的任务仍然是缺失的
|
|
if fileList[len(fileList)-1].EndTime.Before(endDt) {
|
|
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
|
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
|
|
return nil, fmt.Errorf("片段断开")
|
|
}
|
|
|
|
// 构造FfmpegTaskPo
|
|
ffmpegTask := &dto.FfmpegTask{
|
|
Files: fileList,
|
|
Length: endDt.Sub(beginDt).Seconds(),
|
|
Offset: beginDt.Sub(fileList[0].StartTime).Seconds(),
|
|
OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"),
|
|
}
|
|
span.SetAttributes(attribute.String("task.files", ToJson(ffmpegTask.Files)))
|
|
span.SetAttributes(attribute.Float64("task.offset", ffmpegTask.Offset))
|
|
span.SetAttributes(attribute.Float64("task.length", ffmpegTask.Length))
|
|
span.SetStatus(codes.Ok, "FFMPEG任务构造成功")
|
|
return ffmpegTask, nil
|
|
}
|
|
|
|
func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
|
subCtx, span := tracer.Start(ctx, "convertMp4ToTs")
|
|
defer span.End()
|
|
ffmpegCmd := []string{
|
|
FfmpegExec,
|
|
"-hide_banner",
|
|
"-y",
|
|
"-i", file.Url,
|
|
"-c", "copy",
|
|
"-bsf:v", "h264_mp4toannexb",
|
|
"-f", "mpegts",
|
|
outFileName,
|
|
}
|
|
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
|
}
|
|
|
|
func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
|
subCtx, span := tracer.Start(ctx, "convertHevcToTs")
|
|
defer span.End()
|
|
ffmpegCmd := []string{
|
|
FfmpegExec,
|
|
"-hide_banner",
|
|
"-y",
|
|
"-i", file.Url,
|
|
"-c", "copy",
|
|
"-bsf:v", "hevc_mp4toannexb",
|
|
"-f", "mpegts",
|
|
outFileName,
|
|
}
|
|
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
|
}
|
|
|
|
func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64, outputFile string) (bool, error) {
|
|
subCtx, span := tracer.Start(ctx, "QuickVideoCut")
|
|
defer span.End()
|
|
ffmpegCmd := []string{
|
|
FfmpegExec,
|
|
"-hide_banner",
|
|
"-y",
|
|
"-i", inputFile,
|
|
"-c:v", "copy",
|
|
"-an",
|
|
"-reset_timestamps", "1",
|
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
|
"-fflags", "+genpts",
|
|
"-f", "mp4",
|
|
outputFile,
|
|
}
|
|
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
|
}
|
|
|
|
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
|
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
|
defer span.End()
|
|
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
|
|
tmpFileObj, err := os.Create(tmpFile)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "创建临时文件失败")
|
|
log.Printf("创建临时文件失败:%s", tmpFile)
|
|
return false, err
|
|
}
|
|
defer os.Remove(tmpFile)
|
|
defer tmpFileObj.Close()
|
|
|
|
for _, filePo := range inputFiles {
|
|
_, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url))
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "写入临时文件失败")
|
|
log.Printf("写入临时文件失败:%s", tmpFile)
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
ffmpegCmd := []string{
|
|
FfmpegExec,
|
|
"-hide_banner",
|
|
"-y",
|
|
"-f", "concat",
|
|
"-safe", "0",
|
|
"-i", tmpFile,
|
|
"-c:v", "copy",
|
|
"-an",
|
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
|
"-f", "mp4",
|
|
outputFile,
|
|
}
|
|
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
|
}
|
|
|
|
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
|
subCtx, span := tracer.Start(ctx, "SlowVideoCut")
|
|
defer span.End()
|
|
ffmpegCmd := []string{
|
|
FfmpegExec,
|
|
"-hide_banner",
|
|
"-y",
|
|
}
|
|
|
|
for _, file := range inputFiles {
|
|
ffmpegCmd = append(ffmpegCmd, "-i", file.Url)
|
|
}
|
|
|
|
inputCount := len(inputFiles)
|
|
filterComplex := strings.Builder{}
|
|
for i := 0; i < inputCount; i++ {
|
|
filterComplex.WriteString(fmt.Sprintf("[%d:v]", i))
|
|
}
|
|
filterComplex.WriteString(fmt.Sprintf("concat=n=%d:v=1[v]", inputCount))
|
|
|
|
ffmpegCmd = append(ffmpegCmd,
|
|
"-filter_complex", filterComplex.String(),
|
|
"-map", "[v]",
|
|
"-preset:v", "fast",
|
|
"-an",
|
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
|
"-f", "mp4",
|
|
outputFile,
|
|
)
|
|
|
|
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
|
}
|
|
|
|
func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) {
|
|
_, span := tracer.Start(ctx, "handleFfmpegProcess")
|
|
defer span.End()
|
|
span.SetAttributes(attribute.String("ffmpeg.cmd", ToJson(ffmpegCmd)))
|
|
startTime := time.Now()
|
|
defer func() {
|
|
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
|
}()
|
|
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " "))
|
|
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
|
|
|
|
var stderr bytes.Buffer
|
|
cmd.Stderr = &stderr
|
|
|
|
err := cmd.Start()
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
|
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
|
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
|
return false, err
|
|
}
|
|
defer cmd.Process.Kill()
|
|
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- cmd.Wait()
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(1 * time.Minute):
|
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
|
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
|
|
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " "))
|
|
return false, fmt.Errorf("ffmpeg command timed out")
|
|
case err := <-done:
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
|
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
|
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
|
return false, err
|
|
}
|
|
endTime := time.Now()
|
|
log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " "))
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
|
_, span := tracer.Start(ctx, "GetVideoDuration")
|
|
defer span.End()
|
|
ffprobeCmd := []string{
|
|
"ffprobe",
|
|
"-v", "error",
|
|
"-show_entries", "format=duration",
|
|
"-of", "default=noprint_wrappers=1:nokey=1",
|
|
filePath,
|
|
}
|
|
span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd)))
|
|
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
|
|
var out bytes.Buffer
|
|
cmd.Stdout = &out
|
|
|
|
err := cmd.Run()
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
span.SetStatus(codes.Error, "failed to get video duration")
|
|
return 0, fmt.Errorf("failed to get video duration: %w", err)
|
|
}
|
|
span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
|
|
durationStr := strings.TrimSpace(out.String())
|
|
duration, err := strconv.ParseFloat(durationStr, 64)
|
|
if err != nil {
|
|
span.SetAttributes(attribute.String("error", err.Error()))
|
|
return 0, fmt.Errorf("failed to parse video duration: %w", err)
|
|
}
|
|
span.SetAttributes(attribute.Float64("video.duration", duration))
|
|
return duration, nil
|
|
}
|