refactor(ffmpeg): 优化FFmpeg视频处理功能并增强错误处理

- 移除多余的导入包并将errors包添加到顶部导入列表
- 修复包导入顺序,将OpenTelemetry相关包移到底部
- 在任务克隆时深度复制文件列表以避免并发修改问题
- 添加临时文件清理逻辑,转码失败时删除已转换的临时文件
- 移除冗余的文件状态检查代码
- 修复排序比较函数中的相等条件,改为严格小于
- 将临时文件路径设置为系统临时目录下
- 使用context超时控制FFmpeg命令执行,替代手动超时管理
- 优化FFmpeg命令执行流程,使用Run方法统一处理进程启动和等待
- 添加更精确的超时和取消错误处理逻辑
This commit is contained in:
2026-02-27 18:40:30 +08:00
parent 1936c1a73a
commit a951517cfd

View File

@@ -5,10 +5,8 @@ import (
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"math/rand"
"os"
"os/exec"
@@ -18,6 +16,10 @@ import (
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
const FfmpegExec = "ffmpeg"
@@ -67,8 +69,10 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
var wg sync.WaitGroup
var mu sync.Mutex
var notOk bool
clonedFiles := make([]dto.File, len(task.Files))
copy(clonedFiles, task.Files)
var taskClone = dto.FfmpegTask{
Files: task.Files,
Files: clonedFiles,
OutputFile: task.OutputFile,
Offset: task.Offset,
Length: task.Length,
@@ -114,6 +118,12 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
wg.Wait()
if notOk {
// 清理已成功转换的临时文件
for i := range taskClone.Files {
if taskClone.Files[i].Url != task.Files[i].Url {
os.Remove(taskClone.Files[i].Url)
}
}
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
return false
}
@@ -196,14 +206,6 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
return false
}
stat, err := os.Stat(task.OutputFile)
if err != nil {
span.SetStatus(codes.Error, "文件不存在")
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
return false
}
span.SetAttributes(attribute.String("file.name", task.OutputFile))
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
if result {
outfile, err := os.Stat(task.OutputFile)
if err != nil {
@@ -241,7 +243,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
}
// 按照 Create 的值升序排序
sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
return fileList[i].StartTime.Unix() < fileList[j].StartTime.Unix()
})
// 如果片段在中间断开时间过长
@@ -347,7 +349,7 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
defer span.End()
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
tmpFile := path.Join(os.TempDir(), fmt.Sprintf("tmp%.10f.txt", rand.Float64()))
tmpFileObj, err := os.Create(tmpFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
@@ -435,48 +437,37 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
}()
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
cmd := exec.CommandContext(timeoutCtx, ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
err := cmd.Start()
err := cmd.Run()
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) && ctx.Err() == nil {
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out")
}
if ctx.Err() != nil {
span.SetStatus(codes.Error, "FFMPEG执行命令被取消")
logger.Warn("FFMPEG执行命令被取消", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, ctx.Err()
}
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
defer cmd.Process.Kill()
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-time.After(1 * time.Minute):
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done:
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
endTime := time.Now()
logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil
}
logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", time.Since(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil
}
func GetVideoCodec(ctx context.Context, filePath string) (string, error) {