Compare commits

...

9 Commits

Author SHA1 Message Date
4067ab8d0a hwaccel qsvL use vpp 2026-03-13 11:38:18 +08:00
f1634fcecc hwaccel qsv 2026-03-13 11:29:18 +08:00
fe09c60822 hwaccel 2026-03-13 11:10:39 +08:00
83bfe34394 config p 2026-03-13 00:06:39 +08:00
5972ba1948 common p 2026-03-13 00:04:04 +08:00
030d8e9e6d 720p 2026-03-12 15:56:06 +08:00
f8f0e92723 id string 2026-03-10 15:49:03 +08:00
181a6b5368 fix(video): 修复视频剪辑时间戳处理问题
- 调整 -ss 参数位置以确保正确的剪辑起始时间
- 移除多余的 -reset_timestamps 参数避免时间戳重置问题
- 确保 -fflags +genpts 参数在正确的命令序列中设置
- 修复拼接操作中的时间偏移参数顺序
- 优化视频处理流程中的时间戳生成逻辑
2026-03-06 17:31:12 +08:00
a951517cfd refactor(ffmpeg): 优化FFmpeg视频处理功能并增强错误处理
- 移除多余的导入包并将errors包添加到顶部导入列表
- 修复包导入顺序,将OpenTelemetry相关包移到底部
- 在任务克隆时深度复制文件列表以避免并发修改问题
- 添加临时文件清理逻辑,转码失败时删除已转换的临时文件
- 移除冗余的文件状态检查代码
- 修复排序比较函数中的相等条件,改为严格小于
- 将临时文件路径设置为系统临时目录下
- 使用context超时控制FFmpeg命令执行,替代手动超时管理
- 优化FFmpeg命令执行流程,使用Run方法统一处理进程启动和等待
- 添加更精确的超时和取消错误处理逻辑
2026-02-27 18:40:30 +08:00
7 changed files with 327 additions and 46 deletions

View File

@@ -75,3 +75,17 @@ func OssUpload(ctx context.Context, url, filePath string) error {
span.SetStatus(codes.Ok, "上传成功")
return nil
}
func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string, resolution string) error {
subCtx, span := tracer.Start(ctx, "UploadPreviewFile")
defer span.End()
url, err := QueryPreviewUploadUrl(subCtx, taskId, resolution)
if err != nil {
return err
}
logger.Info("开始上传预览文件", zap.String("url", url), zap.String("resolution", resolution))
if err := OssUpload(subCtx, url, previewFilePath); err != nil {
return err
}
return nil
}

View File

@@ -126,3 +126,71 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
return false
}
}
func QueryPreviewUploadUrl(ctx context.Context, taskId string, resolution string) (string, error) {
_, span := tracer.Start(ctx, "QueryPreviewUploadUrl")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "GET"))
req, err := http.NewRequest("GET", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return "", err
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return "", err
}
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败")
logger.Error("读取响应体失败", zap.Error(err))
return "", err
}
return string(body), nil
}
func ReportPreviewSuccess(ctx context.Context, taskId string, resolution string) bool {
_, span := tracer.Start(ctx, "ReportPreviewSuccess")
defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "POST"))
req, err := http.NewRequest("POST", url, nil)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建请求失败", zap.Error(err))
return false
}
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送请求失败", zap.Error(err))
return false
}
defer resp.Body.Close()
span.SetAttributes(attribute.String("http.status", resp.Status))
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
if resp.StatusCode == 200 {
span.SetStatus(codes.Ok, "成功")
return true
} else {
span.SetStatus(codes.Error, "失败")
return false
}
}

View File

@@ -121,7 +121,7 @@ func (c *ImageProcessingConfig) GetEffectiveConfig() *ImageProcessingConfig {
}
type UploadConfig struct {
TaskID int64 `json:"taskId"`
TaskID int64 `json:"taskId,string"`
FaceUploadURL string `json:"faceUploadUrl"`
ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
SourceUploadURL string `json:"sourceUploadUrl"`
@@ -147,12 +147,12 @@ type FacePositionInfo struct {
}
type SubmitResultRequest struct {
TaskID int64 `json:"taskId"`
TaskID int64 `json:"taskId,string"`
FacePosition FacePositionInfo `json:"facePosition"`
}
type SubmitFailureRequest struct {
TaskID int64 `json:"taskId"`
TaskID int64 `json:"taskId,string"`
ErrorCode string `json:"errorCode"`
ErrorMessage string `json:"errorMessage"`
}

View File

@@ -32,3 +32,9 @@ disconnectAction:
enabled: false
thresholdMinutes: 5
command: ""
preview:
enabled: true
hwaccel: ""
resolutions:
- 720
- 1080

View File

@@ -64,6 +64,12 @@ type DisconnectActionConfig struct {
Command string `mapstructure:"command"`
}
type PreviewConfig struct {
Enabled bool `mapstructure:"enabled"`
Resolutions []int `mapstructure:"resolutions"`
HwAccel string `mapstructure:"hwaccel"`
}
type MainConfig struct {
Api ApiConfig `mapstructure:"api"`
Record RecordConfig `mapstructure:"record"`
@@ -71,4 +77,5 @@ type MainConfig struct {
FileName FileNameConfig `mapstructure:"fileName"`
Viid ViidConfig `mapstructure:"viid"`
DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"`
Preview PreviewConfig `mapstructure:"preview"`
}

84
main.go
View File

@@ -7,13 +7,16 @@ import (
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/telemetry"
"ZhenTuLocalPassiveAdapter/util"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
@@ -48,6 +51,20 @@ func startTask(device config.DeviceMapping, task dto.Task) {
logger.Info("处理任务成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
// 复制主文件用于异步生成预览,避免阻塞主上传流程
previewEnabled := config.Config.Preview.Enabled && len(config.Config.Preview.Resolutions) > 0
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
var copyErr error
if previewEnabled {
copyErr = copyFileOnDisk(fo.URL, copyFile)
if copyErr != nil {
logger.Warn("复制文件用于预览失败",
zap.String("taskID", task.TaskID),
zap.Error(copyErr))
}
}
err = api.UploadTaskFile(ctx, task, *fo)
if err != nil {
span.SetStatus(codes.Error, "上传文件失败")
@@ -56,6 +73,9 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID)
if previewEnabled && copyErr == nil {
os.Remove(copyFile)
}
return
}
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
@@ -65,12 +85,76 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
if previewEnabled && copyErr == nil {
os.Remove(copyFile)
}
return
}
span.SetStatus(codes.Ok, "上传文件成功")
logger.Info("上传文件成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
// 异步:从副本压缩预览 → 上传 → 上报
if previewEnabled && copyErr == nil {
go uploadPreview(task.TaskID, copyFile)
}
}
func uploadPreview(taskID string, copyFile string) {
ctx, span := tracer.Start(context.Background(), "uploadPreview")
defer span.End()
defer os.Remove(copyFile)
baseName := strings.TrimSuffix(copyFile, "_copy.mp4")
for _, height := range config.Config.Preview.Resolutions {
resolution := fmt.Sprintf("%dp", height)
previewFile := fmt.Sprintf("%s_preview_%s.mp4", baseName, resolution)
ok, _ := util.CompressVideo(ctx, copyFile, previewFile, height)
if !ok {
logger.Error("生成预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
err := api.UploadPreviewFile(ctx, taskID, previewFile, resolution)
if err != nil {
logger.Error("上传预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution),
zap.Error(err))
os.Remove(previewFile)
continue
}
if !api.ReportPreviewSuccess(ctx, taskID, resolution) {
logger.Error("上报预览成功失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
logger.Info("预览上传成功",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
}
span.SetStatus(codes.Ok, "预览处理完成")
}
func copyFileOnDisk(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
return err
}
func executeDisconnectCommand(command string) {

View File

@@ -1,14 +1,13 @@
package util
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"math/rand"
"os"
"os/exec"
@@ -18,6 +17,10 @@ import (
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
const FfmpegExec = "ffmpeg"
@@ -67,8 +70,10 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
var wg sync.WaitGroup
var mu sync.Mutex
var notOk bool
clonedFiles := make([]dto.File, len(task.Files))
copy(clonedFiles, task.Files)
var taskClone = dto.FfmpegTask{
Files: task.Files,
Files: clonedFiles,
OutputFile: task.OutputFile,
Offset: task.Offset,
Length: task.Length,
@@ -114,6 +119,12 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
wg.Wait()
if notOk {
// 清理已成功转换的临时文件
for i := range taskClone.Files {
if taskClone.Files[i].Url != task.Files[i].Url {
os.Remove(taskClone.Files[i].Url)
}
}
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
return false
}
@@ -196,14 +207,6 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
return false
}
stat, err := os.Stat(task.OutputFile)
if err != nil {
span.SetStatus(codes.Error, "文件不存在")
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
return false
}
span.SetAttributes(attribute.String("file.name", task.OutputFile))
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
if result {
outfile, err := os.Stat(task.OutputFile)
if err != nil {
@@ -241,7 +244,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
}
// 按照 Create 的值升序排序
sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
return fileList[i].StartTime.Unix() < fileList[j].StartTime.Unix()
})
// 如果片段在中间断开时间过长
@@ -329,13 +332,12 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
FfmpegExec,
"-hide_banner",
"-y",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-i", inputFile,
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
"-reset_timestamps", "1",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64),
"-fflags", "+genpts",
"-f", "mp4",
@@ -347,7 +349,7 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
defer span.End()
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
tmpFile := path.Join(os.TempDir(), fmt.Sprintf("tmp%.10f.txt", rand.Float64()))
tmpFileObj, err := os.Create(tmpFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
@@ -374,13 +376,14 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
"-y",
"-f", "concat",
"-safe", "0",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-i", tmpFile,
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
"-c:v", "copy",
"-c:a", "aac",
"-shortest",
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64),
"-fflags", "+genpts",
"-f", "mp4",
outputFile,
}
@@ -435,48 +438,147 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
}()
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
cmd := exec.CommandContext(timeoutCtx, ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
err := cmd.Start()
err := cmd.Run()
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) && ctx.Err() == nil {
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out")
}
if ctx.Err() != nil {
span.SetStatus(codes.Error, "FFMPEG执行命令被取消")
logger.Warn("FFMPEG执行命令被取消", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, ctx.Err()
}
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
defer cmd.Process.Kill()
logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", time.Since(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
func CompressVideo(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
subCtx, span := tracer.Start(ctx, "CompressVideo")
defer span.End()
span.SetAttributes(attribute.Int("height", height))
select {
case <-time.After(1 * time.Minute):
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done:
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
hwaccel := strings.ToLower(strings.TrimSpace(config.Config.Preview.HwAccel))
if hwaccel != "" && hwaccel != "none" {
span.SetAttributes(attribute.String("hwaccel", hwaccel))
ok, err := compressVideoGPU(subCtx, inputFile, outputFile, height, hwaccel)
if ok {
return true, nil
}
endTime := time.Now()
logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil
logger.Warn("GPU编码失败,回退到CPU编码",
zap.String("hwaccel", hwaccel),
zap.Error(err))
os.Remove(outputFile)
}
return compressVideoCPU(subCtx, inputFile, outputFile, height)
}
func compressVideoCPU(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoCPU")
defer span.End()
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd := []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "libx264",
"-preset", "fast",
"-crf", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
}
func compressVideoGPU(ctx context.Context, inputFile, outputFile string, height int, hwaccel string) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoGPU")
defer span.End()
span.SetAttributes(attribute.String("hwaccel", hwaccel))
var ffmpegCmd []string
switch hwaccel {
case "nvenc":
scaleFilter := fmt.Sprintf("scale_cuda=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "cuda",
"-hwaccel_output_format", "cuda",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_nvenc",
"-preset", "p4",
"-cq", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "amf":
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_amf",
"-quality", "balanced",
"-rc", "cqp",
"-qp_i", "28",
"-qp_p", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "qsv":
scaleFilter := fmt.Sprintf("vpp_qsv=w=-1:h=%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "qsv",
"-hwaccel_output_format", "qsv",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_qsv",
"-preset", "faster",
"-global_quality", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
default:
return false, fmt.Errorf("不支持的硬件加速类型: %s", hwaccel)
}
return handleFfmpegProcess(ctx, ffmpegCmd)
}
func GetVideoCodec(ctx context.Context, filePath string) (string, error) {