You've already forked VptPassiveAdapter
Compare commits
11 Commits
5722dd8e5a
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 4067ab8d0a | |||
| f1634fcecc | |||
| fe09c60822 | |||
| 83bfe34394 | |||
| 5972ba1948 | |||
| 030d8e9e6d | |||
| f8f0e92723 | |||
| 181a6b5368 | |||
| a951517cfd | |||
| 1936c1a73a | |||
| 72b8d277ea |
@@ -75,3 +75,17 @@ func OssUpload(ctx context.Context, url, filePath string) error {
|
|||||||
span.SetStatus(codes.Ok, "上传成功")
|
span.SetStatus(codes.Ok, "上传成功")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UploadPreviewFile(ctx context.Context, taskId string, previewFilePath string, resolution string) error {
|
||||||
|
subCtx, span := tracer.Start(ctx, "UploadPreviewFile")
|
||||||
|
defer span.End()
|
||||||
|
url, err := QueryPreviewUploadUrl(subCtx, taskId, resolution)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logger.Info("开始上传预览文件", zap.String("url", url), zap.String("resolution", resolution))
|
||||||
|
if err := OssUpload(subCtx, url, previewFilePath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -126,3 +126,71 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func QueryPreviewUploadUrl(ctx context.Context, taskId string, resolution string) (string, error) {
|
||||||
|
_, span := tracer.Start(ctx, "QueryPreviewUploadUrl")
|
||||||
|
defer span.End()
|
||||||
|
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewUploadUrl?resolution=" + resolution
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "GET"))
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
|
logger.Error("创建请求失败", zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
resp, err := GetAPIClient().Do(req)
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
|
logger.Error("发送请求失败", zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
|
defer resp.Body.Close()
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "读取响应体失败")
|
||||||
|
logger.Error("读取响应体失败", zap.Error(err))
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(body), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReportPreviewSuccess(ctx context.Context, taskId string, resolution string) bool {
|
||||||
|
_, span := tracer.Start(ctx, "ReportPreviewSuccess")
|
||||||
|
defer span.End()
|
||||||
|
url := config.Config.Api.BaseUrl + "/" + taskId + "/previewSuccess?resolution=" + resolution
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "POST"))
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
|
logger.Error("创建请求失败", zap.Error(err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := GetAPIClient().Do(req)
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
|
logger.Error("发送请求失败", zap.Error(err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
|
if resp.StatusCode == 200 {
|
||||||
|
span.SetStatus(codes.Ok, "成功")
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
span.SetStatus(codes.Error, "失败")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ func (c *ImageProcessingConfig) GetEffectiveConfig() *ImageProcessingConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type UploadConfig struct {
|
type UploadConfig struct {
|
||||||
TaskID int64 `json:"taskId"`
|
TaskID int64 `json:"taskId,string"`
|
||||||
FaceUploadURL string `json:"faceUploadUrl"`
|
FaceUploadURL string `json:"faceUploadUrl"`
|
||||||
ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
|
ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
|
||||||
SourceUploadURL string `json:"sourceUploadUrl"`
|
SourceUploadURL string `json:"sourceUploadUrl"`
|
||||||
@@ -147,12 +147,12 @@ type FacePositionInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SubmitResultRequest struct {
|
type SubmitResultRequest struct {
|
||||||
TaskID int64 `json:"taskId"`
|
TaskID int64 `json:"taskId,string"`
|
||||||
FacePosition FacePositionInfo `json:"facePosition"`
|
FacePosition FacePositionInfo `json:"facePosition"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubmitFailureRequest struct {
|
type SubmitFailureRequest struct {
|
||||||
TaskID int64 `json:"taskId"`
|
TaskID int64 `json:"taskId,string"`
|
||||||
ErrorCode string `json:"errorCode"`
|
ErrorCode string `json:"errorCode"`
|
||||||
ErrorMessage string `json:"errorMessage"`
|
ErrorMessage string `json:"errorMessage"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,3 +32,9 @@ disconnectAction:
|
|||||||
enabled: false
|
enabled: false
|
||||||
thresholdMinutes: 5
|
thresholdMinutes: 5
|
||||||
command: ""
|
command: ""
|
||||||
|
preview:
|
||||||
|
enabled: true
|
||||||
|
hwaccel: ""
|
||||||
|
resolutions:
|
||||||
|
- 720
|
||||||
|
- 1080
|
||||||
|
|||||||
@@ -64,6 +64,12 @@ type DisconnectActionConfig struct {
|
|||||||
Command string `mapstructure:"command"`
|
Command string `mapstructure:"command"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PreviewConfig struct {
|
||||||
|
Enabled bool `mapstructure:"enabled"`
|
||||||
|
Resolutions []int `mapstructure:"resolutions"`
|
||||||
|
HwAccel string `mapstructure:"hwaccel"`
|
||||||
|
}
|
||||||
|
|
||||||
type MainConfig struct {
|
type MainConfig struct {
|
||||||
Api ApiConfig `mapstructure:"api"`
|
Api ApiConfig `mapstructure:"api"`
|
||||||
Record RecordConfig `mapstructure:"record"`
|
Record RecordConfig `mapstructure:"record"`
|
||||||
@@ -71,4 +77,5 @@ type MainConfig struct {
|
|||||||
FileName FileNameConfig `mapstructure:"fileName"`
|
FileName FileNameConfig `mapstructure:"fileName"`
|
||||||
Viid ViidConfig `mapstructure:"viid"`
|
Viid ViidConfig `mapstructure:"viid"`
|
||||||
DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"`
|
DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"`
|
||||||
|
Preview PreviewConfig `mapstructure:"preview"`
|
||||||
}
|
}
|
||||||
|
|||||||
84
main.go
84
main.go
@@ -7,13 +7,16 @@ import (
|
|||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"ZhenTuLocalPassiveAdapter/logger"
|
"ZhenTuLocalPassiveAdapter/logger"
|
||||||
"ZhenTuLocalPassiveAdapter/telemetry"
|
"ZhenTuLocalPassiveAdapter/telemetry"
|
||||||
|
"ZhenTuLocalPassiveAdapter/util"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -48,6 +51,20 @@ func startTask(device config.DeviceMapping, task dto.Task) {
|
|||||||
logger.Info("处理任务成功",
|
logger.Info("处理任务成功",
|
||||||
zap.String("taskID", task.TaskID),
|
zap.String("taskID", task.TaskID),
|
||||||
zap.String("deviceNo", task.DeviceNo))
|
zap.String("deviceNo", task.DeviceNo))
|
||||||
|
|
||||||
|
// 复制主文件用于异步生成预览,避免阻塞主上传流程
|
||||||
|
previewEnabled := config.Config.Preview.Enabled && len(config.Config.Preview.Resolutions) > 0
|
||||||
|
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
|
||||||
|
var copyErr error
|
||||||
|
if previewEnabled {
|
||||||
|
copyErr = copyFileOnDisk(fo.URL, copyFile)
|
||||||
|
if copyErr != nil {
|
||||||
|
logger.Warn("复制文件用于预览失败",
|
||||||
|
zap.String("taskID", task.TaskID),
|
||||||
|
zap.Error(copyErr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = api.UploadTaskFile(ctx, task, *fo)
|
err = api.UploadTaskFile(ctx, task, *fo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.SetStatus(codes.Error, "上传文件失败")
|
span.SetStatus(codes.Error, "上传文件失败")
|
||||||
@@ -56,6 +73,9 @@ func startTask(device config.DeviceMapping, task dto.Task) {
|
|||||||
zap.String("deviceNo", task.DeviceNo),
|
zap.String("deviceNo", task.DeviceNo),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
api.ReportTaskFailure(ctx, task.TaskID)
|
api.ReportTaskFailure(ctx, task.TaskID)
|
||||||
|
if previewEnabled && copyErr == nil {
|
||||||
|
os.Remove(copyFile)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
||||||
@@ -65,12 +85,76 @@ func startTask(device config.DeviceMapping, task dto.Task) {
|
|||||||
zap.String("taskID", task.TaskID),
|
zap.String("taskID", task.TaskID),
|
||||||
zap.String("deviceNo", task.DeviceNo),
|
zap.String("deviceNo", task.DeviceNo),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
if previewEnabled && copyErr == nil {
|
||||||
|
os.Remove(copyFile)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
span.SetStatus(codes.Ok, "上传文件成功")
|
span.SetStatus(codes.Ok, "上传文件成功")
|
||||||
logger.Info("上传文件成功",
|
logger.Info("上传文件成功",
|
||||||
zap.String("taskID", task.TaskID),
|
zap.String("taskID", task.TaskID),
|
||||||
zap.String("deviceNo", task.DeviceNo))
|
zap.String("deviceNo", task.DeviceNo))
|
||||||
|
|
||||||
|
// 异步:从副本压缩预览 → 上传 → 上报
|
||||||
|
if previewEnabled && copyErr == nil {
|
||||||
|
go uploadPreview(task.TaskID, copyFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func uploadPreview(taskID string, copyFile string) {
|
||||||
|
ctx, span := tracer.Start(context.Background(), "uploadPreview")
|
||||||
|
defer span.End()
|
||||||
|
defer os.Remove(copyFile)
|
||||||
|
|
||||||
|
baseName := strings.TrimSuffix(copyFile, "_copy.mp4")
|
||||||
|
|
||||||
|
for _, height := range config.Config.Preview.Resolutions {
|
||||||
|
resolution := fmt.Sprintf("%dp", height)
|
||||||
|
previewFile := fmt.Sprintf("%s_preview_%s.mp4", baseName, resolution)
|
||||||
|
ok, _ := util.CompressVideo(ctx, copyFile, previewFile, height)
|
||||||
|
if !ok {
|
||||||
|
logger.Error("生成预览文件失败",
|
||||||
|
zap.String("taskID", taskID),
|
||||||
|
zap.String("resolution", resolution))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := api.UploadPreviewFile(ctx, taskID, previewFile, resolution)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("上传预览文件失败",
|
||||||
|
zap.String("taskID", taskID),
|
||||||
|
zap.String("resolution", resolution),
|
||||||
|
zap.Error(err))
|
||||||
|
os.Remove(previewFile)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !api.ReportPreviewSuccess(ctx, taskID, resolution) {
|
||||||
|
logger.Error("上报预览成功失败",
|
||||||
|
zap.String("taskID", taskID),
|
||||||
|
zap.String("resolution", resolution))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
logger.Info("预览上传成功",
|
||||||
|
zap.String("taskID", taskID),
|
||||||
|
zap.String("resolution", resolution))
|
||||||
|
}
|
||||||
|
|
||||||
|
span.SetStatus(codes.Ok, "预览处理完成")
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyFileOnDisk(src, dst string) error {
|
||||||
|
in, err := os.Open(src)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer in.Close()
|
||||||
|
out, err := os.Create(dst)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
_, err = io.Copy(out, in)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func executeDisconnectCommand(command string) {
|
func executeDisconnectCommand(command string) {
|
||||||
|
|||||||
251
util/ffmpeg.go
251
util/ffmpeg.go
@@ -1,14 +1,13 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"ZhenTuLocalPassiveAdapter/config"
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"ZhenTuLocalPassiveAdapter/logger"
|
"ZhenTuLocalPassiveAdapter/logger"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
"go.opentelemetry.io/otel/codes"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@@ -18,6 +17,10 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const FfmpegExec = "ffmpeg"
|
const FfmpegExec = "ffmpeg"
|
||||||
@@ -67,8 +70,10 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var notOk bool
|
var notOk bool
|
||||||
|
clonedFiles := make([]dto.File, len(task.Files))
|
||||||
|
copy(clonedFiles, task.Files)
|
||||||
var taskClone = dto.FfmpegTask{
|
var taskClone = dto.FfmpegTask{
|
||||||
Files: task.Files,
|
Files: clonedFiles,
|
||||||
OutputFile: task.OutputFile,
|
OutputFile: task.OutputFile,
|
||||||
Offset: task.Offset,
|
Offset: task.Offset,
|
||||||
Length: task.Length,
|
Length: task.Length,
|
||||||
@@ -79,7 +84,20 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
|||||||
go func(file *dto.File) {
|
go func(file *dto.File) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
|
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
|
||||||
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
|
codec, err := GetVideoCodec(subCtx, file.Url)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("获取视频编码失败", zap.Error(err))
|
||||||
|
mu.Lock()
|
||||||
|
notOk = true
|
||||||
|
mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var result bool
|
||||||
|
if codec == "hevc" {
|
||||||
|
result, err = convertHevcToTs(subCtx, *file, tmpFile)
|
||||||
|
} else {
|
||||||
|
result, err = convertMp4ToTs(subCtx, *file, tmpFile)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("转码出错", zap.Error(err))
|
logger.Error("转码出错", zap.Error(err))
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
@@ -101,6 +119,12 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if notOk {
|
if notOk {
|
||||||
|
// 清理已成功转换的临时文件
|
||||||
|
for i := range taskClone.Files {
|
||||||
|
if taskClone.Files[i].Url != task.Files[i].Url {
|
||||||
|
os.Remove(taskClone.Files[i].Url)
|
||||||
|
}
|
||||||
|
}
|
||||||
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -147,7 +171,7 @@ func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
|
|||||||
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
// 多文件,方法二:使用计算资源编码
|
// 多文件,方法二:使用计算资源编码
|
||||||
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile)
|
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Length, task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@@ -183,14 +207,6 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
|||||||
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
stat, err := os.Stat(task.OutputFile)
|
|
||||||
if err != nil {
|
|
||||||
span.SetStatus(codes.Error, "文件不存在")
|
|
||||||
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
span.SetAttributes(attribute.String("file.name", task.OutputFile))
|
|
||||||
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
|
|
||||||
if result {
|
if result {
|
||||||
outfile, err := os.Stat(task.OutputFile)
|
outfile, err := os.Stat(task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -228,7 +244,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
|||||||
}
|
}
|
||||||
// 按照 Create 的值升序排序
|
// 按照 Create 的值升序排序
|
||||||
sort.Slice(fileList, func(i, j int) bool {
|
sort.Slice(fileList, func(i, j int) bool {
|
||||||
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
|
return fileList[i].StartTime.Unix() < fileList[j].StartTime.Unix()
|
||||||
})
|
})
|
||||||
|
|
||||||
// 如果片段在中间断开时间过长
|
// 如果片段在中间断开时间过长
|
||||||
@@ -316,11 +332,12 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
|
|||||||
FfmpegExec,
|
FfmpegExec,
|
||||||
"-hide_banner",
|
"-hide_banner",
|
||||||
"-y",
|
"-y",
|
||||||
"-i", inputFile,
|
|
||||||
"-c:v", "copy",
|
|
||||||
"-an",
|
|
||||||
"-reset_timestamps", "1",
|
|
||||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||||
|
"-i", inputFile,
|
||||||
|
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
|
||||||
|
"-c:v", "copy",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-shortest",
|
||||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||||
"-fflags", "+genpts",
|
"-fflags", "+genpts",
|
||||||
"-f", "mp4",
|
"-f", "mp4",
|
||||||
@@ -332,7 +349,7 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64
|
|||||||
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
||||||
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
|
tmpFile := path.Join(os.TempDir(), fmt.Sprintf("tmp%.10f.txt", rand.Float64()))
|
||||||
tmpFileObj, err := os.Create(tmpFile)
|
tmpFileObj, err := os.Create(tmpFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.SetAttributes(attribute.String("error", err.Error()))
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
@@ -359,11 +376,14 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
|
|||||||
"-y",
|
"-y",
|
||||||
"-f", "concat",
|
"-f", "concat",
|
||||||
"-safe", "0",
|
"-safe", "0",
|
||||||
"-i", tmpFile,
|
|
||||||
"-c:v", "copy",
|
|
||||||
"-an",
|
|
||||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||||
|
"-i", tmpFile,
|
||||||
|
"-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100",
|
||||||
|
"-c:v", "copy",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-shortest",
|
||||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||||
|
"-fflags", "+genpts",
|
||||||
"-f", "mp4",
|
"-f", "mp4",
|
||||||
outputFile,
|
outputFile,
|
||||||
}
|
}
|
||||||
@@ -383,6 +403,9 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length flo
|
|||||||
ffmpegCmd = append(ffmpegCmd, "-i", file.Url)
|
ffmpegCmd = append(ffmpegCmd, "-i", file.Url)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 添加静音音频源作为额外输入
|
||||||
|
ffmpegCmd = append(ffmpegCmd, "-f", "lavfi", "-i", "anullsrc=channel_layout=mono:sample_rate=44100")
|
||||||
|
|
||||||
inputCount := len(inputFiles)
|
inputCount := len(inputFiles)
|
||||||
filterComplex := strings.Builder{}
|
filterComplex := strings.Builder{}
|
||||||
for i := 0; i < inputCount; i++ {
|
for i := 0; i < inputCount; i++ {
|
||||||
@@ -393,8 +416,10 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length flo
|
|||||||
ffmpegCmd = append(ffmpegCmd,
|
ffmpegCmd = append(ffmpegCmd,
|
||||||
"-filter_complex", filterComplex.String(),
|
"-filter_complex", filterComplex.String(),
|
||||||
"-map", "[v]",
|
"-map", "[v]",
|
||||||
|
"-map", fmt.Sprintf("%d:a", inputCount),
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-shortest",
|
||||||
"-preset:v", "fast",
|
"-preset:v", "fast",
|
||||||
"-an",
|
|
||||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||||
"-f", "mp4",
|
"-f", "mp4",
|
||||||
@@ -413,48 +438,174 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
|
|||||||
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
||||||
}()
|
}()
|
||||||
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||||
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
|
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
cmd := exec.CommandContext(timeoutCtx, ffmpegCmd[0], ffmpegCmd[1:]...)
|
||||||
|
|
||||||
var stderr bytes.Buffer
|
var stderr bytes.Buffer
|
||||||
cmd.Stderr = &stderr
|
cmd.Stderr = &stderr
|
||||||
|
|
||||||
err := cmd.Start()
|
err := cmd.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||||
|
if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) && ctx.Err() == nil {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
|
||||||
|
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||||
|
return false, fmt.Errorf("ffmpeg command timed out")
|
||||||
|
}
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG执行命令被取消")
|
||||||
|
logger.Warn("FFMPEG执行命令被取消", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||||
|
return false, ctx.Err()
|
||||||
|
}
|
||||||
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
||||||
logger.Error("FFMPEG执行命令失败",
|
logger.Error("FFMPEG执行命令失败",
|
||||||
zap.String("error", stderr.String()),
|
zap.String("error", stderr.String()),
|
||||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
defer cmd.Process.Kill()
|
logger.Info("FFMPEG执行命令结束",
|
||||||
|
zap.Int64("durationMs", time.Since(startTime).Milliseconds()),
|
||||||
|
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
done := make(chan error, 1)
|
func CompressVideo(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
|
||||||
go func() {
|
subCtx, span := tracer.Start(ctx, "CompressVideo")
|
||||||
done <- cmd.Wait()
|
defer span.End()
|
||||||
}()
|
span.SetAttributes(attribute.Int("height", height))
|
||||||
|
|
||||||
select {
|
hwaccel := strings.ToLower(strings.TrimSpace(config.Config.Preview.HwAccel))
|
||||||
case <-time.After(1 * time.Minute):
|
if hwaccel != "" && hwaccel != "none" {
|
||||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
span.SetAttributes(attribute.String("hwaccel", hwaccel))
|
||||||
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
|
ok, err := compressVideoGPU(subCtx, inputFile, outputFile, height, hwaccel)
|
||||||
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
if ok {
|
||||||
return false, fmt.Errorf("ffmpeg command timed out")
|
return true, nil
|
||||||
case err := <-done:
|
|
||||||
if err != nil {
|
|
||||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
|
||||||
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
|
||||||
logger.Error("FFMPEG执行命令失败",
|
|
||||||
zap.String("error", stderr.String()),
|
|
||||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
|
||||||
return false, err
|
|
||||||
}
|
}
|
||||||
endTime := time.Now()
|
logger.Warn("GPU编码失败,回退到CPU编码",
|
||||||
logger.Info("FFMPEG执行命令结束",
|
zap.String("hwaccel", hwaccel),
|
||||||
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
|
zap.Error(err))
|
||||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
os.Remove(outputFile)
|
||||||
return true, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return compressVideoCPU(subCtx, inputFile, outputFile, height)
|
||||||
|
}
|
||||||
|
|
||||||
|
func compressVideoCPU(ctx context.Context, inputFile, outputFile string, height int) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "compressVideoCPU")
|
||||||
|
defer span.End()
|
||||||
|
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
|
||||||
|
ffmpegCmd := []string{
|
||||||
|
FfmpegExec,
|
||||||
|
"-hide_banner",
|
||||||
|
"-y",
|
||||||
|
"-i", inputFile,
|
||||||
|
"-vf", scaleFilter,
|
||||||
|
"-c:v", "libx264",
|
||||||
|
"-preset", "fast",
|
||||||
|
"-crf", "28",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-b:a", "128k",
|
||||||
|
"-f", "mp4",
|
||||||
|
outputFile,
|
||||||
|
}
|
||||||
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func compressVideoGPU(ctx context.Context, inputFile, outputFile string, height int, hwaccel string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "compressVideoGPU")
|
||||||
|
defer span.End()
|
||||||
|
span.SetAttributes(attribute.String("hwaccel", hwaccel))
|
||||||
|
|
||||||
|
var ffmpegCmd []string
|
||||||
|
switch hwaccel {
|
||||||
|
case "nvenc":
|
||||||
|
scaleFilter := fmt.Sprintf("scale_cuda=-2:%d", height)
|
||||||
|
ffmpegCmd = []string{
|
||||||
|
FfmpegExec,
|
||||||
|
"-hide_banner",
|
||||||
|
"-y",
|
||||||
|
"-hwaccel", "cuda",
|
||||||
|
"-hwaccel_output_format", "cuda",
|
||||||
|
"-i", inputFile,
|
||||||
|
"-vf", scaleFilter,
|
||||||
|
"-c:v", "h264_nvenc",
|
||||||
|
"-preset", "p4",
|
||||||
|
"-cq", "28",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-b:a", "128k",
|
||||||
|
"-f", "mp4",
|
||||||
|
outputFile,
|
||||||
|
}
|
||||||
|
case "amf":
|
||||||
|
scaleFilter := fmt.Sprintf("scale=-2:%d", height)
|
||||||
|
ffmpegCmd = []string{
|
||||||
|
FfmpegExec,
|
||||||
|
"-hide_banner",
|
||||||
|
"-y",
|
||||||
|
"-i", inputFile,
|
||||||
|
"-vf", scaleFilter,
|
||||||
|
"-c:v", "h264_amf",
|
||||||
|
"-quality", "balanced",
|
||||||
|
"-rc", "cqp",
|
||||||
|
"-qp_i", "28",
|
||||||
|
"-qp_p", "28",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-b:a", "128k",
|
||||||
|
"-f", "mp4",
|
||||||
|
outputFile,
|
||||||
|
}
|
||||||
|
case "qsv":
|
||||||
|
scaleFilter := fmt.Sprintf("vpp_qsv=w=-1:h=%d", height)
|
||||||
|
ffmpegCmd = []string{
|
||||||
|
FfmpegExec,
|
||||||
|
"-hide_banner",
|
||||||
|
"-y",
|
||||||
|
"-hwaccel", "qsv",
|
||||||
|
"-hwaccel_output_format", "qsv",
|
||||||
|
"-i", inputFile,
|
||||||
|
"-vf", scaleFilter,
|
||||||
|
"-c:v", "h264_qsv",
|
||||||
|
"-preset", "faster",
|
||||||
|
"-global_quality", "28",
|
||||||
|
"-c:a", "aac",
|
||||||
|
"-b:a", "128k",
|
||||||
|
"-f", "mp4",
|
||||||
|
outputFile,
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return false, fmt.Errorf("不支持的硬件加速类型: %s", hwaccel)
|
||||||
|
}
|
||||||
|
|
||||||
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetVideoCodec(ctx context.Context, filePath string) (string, error) {
|
||||||
|
_, span := tracer.Start(ctx, "GetVideoCodec")
|
||||||
|
defer span.End()
|
||||||
|
ffprobeCmd := []string{
|
||||||
|
"ffprobe",
|
||||||
|
"-v", "error",
|
||||||
|
"-select_streams", "v:0",
|
||||||
|
"-show_entries", "stream=codec_name",
|
||||||
|
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||||
|
filePath,
|
||||||
|
}
|
||||||
|
span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd)))
|
||||||
|
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
|
||||||
|
var out bytes.Buffer
|
||||||
|
cmd.Stdout = &out
|
||||||
|
|
||||||
|
err := cmd.Run()
|
||||||
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "获取视频编码失败")
|
||||||
|
return "", fmt.Errorf("failed to get video codec: %w", err)
|
||||||
|
}
|
||||||
|
codec := strings.TrimSpace(out.String())
|
||||||
|
span.SetAttributes(attribute.String("video.codec", codec))
|
||||||
|
return codec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user