Compare commits

..

11 Commits

Author SHA1 Message Date
4067ab8d0a hwaccel qsvL use vpp 2026-03-13 11:38:18 +08:00
f1634fcecc hwaccel qsv 2026-03-13 11:29:18 +08:00
fe09c60822 hwaccel 2026-03-13 11:10:39 +08:00
83bfe34394 config p 2026-03-13 00:06:39 +08:00
5972ba1948 common p 2026-03-13 00:04:04 +08:00
030d8e9e6d 720p 2026-03-12 15:56:06 +08:00
f8f0e92723 id string 2026-03-10 15:49:03 +08:00
181a6b5368 fix(video): 修复视频剪辑时间戳处理问题
- 调整 -ss 参数位置以确保正确的剪辑起始时间
- 移除多余的 -reset_timestamps 参数避免时间戳重置问题
- 确保 -fflags +genpts 参数在正确的命令序列中设置
- 修复拼接操作中的时间偏移参数顺序
- 优化视频处理流程中的时间戳生成逻辑
2026-03-06 17:31:12 +08:00
a951517cfd refactor(ffmpeg): 优化FFmpeg视频处理功能并增强错误处理
- 移除多余的导入包并将errors包添加到顶部导入列表
- 修复包导入顺序,将OpenTelemetry相关包移到底部
- 在任务克隆时深度复制文件列表以避免并发修改问题
- 添加临时文件清理逻辑,转码失败时删除已转换的临时文件
- 移除冗余的文件状态检查代码
- 修复排序比较函数中的相等条件,改为严格小于
- 将临时文件路径设置为系统临时目录下
- 使用context超时控制FFmpeg命令执行,替代手动超时管理
- 优化FFmpeg命令执行流程,使用Run方法统一处理进程启动和等待
- 添加更精确的超时和取消错误处理逻辑
2026-02-27 18:40:30 +08:00
1936c1a73a feat(video): 支持HEVC编码视频转换为TS格式
- 添加GetVideoCodec函数用于检测视频编码格式
- 实现HEVC编码视频的特殊转换逻辑
- 引入convertHevcToTs函数处理HEVC编码视频
- 保持原有MP4格式转换功能的兼容性
- 添加错误处理和日志记录机制
- 集成链路追踪支持视频编解码操作监控
2026-02-27 17:52:11 +08:00
72b8d277ea fix(video): 修复视频剪辑功能中的参数错误和音频处理问题
- 修正 SlowVideoCut 函数中传入的参数,将第二个 task.Offset 替换为 task.Length
- 添加静音音频源输入以解决无音频轨道的视频处理问题
- 配置音频编解码器为 aac 格式并启用最短时长截取
- 在多个视频处理函数中统一音频处理逻辑
- 修复 concat 方式视频剪辑的音频映射问题
- 确保视频剪辑操作保留正确的音频流处理
2026-02-27 17:50:50 +08:00
7 changed files with 383 additions and 53 deletions

View File

@@ -75,3 +75,17 @@ func OssUpload(ctx context.Context, url, filePath string) error {
span.SetStatus(codes.Ok, "上传成功") span.SetStatus(codes.Ok, "上传成功")
return nil return nil
} }
func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string, resolution string) error {
subCtx, span := tracer.Start(ctx, "UploadPreviewFile")
defer span.End()
url, err := QueryPreviewUploadUrl(subCtx, taskId, resolution)
if err != nil {
return err
}
logger.Info("开始上传预览文件", zap.String("url", url), zap.String("resolution", resolution))
if err := OssUpload(subCtx, url, previewFilePath); err != nil {
return err
}
return nil
}

View File

@@ -126,3 +126,71 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
return false return false
} }
} }
func QueryPreviewUploadUrl(ctx context.Context, taskId string, resolution string) (string, error) {
_, span := tracer.Start(ctx, "QueryPreviewUploadUrl")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "GET"))
req, err := http.NewRequest("GET", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return "", err
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return "", err
}
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败")
logger.Error("读取响应体失败", zap.Error(err))
return "", err
}
return string(body), nil
}
func ReportPreviewSuccess(ctx context.Context, taskId string, resolution string) bool {
_, span := tracer.Start(ctx, "ReportPreviewSuccess")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "POST"))
req, err := http.NewRequest("POST", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return false
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return false
}
defer resp.Body.Close()
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
if resp.StatusCode == 200 {
span.SetStatus(codes.Ok, "成功")
return true
} else {
span.SetStatus(codes.Error, "失败")
return false
}
}

View File

@@ -121,7 +121,7 @@ func (c *ImageProcessingConfig) GetEffectiveConfig() *ImageProcessingConfig {
} }
type UploadConfig struct { type UploadConfig struct {
TaskID int64 `json:"taskId"` TaskID int64 `json:"taskId,string"`
FaceUploadURL string `json:"faceUploadUrl"` FaceUploadURL string `json:"faceUploadUrl"`
ThumbnailUploadURL string `json:"thumbnailUploadUrl"` ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
SourceUploadURL string `json:"sourceUploadUrl"` SourceUploadURL string `json:"sourceUploadUrl"`
@@ -147,12 +147,12 @@ type FacePositionInfo struct {
} }
type SubmitResultRequest struct { type SubmitResultRequest struct {
TaskID int64 `json:"taskId"` TaskID int64 `json:"taskId,string"`
FacePosition FacePositionInfo `json:"facePosition"` FacePosition FacePositionInfo `json:"facePosition"`
} }
type SubmitFailureRequest struct { type SubmitFailureRequest struct {
TaskID int64 `json:"taskId"` TaskID int64 `json:"taskId,string"`
ErrorCode string `json:"errorCode"` ErrorCode string `json:"errorCode"`
ErrorMessage string `json:"errorMessage"` ErrorMessage string `json:"errorMessage"`
} }

View File

@@ -32,3 +32,9 @@ disconnectAction:
enabled: false enabled: false
thresholdMinutes: 5 thresholdMinutes: 5
command: "" command: ""
preview:
enabled: true
hwaccel: ""
resolutions:
- 720
- 1080

View File

@@ -64,6 +64,12 @@ type DisconnectActionConfig struct {
Command string `mapstructure:"command"` Command string `mapstructure:"command"`
} }
type PreviewConfig struct {
Enabled bool `mapstructure:"enabled"`
Resolutions []int `mapstructure:"resolutions"`
HwAccel string `mapstructure:"hwaccel"`
}
type MainConfig struct { type MainConfig struct {
Api ApiConfig `mapstructure:"api"` Api ApiConfig `mapstructure:"api"`
Record RecordConfig `mapstructure:"record"` Record RecordConfig `mapstructure:"record"`
@@ -71,4 +77,5 @@ type MainConfig struct {
FileName FileNameConfig `mapstructure:"fileName"` FileName FileNameConfig `mapstructure:"fileName"`
Viid ViidConfig `mapstructure:"viid"` Viid ViidConfig `mapstructure:"viid"`
DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"` DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"`
Preview PreviewConfig `mapstructure:"preview"`
} }

84
main.go
View File

@@ -7,13 +7,16 @@ import (
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/telemetry" "ZhenTuLocalPassiveAdapter/telemetry"
"ZhenTuLocalPassiveAdapter/util"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"runtime" "runtime"
"strings"
"syscall" "syscall"
"time" "time"
@@ -48,6 +51,20 @@ func startTask(device config.DeviceMapping, task dto.Task) {
logger.Info("处理任务成功", logger.Info("处理任务成功",
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 复制主文件用于异步生成预览,避免阻塞主上传流程
previewEnabled := config.Config.Preview.Enabled && len(config.Config.Preview.Resolutions) > 0
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
var copyErr error
if previewEnabled {
copyErr = copyFileOnDisk(fo.URL, copyFile)
if copyErr != nil {
logger.Warn("复制文件用于预览失败",
zap.String("taskID", task.TaskID),
zap.Error(copyErr))
}
}
err = api.UploadTaskFile(ctx, task, *fo) err = api.UploadTaskFile(ctx, task, *fo)
if err != nil { if err != nil {
span.SetStatus(codes.Error, "上传文件失败") span.SetStatus(codes.Error, "上传文件失败")
@@ -56,6 +73,9 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID) api.ReportTaskFailure(ctx, task.TaskID)
if previewEnabled && copyErr == nil {
os.Remove(copyFile)
}
return return
} }
result := api.ReportTaskSuccess(ctx, task.TaskID, fo) result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
@@ -65,12 +85,76 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
if previewEnabled && copyErr == nil {
os.Remove(copyFile)
}
return return
} }
span.SetStatus(codes.Ok, "上传文件成功") span.SetStatus(codes.Ok, "上传文件成功")
logger.Info("上传文件成功", logger.Info("上传文件成功",
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 异步:从副本压缩预览 → 上传 → 上报
if previewEnabled && copyErr == nil {
go uploadPreview(task.TaskID, copyFile)
}
}
func uploadPreview(taskID string, copyFile string) {
ctx, span := tracer.Start(context.Background(), "uploadPreview")
defer span.End()
defer os.Remove(copyFile)
baseName := strings.TrimSuffix(copyFile, "_copy.mp4")
for _, height := range config.Config.Preview.Resolutions {
resolution := fmt.Sprintf("%dp", height)
previewFile := fmt.Sprintf("%s_preview_%s.mp4", baseName, resolution)
ok, _ := util.CompressVideo(ctx, copyFile, previewFile, height)
if !ok {
logger.Error("生成预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
err := api.UploadPreviewFile(ctx, taskID, previewFile, resolution)
if err != nil {
logger.Error("上传预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution),
zap.Error(err))
os.Remove(previewFile)
continue
}
if !api.ReportPreviewSuccess(ctx, taskID, resolution) {
logger.Error("上报预览成功失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
logger.Info("预览上传成功",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
}
span.SetStatus(codes.Ok, "预览处理完成")
}
func copyFileOnDisk(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
return err
} }
func executeDisconnectCommand(command string) { func executeDisconnectCommand(command string) {

View File

@@ -1,14 +1,13 @@
package util package util
import ( import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"math/rand" "math/rand"
"os" "os"
"os/exec" "os/exec"
@@ -18,6 +17,10 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
) )
const FfmpegExec = "ffmpeg" const FfmpegExec = "ffmpeg"
@@ -67,8 +70,10 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
var wg sync.WaitGroup var wg sync.WaitGroup
var mu sync.Mutex var mu sync.Mutex
var notOk bool var notOk bool
clonedFiles := make([]dto.File, len(task.Files))
copy(clonedFiles, task.Files)
var taskClone = dto.FfmpegTask{ var taskClone = dto.FfmpegTask{
Files: task.Files, Files: clonedFiles,
OutputFile: task.OutputFile, OutputFile: task.OutputFile,
Offset: task.Offset, Offset: task.Offset,
Length: task.Length, Length: task.Length,
@@ -79,7 +84,20 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
go func(file *dto.File) { go func(file *dto.File) {
defer wg.Done() defer wg.Done()
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts") tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
result, err := convertMp4ToTs(subCtx, *file, tmpFile) codec, err := GetVideoCodec(subCtx, file.Url)
if err != nil {
logger.Error("获取视频编码失败", zap.Error(err))
mu.Lock()
notOk = true
mu.Unlock()
return
}
var result bool
if codec == "hevc" {
result, err = convertHevcToTs(subCtx, *file, tmpFile)
} else {
result, err = convertMp4ToTs(subCtx, *file, tmpFile)
}
if err != nil { if err != nil {
logger.Error("转码出错", zap.Error(err)) logger.Error("转码出错", zap.Error(err))
mu.Lock() mu.Lock()
@@ -101,6 +119,12 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
wg.Wait() wg.Wait()
if notOk { if notOk {
// 清理已成功转换的临时文件
for i := range taskClone.Files {
if taskClone.Files[i].Url != task.Files[i].Url {
os.Remove(taskClone.Files[i].Url)
}
}
span.SetStatus(codes.Error, "FFMPEG多文件转码失败") span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
return false return false
} }
@@ -147,7 +171,7 @@ func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2") subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
defer span.End() defer span.End()
// 多文件,方法二:使用计算资源编码 // 多文件,方法二:使用计算资源编码
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile) result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Length, task.OutputFile)
if err != nil { if err != nil {
return false return false
} }
@@ -183,14 +207,6 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败") span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
return false return false
} }
stat, err := os.Stat(task.OutputFile)
if err != nil {
span.SetStatus(codes.Error, "文件不存在")
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
return false
}
span.SetAttributes(attribute.String("file.name", task.OutputFile))
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
if result { if result {
outfile, err := os.Stat(task.OutputFile) outfile, err := os.Stat(task.OutputFile)
if err != nil { if err != nil {
@@ -228,7 +244,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
} }
// 按照 Create 的值升序排序 // 按照 Create 的值升序排序
sort.Slice(fileList, func(i, j int) bool { sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix() return fileList[i].StartTime.Unix() < fileList[j].StartTime.Unix()
}) })
// 如果片段在中间断开时间过长 // 如果片段在中间断开时间过长
@@ -316,11 +332,12 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
FfmpegExec, FfmpegExec,
"-hide_banner", "-hide_banner",
"-y", "-y",
"-i", inputFile,
"-c:v", "copy",
"-an",
"-reset_timestamps", "1",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-i", inputFile,
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
"-t", strconv.FormatFloat(length, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64),
"-fflags", "+genpts", "-fflags", "+genpts",
"-f", "mp4", "-f", "mp4",
@@ -332,7 +349,7 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) { func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut") subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
defer span.End() defer span.End()
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64()) tmpFile := path.Join(os.TempDir(), fmt.Sprintf("tmp%.10f.txt", rand.Float64()))
tmpFileObj, err := os.Create(tmpFile) tmpFileObj, err := os.Create(tmpFile)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
@@ -359,11 +376,14 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
"-y", "-y",
"-f", "concat", "-f", "concat",
"-safe", "0", "-safe", "0",
"-i", tmpFile,
"-c:v", "copy",
"-an",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-i", tmpFile,
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
"-t", strconv.FormatFloat(length, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64),
"-fflags", "+genpts",
"-f", "mp4", "-f", "mp4",
outputFile, outputFile,
} }
@@ -383,6 +403,9 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length flo
ffmpegCmd = append(ffmpegCmd, "-i", file.Url) ffmpegCmd = append(ffmpegCmd, "-i", file.Url)
} }
// 添加静音音频源作为额外输入
ffmpegCmd = append(ffmpegCmd, "-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100")
inputCount := len(inputFiles) inputCount := len(inputFiles)
filterComplex := strings.Builder{} filterComplex := strings.Builder{}
for i := 0; i < inputCount; i++ { for i := 0; i < inputCount; i++ {
@@ -393,8 +416,10 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length flo
ffmpegCmd = append(ffmpegCmd, ffmpegCmd = append(ffmpegCmd,
"-filter_complex", filterComplex.String(), "-filter_complex", filterComplex.String(),
"-map", "[v]", "-map", "[v]",
"-map", fmt.Sprintf("%d:a", inputCount),
"-c:a", "aac",
"-shortest",
"-preset:v", "fast", "-preset:v", "fast",
"-an",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64), "-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64), "-t", strconv.FormatFloat(length, 'f', 2, 64),
"-f", "mp4", "-f", "mp4",
@@ -413,48 +438,174 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds()))) span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
}() }()
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " "))) logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
cmd := exec.CommandContext(timeoutCtx, ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer var stderr bytes.Buffer
cmd.Stderr = &stderr cmd.Stderr = &stderr
err := cmd.Start() err := cmd.Run()
if err != nil { if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败") if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) && ctx.Err() == nil {
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
defer cmd.Process.Kill()
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-time.After(1 * time.Minute):
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " "))) logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out") return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done: }
if err != nil { if ctx.Err() != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetStatus(codes.Error, "FFMPEG执行命令被取消")
logger.Warn("FFMPEG执行命令被取消", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, ctx.Err()
}
span.SetStatus(codes.Error, "FFMPEG执行命令失败") span.SetStatus(codes.Error, "FFMPEG执行命令失败")
logger.Error("FFMPEG执行命令失败", logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()), zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " "))) zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err return false, err
} }
endTime := time.Now()
logger.Info("FFMPEG执行命令结束", logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()), zap.Int64("durationMs", time.Since(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " "))) zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil return true, nil
} }
func CompressVideo(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
subCtx, span := tracer.Start(ctx, "CompressVideo")
defer span.End()
span.SetAttributes(attribute.Int("height", height))
hwaccel := strings.ToLower(strings.TrimSpace(config.Config.Preview.HwAccel))
if hwaccel != "" && hwaccel != "none" {
span.SetAttributes(attribute.String("hwaccel", hwaccel))
ok, err := compressVideoGPU(subCtx, inputFile, outputFile, height, hwaccel)
if ok {
return true, nil
}
logger.Warn("GPU编码失败,回退到CPU编码",
zap.String("hwaccel", hwaccel),
zap.Error(err))
os.Remove(outputFile)
}
return compressVideoCPU(subCtx, inputFile, outputFile, height)
}
func compressVideoCPU(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoCPU")
defer span.End()
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "libx264",
"-preset", "fast",
"-crf", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
}
func compressVideoGPU(ctx context.Context, inputFile, outputFile string, height int, hwaccel string) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoGPU")
defer span.End()
span.SetAttributes(attribute.String("hwaccel", hwaccel))
var ffmpegCmd []string
switch hwaccel {
case "nvenc":
scaleFilter := fmt.Sprintf("scale_cuda=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "cuda",
"-hwaccel_output_format", "cuda",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_nvenc",
"-preset", "p4",
"-cq", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "amf":
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_amf",
"-quality", "balanced",
"-rc", "cqp",
"-qp_i", "28",
"-qp_p", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "qsv":
scaleFilter := fmt.Sprintf("vpp_qsv=w=-1:h=%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "qsv",
"-hwaccel_output_format", "qsv",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_qsv",
"-preset", "faster",
"-global_quality", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
default:
return false, fmt.Errorf("不支持的硬件加速类型: %s", hwaccel)
}
return handleFfmpegProcess(ctx, ffmpegCmd)
}
func GetVideoCodec(ctx context.Context, filePath string) (string, error) {
_, span := tracer.Start(ctx, "GetVideoCodec")
defer span.End()
ffprobeCmd := []string{
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=codec_name",
"-of", "default=noprint_wrappers=1:nokey=1",
filePath,
}
span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd)))
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "获取视频编码失败")
return "", fmt.Errorf("failed to get video codec: %w", err)
}
codec := strings.TrimSpace(out.String())
span.SetAttributes(attribute.String("video.codec", codec))
return codec, nil
} }
func GetVideoDuration(ctx context.Context, filePath string) (float64, error) { func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {