Compare commits

..

5 Commits

Author SHA1 Message Date
4067ab8d0a hwaccel qsvL use vpp 2026-03-13 11:38:18 +08:00
f1634fcecc hwaccel qsv 2026-03-13 11:29:18 +08:00
fe09c60822 hwaccel 2026-03-13 11:10:39 +08:00
83bfe34394 config p 2026-03-13 00:06:39 +08:00
5972ba1948 common p 2026-03-13 00:04:04 +08:00
6 changed files with 160 additions and 44 deletions

View File

@@ -76,14 +76,14 @@ func OssUpload(ctx context.Context, url, filePath string) error {
return nil return nil
} }
func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string) error { func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string, resolution string) error {
subCtx, span := tracer.Start(ctx, "UploadPreviewFile") subCtx, span := tracer.Start(ctx, "UploadPreviewFile")
defer span.End() defer span.End()
url, err := QueryPreviewUploadUrl(subCtx, taskId) url, err := QueryPreviewUploadUrl(subCtx, taskId, resolution)
if err != nil { if err != nil {
return err return err
} }
logger.Info("开始上传预览文件", zap.String("url", url)) logger.Info("开始上传预览文件", zap.String("url", url), zap.String("resolution", resolution))
if err := OssUpload(subCtx, url, previewFilePath); err != nil { if err := OssUpload(subCtx, url, previewFilePath); err != nil {
return err return err
} }

View File

@@ -127,10 +127,10 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
} }
} }
func QueryPreviewUploadUrl(ctx context.Context, taskId string) (string, error) { func QueryPreviewUploadUrl(ctx context.Context, taskId string, resolution string) (string, error) {
_, span := tracer.Start(ctx, "QueryPreviewUploadUrl") _, span := tracer.Start(ctx, "QueryPreviewUploadUrl")
defer span.End() defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl" url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url)) span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "GET")) span.SetAttributes(attribute.String("http.method", "GET"))
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
@@ -160,10 +160,10 @@ func QueryPreviewUploadUrl(ctx context.Context, taskId string) (string, error) {
return string(body), nil return string(body), nil
} }
func ReportPreviewSuccess(ctx context.Context, taskId string) bool { func ReportPreviewSuccess(ctx context.Context, taskId string, resolution string) bool {
_, span := tracer.Start(ctx, "ReportPreviewSuccess") _, span := tracer.Start(ctx, "ReportPreviewSuccess")
defer span.End() defer span.End()
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess" url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess?resolution=" + resolution
span.SetAttributes(attribute.String("http.url", url)) span.SetAttributes(attribute.String("http.url", url))
span.SetAttributes(attribute.String("http.method", "POST")) span.SetAttributes(attribute.String("http.method", "POST"))

View File

@@ -32,3 +32,9 @@ disconnectAction:
enabled: false enabled: false
thresholdMinutes: 5 thresholdMinutes: 5
command: "" command: ""
preview:
enabled: true
hwaccel: ""
resolutions:
- 720
- 1080

View File

@@ -64,6 +64,12 @@ type DisconnectActionConfig struct {
Command string `mapstructure:"command"` Command string `mapstructure:"command"`
} }
type PreviewConfig struct {
Enabled bool `mapstructure:"enabled"`
Resolutions []int `mapstructure:"resolutions"`
HwAccel string `mapstructure:"hwaccel"`
}
type MainConfig struct { type MainConfig struct {
Api ApiConfig `mapstructure:"api"` Api ApiConfig `mapstructure:"api"`
Record RecordConfig `mapstructure:"record"` Record RecordConfig `mapstructure:"record"`
@@ -71,4 +77,5 @@ type MainConfig struct {
FileName FileNameConfig `mapstructure:"fileName"` FileName FileNameConfig `mapstructure:"fileName"`
Viid ViidConfig `mapstructure:"viid"` Viid ViidConfig `mapstructure:"viid"`
DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"` DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"`
Preview PreviewConfig `mapstructure:"preview"`
} }

78
main.go
View File

@@ -53,12 +53,16 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 复制主文件用于异步生成预览,避免阻塞主上传流程 // 复制主文件用于异步生成预览,避免阻塞主上传流程
previewEnabled := config.Config.Preview.Enabled && len(config.Config.Preview.Resolutions) > 0
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4" copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
copyErr := copyFileOnDisk(fo.URL, copyFile) var copyErr error
if copyErr != nil { if previewEnabled {
logger.Warn("复制文件用于预览失败", copyErr = copyFileOnDisk(fo.URL, copyFile)
zap.String("taskID", task.TaskID), if copyErr != nil {
zap.Error(copyErr)) logger.Warn("复制文件用于预览失败",
zap.String("taskID", task.TaskID),
zap.Error(copyErr))
}
} }
err = api.UploadTaskFile(ctx, task, *fo) err = api.UploadTaskFile(ctx, task, *fo)
@@ -69,7 +73,7 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID) api.ReportTaskFailure(ctx, task.TaskID)
if copyErr == nil { if previewEnabled && copyErr == nil {
os.Remove(copyFile) os.Remove(copyFile)
} }
return return
@@ -81,7 +85,7 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo), zap.String("deviceNo", task.DeviceNo),
zap.Error(err)) zap.Error(err))
if copyErr == nil { if previewEnabled && copyErr == nil {
os.Remove(copyFile) os.Remove(copyFile)
} }
return return
@@ -91,8 +95,8 @@ func startTask(device config.DeviceMapping, task dto.Task) {
zap.String("taskID", task.TaskID), zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo)) zap.String("deviceNo", task.DeviceNo))
// 异步:从副本压缩720p预览 → 上传 → 上报 // 异步:从副本压缩预览 → 上传 → 上报
if copyErr == nil { if previewEnabled && copyErr == nil {
go uploadPreview(task.TaskID, copyFile) go uploadPreview(task.TaskID, copyFile)
} }
} }
@@ -102,32 +106,40 @@ func uploadPreview(taskID string, copyFile string) {
defer span.End() defer span.End()
defer os.Remove(copyFile) defer os.Remove(copyFile)
previewFile := strings.TrimSuffix(copyFile, "_copy.mp4") + "_preview.mp4" baseName := strings.TrimSuffix(copyFile, "_copy.mp4")
ok, _ := util.CompressTo720p(ctx, copyFile, previewFile)
if !ok { for _, height := range config.Config.Preview.Resolutions {
span.SetStatus(codes.Error, "生成预览文件失败") resolution := fmt.Sprintf("%dp", height)
logger.Error("生成预览文件失败", zap.String("taskID", taskID)) previewFile := fmt.Sprintf("%s_preview_%s.mp4", baseName, resolution)
return ok, _ := util.CompressVideo(ctx, copyFile, previewFile, height)
if !ok {
logger.Error("生成预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
err := api.UploadPreviewFile(ctx, taskID, previewFile, resolution)
if err != nil {
logger.Error("上传预览文件失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution),
zap.Error(err))
os.Remove(previewFile)
continue
}
if !api.ReportPreviewSuccess(ctx, taskID, resolution) {
logger.Error("上报预览成功失败",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
continue
}
logger.Info("预览上传成功",
zap.String("taskID", taskID),
zap.String("resolution", resolution))
} }
err := api.UploadPreviewFile(ctx, taskID, previewFile) span.SetStatus(codes.Ok, "预览处理完成")
if err != nil {
span.SetStatus(codes.Error, "上传预览文件失败")
logger.Error("上传预览文件失败",
zap.String("taskID", taskID),
zap.Error(err))
os.Remove(previewFile)
return
}
result := api.ReportPreviewSuccess(ctx, taskID)
if !result {
span.SetStatus(codes.Error, "上报预览成功失败")
logger.Error("上报预览成功失败",
zap.String("taskID", taskID))
return
}
span.SetStatus(codes.Ok, "预览上传成功")
logger.Info("预览上传成功", zap.String("taskID", taskID))
} }
func copyFileOnDisk(src, dst string) error { func copyFileOnDisk(src, dst string) error {

View File

@@ -1,6 +1,7 @@
package util package util
import ( import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
@@ -470,15 +471,37 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
return true, nil return true, nil
} }
func CompressTo720p(ctx context.Context, inputFile, outputFile string) (bool, error) { func CompressVideo(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
subCtx, span := tracer.Start(ctx, "CompressTo720p") subCtx, span := tracer.Start(ctx, "CompressVideo")
defer span.End() defer span.End()
span.SetAttributes(attribute.Int("height", height))
hwaccel := strings.ToLower(strings.TrimSpace(config.Config.Preview.HwAccel))
if hwaccel != "" && hwaccel != "none" {
span.SetAttributes(attribute.String("hwaccel", hwaccel))
ok, err := compressVideoGPU(subCtx, inputFile, outputFile, height, hwaccel)
if ok {
return true, nil
}
logger.Warn("GPU编码失败,回退到CPU编码",
zap.String("hwaccel", hwaccel),
zap.Error(err))
os.Remove(outputFile)
}
return compressVideoCPU(subCtx, inputFile, outputFile, height)
}
func compressVideoCPU(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoCPU")
defer span.End()
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd := []string{ ffmpegCmd := []string{
FfmpegExec, FfmpegExec,
"-hide_banner", "-hide_banner",
"-y", "-y",
"-i", inputFile, "-i", inputFile,
"-vf", "scale=-2:720", "-vf", scaleFilter,
"-c:v", "libx264", "-c:v", "libx264",
"-preset", "fast", "-preset", "fast",
"-crf", "28", "-crf", "28",
@@ -487,7 +510,75 @@ func CompressTo720p(ctx context.Context, inputFile, outputFile string) (bool, er
"-f", "mp4", "-f", "mp4",
outputFile, outputFile,
} }
return handleFfmpegProcess(subCtx, ffmpegCmd) return handleFfmpegProcess(ctx, ffmpegCmd)
}
func compressVideoGPU(ctx context.Context, inputFile, outputFile string, height int, hwaccel string) (bool, error) {
_, span := tracer.Start(ctx, "compressVideoGPU")
defer span.End()
span.SetAttributes(attribute.String("hwaccel", hwaccel))
var ffmpegCmd []string
switch hwaccel {
case "nvenc":
scaleFilter := fmt.Sprintf("scale_cuda=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "cuda",
"-hwaccel_output_format", "cuda",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_nvenc",
"-preset", "p4",
"-cq", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "amf":
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_amf",
"-quality", "balanced",
"-rc", "cqp",
"-qp_i", "28",
"-qp_p", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
case "qsv":
scaleFilter := fmt.Sprintf("vpp_qsv=w=-1:h=%d", height)
ffmpegCmd = []string{
FfmpegExec,
"-hide_banner",
"-y",
"-hwaccel", "qsv",
"-hwaccel_output_format", "qsv",
"-i", inputFile,
"-vf", scaleFilter,
"-c:v", "h264_qsv",
"-preset", "faster",
"-global_quality", "28",
"-c:a", "aac",
"-b:a", "128k",
"-f", "mp4",
outputFile,
}
default:
return false, fmt.Errorf("不支持的硬件加速类型: %s", hwaccel)
}
return handleFfmpegProcess(ctx, ffmpegCmd)
} }
func GetVideoCodec(ctx context.Context, filePath string) (string, error) { func GetVideoCodec(ctx context.Context, filePath string) (string, error) {