You've already forked VptPassiveAdapter
Compare commits
18 Commits
3d989c2f47
...
master
Author | SHA1 | Date | |
---|---|---|---|
5dfe6d6356 | |||
509b829c5b | |||
e6f93a4d37 | |||
2971c5f52d | |||
3d7c88de5f | |||
f9256895b7 | |||
104930c413 | |||
cf3c518d13 | |||
cdd1358d45 | |||
c6a6518248 | |||
adf4186156 | |||
37da5abad0 | |||
a478902f98 | |||
94e1f66288 | |||
dc10092f7a | |||
b11a315b0d | |||
91bb5e4e5a | |||
4f47689253 |
@ -14,12 +14,14 @@ import (
|
||||
)
|
||||
|
||||
func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) error {
|
||||
url, err := QueryUploadUrlForTask(ctx, task.TaskID)
|
||||
subCtx, span := tracer.Start(ctx, "UploadTaskFile")
|
||||
defer span.End()
|
||||
url, err := QueryUploadUrlForTask(subCtx, task.TaskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("开始上传文件, URL:【%s】\n", url)
|
||||
if err := OssUpload(ctx, url, file.URL); err != nil {
|
||||
if err := OssUpload(subCtx, url, file.URL); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
30
core/task.go
30
core/task.go
@ -11,18 +11,21 @@ import (
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
var tracer = otel.Tracer("task")
|
||||
|
||||
func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) {
|
||||
_, span := tracer.Start(ctx, "startTask")
|
||||
subCtx, span := tracer.Start(ctx, "HandleTask")
|
||||
defer span.End()
|
||||
adapter := fs.GetAdapter()
|
||||
span.SetAttributes()
|
||||
span.SetAttributes(attribute.String("task.id", task.TaskID))
|
||||
span.SetAttributes(attribute.String("task", util.ToJson(task)))
|
||||
span.SetAttributes(attribute.String("device.no", device.DeviceNo))
|
||||
span.SetAttributes(attribute.String("device.name", device.Name))
|
||||
fileList, err := adapter.GetFileList(
|
||||
ctx,
|
||||
subCtx,
|
||||
path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")),
|
||||
task.StartTime,
|
||||
)
|
||||
@ -32,38 +35,25 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
|
||||
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||
return nil, err
|
||||
}
|
||||
files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime)
|
||||
files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime)
|
||||
if len(files) == 0 {
|
||||
span.SetStatus(codes.Error, "没有找到文件")
|
||||
return nil, fmt.Errorf("没有找到文件")
|
||||
}
|
||||
span.SetAttributes(attribute.Int("fileCount", len(files)))
|
||||
constructTask, err := util.CheckFileCoverageAndConstructTask(ctx, files, task.StartTime, task.EndTime, task)
|
||||
constructTask, err := util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件片段检查失败")
|
||||
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||
return nil, err
|
||||
}
|
||||
ok := util.RunFfmpegTask(ctx, constructTask)
|
||||
ok := util.RunFfmpegTask(subCtx, constructTask)
|
||||
if !ok {
|
||||
span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败"))
|
||||
span.SetStatus(codes.Error, "ffmpeg任务执行失败")
|
||||
return nil, fmt.Errorf("ffmpeg任务执行失败")
|
||||
}
|
||||
outfile, err := os.Stat(constructTask.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件不存在")
|
||||
return nil, fmt.Errorf("文件不存在:%s", constructTask.OutputFile)
|
||||
}
|
||||
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
||||
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
||||
if outfile.Size() < 4096 {
|
||||
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
||||
span.SetStatus(codes.Error, "文件大小过小")
|
||||
return nil, fmt.Errorf("文件大小过小:%s", constructTask.OutputFile)
|
||||
}
|
||||
return &dto.FileObject{
|
||||
CreateTime: task.EndTime,
|
||||
EndTime: task.EndTime,
|
||||
|
@ -2,7 +2,7 @@ package dto
|
||||
|
||||
type FfmpegTask struct {
|
||||
Files []File
|
||||
Length int
|
||||
Offset int
|
||||
Length float64
|
||||
Offset float64
|
||||
OutputFile string
|
||||
}
|
||||
|
@ -19,13 +19,16 @@ type LocalAdapter struct {
|
||||
}
|
||||
|
||||
func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
|
||||
_, span := tracer.Start(ctx, "GetFileList_local")
|
||||
subCtx, span := tracer.Start(ctx, "GetFileList_local")
|
||||
defer span.End()
|
||||
if l.StorageConfig.Path == "" {
|
||||
span.SetAttributes(attribute.String("error", "未配置存储路径"))
|
||||
span.SetStatus(codes.Error, "未配置存储路径")
|
||||
return nil, fmt.Errorf("未配置存储路径")
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.String("path", dirPath))
|
||||
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
|
||||
// 读取文件夹下目录
|
||||
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
|
||||
if err != nil {
|
||||
@ -51,10 +54,13 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
||||
if stopTime.IsZero() {
|
||||
stopTime = startTime
|
||||
}
|
||||
if startTime.Equal(stopTime) {
|
||||
// 如果文件名没有时间戳,则认为该文件是未录制完成的
|
||||
// 尝试读取一下视频信息
|
||||
duration, err := util.GetVideoDuration(ctx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
|
||||
duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
|
||||
if err != nil {
|
||||
// 如果还是没有,就按照配置文件里的加起来
|
||||
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"log"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
@ -18,6 +19,8 @@ import (
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
)
|
||||
|
||||
var s3Cache sync.Map
|
||||
|
||||
type S3Adapter struct {
|
||||
StorageConfig config.StorageConfig
|
||||
s3Client *s3.Client
|
||||
@ -49,15 +52,49 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
_, span := tracer.Start(ctx, "GetFileList_s3")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(attribute.String("path", dirPath))
|
||||
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
|
||||
if s.StorageConfig.S3.Bucket == "" {
|
||||
span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
|
||||
span.SetStatus(codes.Error, "未配置S3存储桶")
|
||||
return nil, fmt.Errorf("未配置S3存储桶")
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
|
||||
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
log.Println("获取已缓存列表", cacheKey)
|
||||
span.SetAttributes(attribute.Bool("cache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
|
||||
mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{})
|
||||
lock := mutex.(*sync.Mutex)
|
||||
defer func() {
|
||||
// 解锁后删除锁(避免内存泄漏)
|
||||
s3Cache.Delete(mutexKey)
|
||||
lock.Unlock()
|
||||
}()
|
||||
lock.Lock()
|
||||
|
||||
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
log.Println("过锁后获取已缓存列表", cacheKey)
|
||||
span.SetAttributes(attribute.Bool("s3Cache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
listObjectsInput := &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(s.StorageConfig.S3.Bucket),
|
||||
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
|
||||
Bucket: aws.String(s.StorageConfig.S3.Bucket),
|
||||
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
|
||||
MaxKeys: aws.Int32(1000),
|
||||
}
|
||||
|
||||
client, err := s.getClient()
|
||||
@ -91,7 +128,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
||||
if stopTime.IsZero() {
|
||||
stopTime = startTime
|
||||
}
|
||||
if startTime.Equal(stopTime) {
|
||||
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
||||
}
|
||||
presignClient := s3.NewPresignClient(client)
|
||||
@ -128,5 +168,47 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
||||
})
|
||||
span.SetStatus(codes.Ok, "文件读取成功")
|
||||
|
||||
cacheItem := cacheItem{
|
||||
data: fileList,
|
||||
expires: time.Now().Add(10 * time.Second),
|
||||
}
|
||||
s3Cache.Store(cacheKey, cacheItem)
|
||||
log.Println("缓存文件列表", cacheKey)
|
||||
|
||||
return fileList, nil
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
data []dto.File
|
||||
expires time.Time
|
||||
}
|
||||
|
||||
// 添加定时清理缓存的初始化函数
|
||||
func init() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cleanupCache()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 添加缓存清理函数
|
||||
func cleanupCache() {
|
||||
var keysToDelete []interface{}
|
||||
s3Cache.Range(func(key, value interface{}) bool {
|
||||
item := value.(cacheItem)
|
||||
if time.Now().After(item.expires) {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, key := range keysToDelete {
|
||||
s3Cache.Delete(key)
|
||||
}
|
||||
}
|
||||
|
3
main.go
3
main.go
@ -19,6 +19,7 @@ var tracer = otel.Tracer("vpt")
|
||||
|
||||
func startTask(device config.DeviceMapping, task dto.Task) {
|
||||
ctx, span := tracer.Start(context.Background(), "startTask")
|
||||
defer span.End()
|
||||
span.SetAttributes(attribute.String("deviceNo", device.DeviceNo))
|
||||
span.SetAttributes(attribute.String("taskId", task.TaskID))
|
||||
span.SetAttributes(attribute.String("scenicId", task.ScenicID))
|
||||
@ -41,7 +42,7 @@ func startTask(device config.DeviceMapping, task dto.Task) {
|
||||
return
|
||||
}
|
||||
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
||||
if result {
|
||||
if !result {
|
||||
span.SetStatus(codes.Error, "上报任务成功失败")
|
||||
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||
return
|
||||
|
@ -4,10 +4,12 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
"time"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
|
||||
)
|
||||
|
||||
func InitTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
|
||||
@ -55,15 +57,27 @@ func newPropagator() propagation.TextMapPropagator {
|
||||
|
||||
func newJaegerTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
|
||||
// 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter
|
||||
traceExporter, err := otlptracehttp.New(ctx,
|
||||
otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces"))
|
||||
res, err := resource.New(ctx,
|
||||
resource.WithFromEnv(),
|
||||
resource.WithTelemetrySDK(),
|
||||
resource.WithHost(),
|
||||
resource.WithAttributes(
|
||||
// 在可观测链路 OpenTelemetry 版后端显示的服务名称。
|
||||
semconv.ServiceNameKey.String("VPT"),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
traceProvider := trace.NewTracerProvider(
|
||||
trace.WithBatcher(traceExporter,
|
||||
// 默认为 5s。为便于演示,设置为 1s。
|
||||
trace.WithBatchTimeout(time.Second)),
|
||||
traceClientHttp := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces"))
|
||||
otlptracehttp.WithCompression(1)
|
||||
traceExp, err := otlptrace.New(ctx, traceClientHttp)
|
||||
bsp := trace.NewBatchSpanProcessor(traceExp)
|
||||
tracerProvider := trace.NewTracerProvider(
|
||||
trace.WithSampler(trace.AlwaysSample()),
|
||||
trace.WithResource(res),
|
||||
trace.WithSpanProcessor(bsp),
|
||||
)
|
||||
return traceProvider, nil
|
||||
return tracerProvider, nil
|
||||
}
|
||||
|
182
util/ffmpeg.go
182
util/ffmpeg.go
@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -19,49 +20,65 @@ import (
|
||||
)
|
||||
|
||||
const FfmpegExec = "ffmpeg"
|
||||
const minimalSize = 8192
|
||||
const minimalLengthRatio = 0.5
|
||||
|
||||
func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
_, span := tracer.Start(ctx, "RunFfmpegTask")
|
||||
subCtx, span := tracer.Start(ctx, "RunFfmpegTask")
|
||||
defer span.End()
|
||||
var result bool
|
||||
if len(task.Files) == 1 {
|
||||
// 单个文件切割,用简单方法
|
||||
result = runFfmpegForSingleFile(ctx, task)
|
||||
result = runFfmpegForSingleFile(subCtx, task)
|
||||
} else {
|
||||
// 多个文件切割,用速度快的
|
||||
result = runFfmpegForMultipleFile1(ctx, task)
|
||||
result = runFfmpegForMultipleFile1(subCtx, task)
|
||||
}
|
||||
// 先尝试方法1
|
||||
if !result {
|
||||
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
||||
// 不行再尝试方法二
|
||||
result = runFfmpegForMultipleFile2(subCtx, task)
|
||||
}
|
||||
duration, err := GetVideoDuration(subCtx, task.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "获取视频时长失败")
|
||||
return false
|
||||
}
|
||||
if duration < (minimalLengthRatio * float64(task.Length)) {
|
||||
span.SetStatus(codes.Error, "视频长度不达标")
|
||||
return false
|
||||
}
|
||||
if result {
|
||||
span.SetStatus(codes.Ok, "FFMPEG简易方法成功")
|
||||
span.SetStatus(codes.Ok, "成功")
|
||||
return true
|
||||
}
|
||||
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
||||
// 不行再尝试方法二
|
||||
result = runFfmpegForMultipleFile2(ctx, task)
|
||||
if result {
|
||||
span.SetStatus(codes.Ok, "FFMPEG复杂方法成功")
|
||||
return true
|
||||
}
|
||||
span.SetStatus(codes.Error, "FFMPEG复杂方法失败")
|
||||
span.SetStatus(codes.Error, "失败")
|
||||
return result
|
||||
}
|
||||
|
||||
func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
|
||||
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
|
||||
defer span.End()
|
||||
// 多文件,方法一:先转换成ts,然后合并切割
|
||||
// 步骤一:先转换成ts,并行转换
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var notOk bool
|
||||
var taskClone = dto.FfmpegTask{
|
||||
Files: task.Files,
|
||||
OutputFile: task.OutputFile,
|
||||
Offset: task.Offset,
|
||||
Length: task.Length,
|
||||
}
|
||||
|
||||
for i := range task.Files {
|
||||
for i := range taskClone.Files {
|
||||
wg.Add(1)
|
||||
go func(file *dto.File) {
|
||||
defer wg.Done()
|
||||
tmpFile := path.Join(os.TempDir(), file.Name+".ts")
|
||||
result, err := convertMp4ToTs(ctx, *file, tmpFile)
|
||||
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
|
||||
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
|
||||
if err != nil {
|
||||
log.Printf("转码出错: %v", err)
|
||||
mu.Lock()
|
||||
@ -77,7 +94,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
// 失败了,务必删除临时文件
|
||||
os.Remove(tmpFile)
|
||||
}
|
||||
}(&task.Files[i])
|
||||
}(&taskClone.Files[i])
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
@ -88,7 +105,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
}
|
||||
|
||||
// 步骤二:使用concat协议拼接裁切
|
||||
result, err := QuickConcatVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||
result, err := QuickConcatVideoCut(subCtx, taskClone.Files, taskClone.Offset, taskClone.Length, taskClone.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
|
||||
@ -96,11 +113,27 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
}
|
||||
|
||||
// 步骤三:删除临时文件
|
||||
for _, file := range task.Files {
|
||||
for _, file := range taskClone.Files {
|
||||
if err := os.Remove(file.Url); err != nil {
|
||||
log.Printf("删除临时文件失败: %v", err)
|
||||
}
|
||||
}
|
||||
if result {
|
||||
outfile, err := os.Stat(taskClone.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件不存在")
|
||||
result = false
|
||||
} else {
|
||||
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
||||
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
||||
if outfile.Size() < minimalSize {
|
||||
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
||||
span.SetStatus(codes.Error, "文件大小过小")
|
||||
result = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if result {
|
||||
span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功")
|
||||
} else {
|
||||
@ -110,20 +143,41 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
}
|
||||
|
||||
func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
||||
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
||||
defer span.End()
|
||||
// 多文件,方法二:使用计算资源编码
|
||||
result, err := SlowVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if result {
|
||||
outfile, err := os.Stat(task.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件不存在")
|
||||
result = false
|
||||
} else {
|
||||
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
||||
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
||||
if outfile.Size() < minimalSize {
|
||||
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
||||
span.SetStatus(codes.Error, "文件大小过小")
|
||||
result = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if result {
|
||||
span.SetStatus(codes.Ok, "FFMPEG多文件转码成功")
|
||||
} else {
|
||||
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
_, span := tracer.Start(ctx, "runFfmpegForSingleFile")
|
||||
subCtx, span := tracer.Start(ctx, "runFfmpegForSingleFile")
|
||||
defer span.End()
|
||||
result, err := QuickVideoCut(ctx, task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||
result, err := QuickVideoCut(subCtx, task.Files[0].Url, task.Offset, task.Length, task.OutputFile)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
||||
return false
|
||||
@ -136,6 +190,27 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
}
|
||||
span.SetAttributes(attribute.String("file.name", task.OutputFile))
|
||||
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
|
||||
if result {
|
||||
outfile, err := os.Stat(task.OutputFile)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件不存在")
|
||||
result = false
|
||||
} else {
|
||||
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
||||
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
||||
if outfile.Size() < minimalSize {
|
||||
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
||||
span.SetStatus(codes.Error, "文件大小过小")
|
||||
result = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if result {
|
||||
span.SetStatus(codes.Ok, "FFMPEG单个文件裁切成功")
|
||||
} else {
|
||||
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@ -147,6 +222,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
|
||||
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
|
||||
}
|
||||
// 按照 Create 的值升序排序
|
||||
sort.Slice(fileList, func(i, j int) bool {
|
||||
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
|
||||
})
|
||||
|
||||
// 如果片段在中间断开时间过长
|
||||
if len(fileList) > 1 {
|
||||
@ -156,7 +235,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
lastFile = &file
|
||||
continue
|
||||
}
|
||||
if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 {
|
||||
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
|
||||
// 片段断开
|
||||
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
||||
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
|
||||
@ -176,19 +255,19 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
// 构造FfmpegTaskPo
|
||||
ffmpegTask := &dto.FfmpegTask{
|
||||
Files: fileList,
|
||||
Length: int(endDt.Sub(beginDt).Seconds()),
|
||||
Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()),
|
||||
Length: endDt.Sub(beginDt).Seconds(),
|
||||
Offset: beginDt.Sub(fileList[0].StartTime).Seconds(),
|
||||
OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"),
|
||||
}
|
||||
span.SetAttributes(attribute.Int("task.files", len(ffmpegTask.Files)))
|
||||
span.SetAttributes(attribute.Int("task.offset", ffmpegTask.Offset))
|
||||
span.SetAttributes(attribute.Int("task.length", ffmpegTask.Length))
|
||||
span.SetAttributes(attribute.String("task.files", ToJson(ffmpegTask.Files)))
|
||||
span.SetAttributes(attribute.Float64("task.offset", ffmpegTask.Offset))
|
||||
span.SetAttributes(attribute.Float64("task.length", ffmpegTask.Length))
|
||||
span.SetStatus(codes.Ok, "FFMPEG任务构造成功")
|
||||
return ffmpegTask, nil
|
||||
}
|
||||
|
||||
func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "convertMp4ToTs")
|
||||
subCtx, span := tracer.Start(ctx, "convertMp4ToTs")
|
||||
defer span.End()
|
||||
ffmpegCmd := []string{
|
||||
FfmpegExec,
|
||||
@ -200,11 +279,11 @@ func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (boo
|
||||
"-f", "mpegts",
|
||||
outFileName,
|
||||
}
|
||||
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
||||
}
|
||||
|
||||
func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "convertHevcToTs")
|
||||
subCtx, span := tracer.Start(ctx, "convertHevcToTs")
|
||||
defer span.End()
|
||||
ffmpegCmd := []string{
|
||||
FfmpegExec,
|
||||
@ -216,11 +295,11 @@ func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bo
|
||||
"-f", "mpegts",
|
||||
outFileName,
|
||||
}
|
||||
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
||||
}
|
||||
|
||||
func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64, outputFile string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "QuickVideoCut")
|
||||
func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64, outputFile string) (bool, error) {
|
||||
subCtx, span := tracer.Start(ctx, "QuickVideoCut")
|
||||
defer span.End()
|
||||
ffmpegCmd := []string{
|
||||
FfmpegExec,
|
||||
@ -230,17 +309,17 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64,
|
||||
"-c:v", "copy",
|
||||
"-an",
|
||||
"-reset_timestamps", "1",
|
||||
"-ss", strconv.FormatInt(offset, 10),
|
||||
"-t", strconv.FormatInt(length, 10),
|
||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||
"-fflags", "+genpts",
|
||||
"-f", "mp4",
|
||||
outputFile,
|
||||
}
|
||||
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
||||
}
|
||||
|
||||
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
||||
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
||||
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
||||
defer span.End()
|
||||
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
|
||||
tmpFileObj, err := os.Create(tmpFile)
|
||||
@ -272,16 +351,16 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
|
||||
"-i", tmpFile,
|
||||
"-c:v", "copy",
|
||||
"-an",
|
||||
"-ss", strconv.FormatInt(offset, 10),
|
||||
"-t", strconv.FormatInt(length, 10),
|
||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||
"-f", "mp4",
|
||||
outputFile,
|
||||
}
|
||||
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
||||
}
|
||||
|
||||
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "SlowVideoCut")
|
||||
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
|
||||
subCtx, span := tracer.Start(ctx, "SlowVideoCut")
|
||||
defer span.End()
|
||||
ffmpegCmd := []string{
|
||||
FfmpegExec,
|
||||
@ -305,19 +384,19 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int
|
||||
"-map", "[v]",
|
||||
"-preset:v", "fast",
|
||||
"-an",
|
||||
"-ss", strconv.FormatInt(offset, 10),
|
||||
"-t", strconv.FormatInt(length, 10),
|
||||
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
|
||||
"-t", strconv.FormatFloat(length, 'f', 2, 64),
|
||||
"-f", "mp4",
|
||||
outputFile,
|
||||
)
|
||||
|
||||
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||
return handleFfmpegProcess(subCtx, ffmpegCmd)
|
||||
}
|
||||
|
||||
func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) {
|
||||
_, span := tracer.Start(ctx, "handleFfmpegProcess")
|
||||
defer span.End()
|
||||
span.SetAttributes(attribute.String("ffmpeg.cmd", strings.Join(ffmpegCmd, " ")))
|
||||
span.SetAttributes(attribute.String("ffmpeg.cmd", ToJson(ffmpegCmd)))
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
||||
@ -371,7 +450,7 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
filePath,
|
||||
}
|
||||
span.SetAttributes(attribute.String("ffprobe.cmd", strings.Join(ffprobeCmd, " ")))
|
||||
span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd)))
|
||||
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
|
||||
var out bytes.Buffer
|
||||
cmd.Stdout = &out
|
||||
@ -382,12 +461,11 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
||||
span.SetStatus(codes.Error, "failed to get video duration")
|
||||
return 0, fmt.Errorf("failed to get video duration: %w", err)
|
||||
}
|
||||
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String()))
|
||||
span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
|
||||
durationStr := strings.TrimSpace(out.String())
|
||||
duration, err := strconv.ParseFloat(durationStr, 64)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "failed to parse video duration")
|
||||
return 0, fmt.Errorf("failed to parse video duration: %w", err)
|
||||
}
|
||||
span.SetAttributes(attribute.Float64("video.duration", duration))
|
||||
|
@ -2,11 +2,16 @@ package util
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"context"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.File {
|
||||
func FilterAndSortFiles(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time) []dto.File {
|
||||
_, span := tracer.Start(ctx, "FilterAndSortFiles")
|
||||
defer span.End()
|
||||
span.SetAttributes(attribute.Int("files.count", len(fileList)))
|
||||
var filteredFiles []dto.File
|
||||
|
||||
for _, file := range fileList {
|
||||
@ -33,10 +38,10 @@ func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.Fil
|
||||
}
|
||||
}
|
||||
|
||||
// 按照 DiffMs 的值降序排序
|
||||
// 按照 Create 的值升序排序
|
||||
sort.Slice(filteredFiles, func(i, j int) bool {
|
||||
return filteredFiles[i].DiffMs > filteredFiles[j].DiffMs
|
||||
return filteredFiles[i].StartTime.Unix() <= filteredFiles[j].StartTime.Unix()
|
||||
})
|
||||
|
||||
span.SetAttributes(attribute.String("files.filtered", ToJson(filteredFiles)))
|
||||
return filteredFiles
|
||||
}
|
||||
|
11
util/json.go
Normal file
11
util/json.go
Normal file
@ -0,0 +1,11 @@
|
||||
package util
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
func ToJson(v interface{}) string {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return string(b)
|
||||
}
|
Reference in New Issue
Block a user