package util import ( "ZhenTuLocalPassiveAdapter/dto" "bytes" "context" "fmt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "log" "math/rand" "os" "os/exec" "path" "sort" "strconv" "strings" "sync" "time" ) const FfmpegExec = "ffmpeg" const minimalSize = 8192 const minimalLengthRatio = 0.5 func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool { subCtx, span := tracer.Start(ctx, "RunFfmpegTask") defer span.End() var result bool if len(task.Files) == 1 { // 单个文件切割,用简单方法 result = runFfmpegForSingleFile(subCtx, task) } else { // 多个文件切割,用速度快的 result = runFfmpegForMultipleFile1(subCtx, task) } // 先尝试方法1 if !result { log.Printf("FFMPEG简易方法失败,尝试复杂方法转码") // 不行再尝试方法二 result = runFfmpegForMultipleFile2(subCtx, task) } duration, err := GetVideoDuration(subCtx, task.OutputFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "获取视频时长失败") return false } if duration < (minimalLengthRatio * float64(task.Length)) { span.SetStatus(codes.Error, "视频长度不达标") return false } if result { span.SetStatus(codes.Ok, "成功") return true } span.SetStatus(codes.Error, "失败") return result } func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool { subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile1") defer span.End() // 多文件,方法一:先转换成ts,然后合并切割 // 步骤一:先转换成ts,并行转换 var wg sync.WaitGroup var mu sync.Mutex var notOk bool var taskClone = dto.FfmpegTask{ Files: task.Files, OutputFile: task.OutputFile, Offset: task.Offset, Length: task.Length, } for i := range taskClone.Files { wg.Add(1) go func(file *dto.File) { defer wg.Done() tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts") result, err := convertMp4ToTs(subCtx, *file, tmpFile) if err != nil { log.Printf("转码出错: %v", err) mu.Lock() notOk = true mu.Unlock() return } if result { mu.Lock() file.Url = tmpFile mu.Unlock() } else { // 失败了,务必删除临时文件 os.Remove(tmpFile) } }(&taskClone.Files[i]) } wg.Wait() if notOk { span.SetStatus(codes.Error, "FFMPEG多文件转码失败") return false } // 步骤二:使用concat协议拼接裁切 result, err := QuickConcatVideoCut(subCtx, taskClone.Files, taskClone.Offset, taskClone.Length, taskClone.OutputFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败") return false } // 步骤三:删除临时文件 for _, file := range taskClone.Files { if err := os.Remove(file.Url); err != nil { log.Printf("删除临时文件失败: %v", err) } } if result { outfile, err := os.Stat(taskClone.OutputFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "文件不存在") result = false } else { span.SetAttributes(attribute.String("file.name", outfile.Name())) span.SetAttributes(attribute.Int64("file.size", outfile.Size())) if outfile.Size() < minimalSize { span.SetAttributes(attribute.String("error", "文件大小过小")) span.SetStatus(codes.Error, "文件大小过小") result = false } } } if result { span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功") } else { span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败") } return result } func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool { subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2") defer span.End() // 多文件,方法二:使用计算资源编码 result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile) if err != nil { return false } if result { outfile, err := os.Stat(task.OutputFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "文件不存在") result = false } else { span.SetAttributes(attribute.String("file.name", outfile.Name())) span.SetAttributes(attribute.Int64("file.size", outfile.Size())) if outfile.Size() < minimalSize { span.SetAttributes(attribute.String("error", "文件大小过小")) span.SetStatus(codes.Error, "文件大小过小") result = false } } } if result { span.SetStatus(codes.Ok, "FFMPEG多文件转码成功") } else { span.SetStatus(codes.Error, "FFMPEG多文件转码失败") } return result } func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool { subCtx, span := tracer.Start(ctx, "runFfmpegForSingleFile") defer span.End() result, err := QuickVideoCut(subCtx, task.Files[0].Url, task.Offset, task.Length, task.OutputFile) if err != nil { span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败") return false } stat, err := os.Stat(task.OutputFile) if err != nil { span.SetStatus(codes.Error, "文件不存在") log.Printf("文件不存在:%s", task.OutputFile) return false } span.SetAttributes(attribute.String("file.name", task.OutputFile)) span.SetAttributes(attribute.Int64("file.size", stat.Size())) if result { outfile, err := os.Stat(task.OutputFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "文件不存在") result = false } else { span.SetAttributes(attribute.String("file.name", outfile.Name())) span.SetAttributes(attribute.Int64("file.size", outfile.Size())) if outfile.Size() < minimalSize { span.SetAttributes(attribute.String("error", "文件大小过小")) span.SetStatus(codes.Error, "文件大小过小") result = false } } } if result { span.SetStatus(codes.Ok, "FFMPEG单个文件裁切成功") } else { span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败") } return result } func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) { _, span := tracer.Start(ctx, "CheckFileCoverageAndConstructTask") defer span.End() if fileList == nil || len(fileList) == 0 { span.SetStatus(codes.Error, "无法根据要求找到对应录制片段") log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt) return nil, fmt.Errorf("无法根据要求找到对应录制片段") } // 按照 Create 的值升序排序 sort.Slice(fileList, func(i, j int) bool { return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix() }) // 如果片段在中间断开时间过长 if len(fileList) > 1 { var lastFile *dto.File for _, file := range fileList { if lastFile == nil { lastFile = &file continue } if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 { // 片段断开 span.SetStatus(codes.Error, "FFMPEG片段断开") log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds()) return nil, fmt.Errorf("片段断开") } lastFile = &file } } // 通过文件列表构造的任务仍然是缺失的 if fileList[len(fileList)-1].EndTime.Before(endDt) { span.SetStatus(codes.Error, "FFMPEG片段断开") log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt) return nil, fmt.Errorf("片段断开") } // 构造FfmpegTaskPo ffmpegTask := &dto.FfmpegTask{ Files: fileList, Length: endDt.Sub(beginDt).Seconds(), Offset: beginDt.Sub(fileList[0].StartTime).Seconds(), OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"), } span.SetAttributes(attribute.String("task.files", ToJson(ffmpegTask.Files))) span.SetAttributes(attribute.Float64("task.offset", ffmpegTask.Offset)) span.SetAttributes(attribute.Float64("task.length", ffmpegTask.Length)) span.SetStatus(codes.Ok, "FFMPEG任务构造成功") return ffmpegTask, nil } func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) { subCtx, span := tracer.Start(ctx, "convertMp4ToTs") defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", "-y", "-i", file.Url, "-c", "copy", "-bsf:v", "h264_mp4toannexb", "-f", "mpegts", outFileName, } return handleFfmpegProcess(subCtx, ffmpegCmd) } func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) { subCtx, span := tracer.Start(ctx, "convertHevcToTs") defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", "-y", "-i", file.Url, "-c", "copy", "-bsf:v", "hevc_mp4toannexb", "-f", "mpegts", outFileName, } return handleFfmpegProcess(subCtx, ffmpegCmd) } func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64, outputFile string) (bool, error) { subCtx, span := tracer.Start(ctx, "QuickVideoCut") defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", "-y", "-i", inputFile, "-c:v", "copy", "-an", "-reset_timestamps", "1", "-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64), "-fflags", "+genpts", "-f", "mp4", outputFile, } return handleFfmpegProcess(subCtx, ffmpegCmd) } func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) { subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut") defer span.End() tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64()) tmpFileObj, err := os.Create(tmpFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "创建临时文件失败") log.Printf("创建临时文件失败:%s", tmpFile) return false, err } defer os.Remove(tmpFile) defer tmpFileObj.Close() for _, filePo := range inputFiles { _, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url)) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "写入临时文件失败") log.Printf("写入临时文件失败:%s", tmpFile) return false, err } } ffmpegCmd := []string{ FfmpegExec, "-hide_banner", "-y", "-f", "concat", "-safe", "0", "-i", tmpFile, "-c:v", "copy", "-an", "-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64), "-f", "mp4", outputFile, } return handleFfmpegProcess(subCtx, ffmpegCmd) } func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) { subCtx, span := tracer.Start(ctx, "SlowVideoCut") defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", "-y", } for _, file := range inputFiles { ffmpegCmd = append(ffmpegCmd, "-i", file.Url) } inputCount := len(inputFiles) filterComplex := strings.Builder{} for i := 0; i < inputCount; i++ { filterComplex.WriteString(fmt.Sprintf("[%d:v]", i)) } filterComplex.WriteString(fmt.Sprintf("concat=n=%d:v=1[v]", inputCount)) ffmpegCmd = append(ffmpegCmd, "-filter_complex", filterComplex.String(), "-map", "[v]", "-preset:v", "fast", "-an", "-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64), "-f", "mp4", outputFile, ) return handleFfmpegProcess(subCtx, ffmpegCmd) } func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) { _, span := tracer.Start(ctx, "handleFfmpegProcess") defer span.End() span.SetAttributes(attribute.String("ffmpeg.cmd", ToJson(ffmpegCmd))) startTime := time.Now() defer func() { span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds()))) }() log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " ")) cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...) var stderr bytes.Buffer cmd.Stderr = &stderr err := cmd.Start() if err != nil { span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetStatus(codes.Error, "FFMPEG执行命令失败") log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) return false, err } defer cmd.Process.Kill() done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { case <-time.After(1 * time.Minute): span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " ")) return false, fmt.Errorf("ffmpeg command timed out") case err := <-done: if err != nil { span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetStatus(codes.Error, "FFMPEG执行命令失败") log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) return false, err } endTime := time.Now() log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " ")) return true, nil } } func GetVideoDuration(ctx context.Context, filePath string) (float64, error) { _, span := tracer.Start(ctx, "GetVideoDuration") defer span.End() ffprobeCmd := []string{ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", filePath, } span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd))) cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "failed to get video duration") return 0, fmt.Errorf("failed to get video duration: %w", err) } span.SetAttributes(attribute.String("ffprobe.stdout", out.String())) durationStr := strings.TrimSpace(out.String()) duration, err := strconv.ParseFloat(durationStr, 64) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) return 0, fmt.Errorf("failed to parse video duration: %w", err) } span.SetAttributes(attribute.Float64("video.duration", duration)) return duration, nil }