diff --git a/util/ffmpeg.go b/util/ffmpeg.go index 603a078..e2b311a 100644 --- a/util/ffmpeg.go +++ b/util/ffmpeg.go @@ -5,10 +5,8 @@ import ( "ZhenTuLocalPassiveAdapter/logger" "bytes" "context" + "errors" "fmt" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.uber.org/zap" "math/rand" "os" "os/exec" @@ -18,6 +16,10 @@ import ( "strings" "sync" "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.uber.org/zap" ) const FfmpegExec = "ffmpeg" @@ -67,8 +69,10 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool { var wg sync.WaitGroup var mu sync.Mutex var notOk bool + clonedFiles := make([]dto.File, len(task.Files)) + copy(clonedFiles, task.Files) var taskClone = dto.FfmpegTask{ - Files: task.Files, + Files: clonedFiles, OutputFile: task.OutputFile, Offset: task.Offset, Length: task.Length, @@ -114,6 +118,12 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool { wg.Wait() if notOk { + // 清理已成功转换的临时文件 + for i := range taskClone.Files { + if taskClone.Files[i].Url != task.Files[i].Url { + os.Remove(taskClone.Files[i].Url) + } + } span.SetStatus(codes.Error, "FFMPEG多文件转码失败") return false } @@ -196,14 +206,6 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool { span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败") return false } - stat, err := os.Stat(task.OutputFile) - if err != nil { - span.SetStatus(codes.Error, "文件不存在") - logger.Error("文件不存在", zap.String("outputFile", task.OutputFile)) - return false - } - span.SetAttributes(attribute.String("file.name", task.OutputFile)) - span.SetAttributes(attribute.Int64("file.size", stat.Size())) if result { outfile, err := os.Stat(task.OutputFile) if err != nil { @@ -241,7 +243,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File, } // 按照 Create 的值升序排序 sort.Slice(fileList, func(i, j int) bool { - return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix() + return fileList[i].StartTime.Unix() < fileList[j].StartTime.Unix() }) // 如果片段在中间断开时间过长 @@ -347,7 +349,7 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64 func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) { subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut") defer span.End() - tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64()) + tmpFile := path.Join(os.TempDir(), fmt.Sprintf("tmp%.10f.txt", rand.Float64())) tmpFileObj, err := os.Create(tmpFile) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) @@ -435,48 +437,37 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds()))) }() logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " "))) - cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...) + + timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + cmd := exec.CommandContext(timeoutCtx, ffmpegCmd[0], ffmpegCmd[1:]...) var stderr bytes.Buffer cmd.Stderr = &stderr - err := cmd.Start() + err := cmd.Run() if err != nil { span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) + if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) && ctx.Err() == nil { + span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") + logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " "))) + return false, fmt.Errorf("ffmpeg command timed out") + } + if ctx.Err() != nil { + span.SetStatus(codes.Error, "FFMPEG执行命令被取消") + logger.Warn("FFMPEG执行命令被取消", zap.String("command", strings.Join(ffmpegCmd, " "))) + return false, ctx.Err() + } span.SetStatus(codes.Error, "FFMPEG执行命令失败") logger.Error("FFMPEG执行命令失败", zap.String("error", stderr.String()), zap.String("command", strings.Join(ffmpegCmd, " "))) return false, err } - defer cmd.Process.Kill() - - done := make(chan error, 1) - go func() { - done <- cmd.Wait() - }() - - select { - case <-time.After(1 * time.Minute): - span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) - span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") - logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " "))) - return false, fmt.Errorf("ffmpeg command timed out") - case err := <-done: - if err != nil { - span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) - span.SetStatus(codes.Error, "FFMPEG执行命令失败") - logger.Error("FFMPEG执行命令失败", - zap.String("error", stderr.String()), - zap.String("command", strings.Join(ffmpegCmd, " "))) - return false, err - } - endTime := time.Now() - logger.Info("FFMPEG执行命令结束", - zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()), - zap.String("command", strings.Join(ffmpegCmd, " "))) - return true, nil - } + logger.Info("FFMPEG执行命令结束", + zap.Int64("durationMs", time.Since(startTime).Milliseconds()), + zap.String("command", strings.Join(ffmpegCmd, " "))) + return true, nil } func GetVideoCodec(ctx context.Context, filePath string) (string, error) {