Compare commits

...

18 Commits

Author SHA1 Message Date
5dfe6d6356 s3 优化缓存逻辑,添加缓存自清理逻辑 2025-04-21 15:08:02 +08:00
509b829c5b s3 修复缓存键避免被逻辑修改 2025-04-21 14:44:23 +08:00
e6f93a4d37 s3 避免缓存击穿 2025-04-21 14:37:51 +08:00
2971c5f52d s3 添加缓存避免延迟爆炸 2025-04-21 14:02:56 +08:00
3d7c88de5f s3 一次性10000个 2025-04-13 18:43:45 +08:00
f9256895b7 去除错误 2025-04-13 18:32:20 +08:00
104930c413 优化stopTime为空时逻辑爆炸的问题 2025-04-13 16:07:29 +08:00
cf3c518d13 支持浮点偏移 2025-04-13 15:23:58 +08:00
cdd1358d45 不修改原始数据避免失败 2025-04-13 15:09:22 +08:00
c6a6518248 临时文件使用随机数 2025-04-13 14:57:11 +08:00
adf4186156 添加时间判断,避免切出过小的视频 2025-04-13 14:53:18 +08:00
37da5abad0 优化切割逻辑,提前判断是否成功 2025-04-13 14:41:29 +08:00
a478902f98 添加过滤埋点 2025-04-13 14:11:53 +08:00
94e1f66288 修改排序,更多埋点信息 2025-04-13 12:32:27 +08:00
dc10092f7a tracer层级问题 2025-04-13 12:09:49 +08:00
b11a315b0d 写错了 2025-04-13 12:00:24 +08:00
91bb5e4e5a telemetry埋点 2025-04-13 11:55:01 +08:00
4f47689253 telemetry名称 2025-04-13 11:45:37 +08:00
10 changed files with 284 additions and 95 deletions

View File

@ -14,12 +14,14 @@ import (
)
func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) error {
url, err := QueryUploadUrlForTask(ctx, task.TaskID)
subCtx, span := tracer.Start(ctx, "UploadTaskFile")
defer span.End()
url, err := QueryUploadUrlForTask(subCtx, task.TaskID)
if err != nil {
return err
}
log.Printf("开始上传文件, URL:【%s】\n", url)
if err := OssUpload(ctx, url, file.URL); err != nil {
if err := OssUpload(subCtx, url, file.URL); err != nil {
return err
}
return nil

View File

@ -11,18 +11,21 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"log"
"os"
"path"
)
var tracer = otel.Tracer("task")
func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) {
_, span := tracer.Start(ctx, "startTask")
subCtx, span := tracer.Start(ctx, "HandleTask")
defer span.End()
adapter := fs.GetAdapter()
span.SetAttributes()
span.SetAttributes(attribute.String("task.id", task.TaskID))
span.SetAttributes(attribute.String("task", util.ToJson(task)))
span.SetAttributes(attribute.String("device.no", device.DeviceNo))
span.SetAttributes(attribute.String("device.name", device.Name))
fileList, err := adapter.GetFileList(
ctx,
subCtx,
path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")),
task.StartTime,
)
@ -32,38 +35,25 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
return nil, err
}
files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime)
files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime)
if len(files) == 0 {
span.SetStatus(codes.Error, "没有找到文件")
return nil, fmt.Errorf("没有找到文件")
}
span.SetAttributes(attribute.Int("fileCount", len(files)))
constructTask, err := util.CheckFileCoverageAndConstructTask(ctx, files, task.StartTime, task.EndTime, task)
constructTask, err := util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件片段检查失败")
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
return nil, err
}
ok := util.RunFfmpegTask(ctx, constructTask)
ok := util.RunFfmpegTask(subCtx, constructTask)
if !ok {
span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败"))
span.SetStatus(codes.Error, "ffmpeg任务执行失败")
return nil, fmt.Errorf("ffmpeg任务执行失败")
}
outfile, err := os.Stat(constructTask.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件不存在")
return nil, fmt.Errorf("文件不存在:%s", constructTask.OutputFile)
}
span.SetAttributes(attribute.String("file.name", outfile.Name()))
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
if outfile.Size() < 4096 {
span.SetAttributes(attribute.String("error", "文件大小过小"))
span.SetStatus(codes.Error, "文件大小过小")
return nil, fmt.Errorf("文件大小过小:%s", constructTask.OutputFile)
}
return &dto.FileObject{
CreateTime: task.EndTime,
EndTime: task.EndTime,

View File

@ -2,7 +2,7 @@ package dto
type FfmpegTask struct {
Files []File
Length int
Offset int
Length float64
Offset float64
OutputFile string
}

View File

@ -19,13 +19,16 @@ type LocalAdapter struct {
}
func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
_, span := tracer.Start(ctx, "GetFileList_local")
subCtx, span := tracer.Start(ctx, "GetFileList_local")
defer span.End()
if l.StorageConfig.Path == "" {
span.SetAttributes(attribute.String("error", "未配置存储路径"))
span.SetStatus(codes.Error, "未配置存储路径")
return nil, fmt.Errorf("未配置存储路径")
}
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
// 读取文件夹下目录
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
if err != nil {
@ -51,10 +54,13 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
if err != nil {
continue
}
if startTime.Equal(stopTime) || stopTime.IsZero() {
if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
// 如果文件名没有时间戳,则认为该文件是未录制完成的
// 尝试读取一下视频信息
duration, err := util.GetVideoDuration(ctx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
if err != nil {
// 如果还是没有,就按照配置文件里的加起来
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))

View File

@ -10,6 +10,7 @@ import (
"log"
"path"
"sort"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@ -18,6 +19,8 @@ import (
"go.opentelemetry.io/otel/codes"
)
var s3Cache sync.Map
type S3Adapter struct {
StorageConfig config.StorageConfig
s3Client *s3.Client
@ -49,15 +52,49 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
_, span := tracer.Start(ctx, "GetFileList_s3")
defer span.End()
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
if s.StorageConfig.S3.Bucket == "" {
span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
span.SetStatus(codes.Error, "未配置S3存储桶")
return nil, fmt.Errorf("未配置S3存储桶")
}
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
if time.Now().Before(cachedItem.expires) {
log.Println("获取已缓存列表", cacheKey)
span.SetAttributes(attribute.Bool("cache.hit", true))
return cachedItem.data, nil
}
}
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{})
lock := mutex.(*sync.Mutex)
defer func() {
// 解锁后删除锁(避免内存泄漏)
s3Cache.Delete(mutexKey)
lock.Unlock()
}()
lock.Lock()
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
if time.Now().Before(cachedItem.expires) {
log.Println("过锁后获取已缓存列表", cacheKey)
span.SetAttributes(attribute.Bool("s3Cache.hit", true))
return cachedItem.data, nil
}
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(s.StorageConfig.S3.Bucket),
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
Bucket: aws.String(s.StorageConfig.S3.Bucket),
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
MaxKeys: aws.Int32(1000),
}
client, err := s.getClient()
@ -91,7 +128,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if err != nil {
continue
}
if startTime.Equal(stopTime) || stopTime.IsZero() {
if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
}
presignClient := s3.NewPresignClient(client)
@ -128,5 +168,47 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
return fileList[i].StartTime.Before(fileList[j].StartTime)
})
span.SetStatus(codes.Ok, "文件读取成功")
cacheItem := cacheItem{
data: fileList,
expires: time.Now().Add(10 * time.Second),
}
s3Cache.Store(cacheKey, cacheItem)
log.Println("缓存文件列表", cacheKey)
return fileList, nil
}
type cacheItem struct {
data []dto.File
expires time.Time
}
// 添加定时清理缓存的初始化函数
func init() {
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanupCache()
}
}
}()
}
// 添加缓存清理函数
func cleanupCache() {
var keysToDelete []interface{}
s3Cache.Range(func(key, value interface{}) bool {
item := value.(cacheItem)
if time.Now().After(item.expires) {
keysToDelete = append(keysToDelete, key)
}
return true
})
for _, key := range keysToDelete {
s3Cache.Delete(key)
}
}

View File

@ -19,6 +19,7 @@ var tracer = otel.Tracer("vpt")
func startTask(device config.DeviceMapping, task dto.Task) {
ctx, span := tracer.Start(context.Background(), "startTask")
defer span.End()
span.SetAttributes(attribute.String("deviceNo", device.DeviceNo))
span.SetAttributes(attribute.String("taskId", task.TaskID))
span.SetAttributes(attribute.String("scenicId", task.ScenicID))
@ -41,7 +42,7 @@ func startTask(device config.DeviceMapping, task dto.Task) {
return
}
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
if result {
if !result {
span.SetStatus(codes.Error, "上报任务成功失败")
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
return

View File

@ -4,10 +4,12 @@ import (
"context"
"errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
"time"
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
)
func InitTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
@ -55,15 +57,27 @@ func newPropagator() propagation.TextMapPropagator {
func newJaegerTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
// 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter
traceExporter, err := otlptracehttp.New(ctx,
otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces"))
res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithTelemetrySDK(),
resource.WithHost(),
resource.WithAttributes(
// 在可观测链路 OpenTelemetry 版后端显示的服务名称。
semconv.ServiceNameKey.String("VPT"),
),
)
if err != nil {
return nil, err
}
traceProvider := trace.NewTracerProvider(
trace.WithBatcher(traceExporter,
// 默认为 5s。为便于演示,设置为 1s。
trace.WithBatchTimeout(time.Second)),
traceClientHttp := otlptracehttp.NewClient(
otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces"))
otlptracehttp.WithCompression(1)
traceExp, err := otlptrace.New(ctx, traceClientHttp)
bsp := trace.NewBatchSpanProcessor(traceExp)
tracerProvider := trace.NewTracerProvider(
trace.WithSampler(trace.AlwaysSample()),
trace.WithResource(res),
trace.WithSpanProcessor(bsp),
)
return traceProvider, nil
return tracerProvider, nil
}

View File

@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"sync"
@ -19,49 +20,65 @@ import (
)
const FfmpegExec = "ffmpeg"
const minimalSize = 8192
const minimalLengthRatio = 0.5
func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
_, span := tracer.Start(ctx, "RunFfmpegTask")
subCtx, span := tracer.Start(ctx, "RunFfmpegTask")
defer span.End()
var result bool
if len(task.Files) == 1 {
// 单个文件切割,用简单方法
result = runFfmpegForSingleFile(ctx, task)
result = runFfmpegForSingleFile(subCtx, task)
} else {
// 多个文件切割,用速度快的
result = runFfmpegForMultipleFile1(ctx, task)
result = runFfmpegForMultipleFile1(subCtx, task)
}
// 先尝试方法1
if !result {
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
// 不行再尝试方法二
result = runFfmpegForMultipleFile2(subCtx, task)
}
duration, err := GetVideoDuration(subCtx, task.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "获取视频时长失败")
return false
}
if duration < (minimalLengthRatio * float64(task.Length)) {
span.SetStatus(codes.Error, "视频长度不达标")
return false
}
if result {
span.SetStatus(codes.Ok, "FFMPEG简易方法成功")
span.SetStatus(codes.Ok, "成功")
return true
}
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
// 不行再尝试方法二
result = runFfmpegForMultipleFile2(ctx, task)
if result {
span.SetStatus(codes.Ok, "FFMPEG复杂方法成功")
return true
}
span.SetStatus(codes.Error, "FFMPEG复杂方法失败")
span.SetStatus(codes.Error, "失败")
return result
}
func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
defer span.End()
// 多文件,方法一:先转换成ts,然后合并切割
// 步骤一:先转换成ts,并行转换
var wg sync.WaitGroup
var mu sync.Mutex
var notOk bool
var taskClone = dto.FfmpegTask{
Files: task.Files,
OutputFile: task.OutputFile,
Offset: task.Offset,
Length: task.Length,
}
for i := range task.Files {
for i := range taskClone.Files {
wg.Add(1)
go func(file *dto.File) {
defer wg.Done()
tmpFile := path.Join(os.TempDir(), file.Name+".ts")
result, err := convertMp4ToTs(ctx, *file, tmpFile)
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
if err != nil {
log.Printf("转码出错: %v", err)
mu.Lock()
@ -77,7 +94,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
// 失败了,务必删除临时文件
os.Remove(tmpFile)
}
}(&task.Files[i])
}(&taskClone.Files[i])
}
wg.Wait()
@ -88,7 +105,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
}
// 步骤二:使用concat协议拼接裁切
result, err := QuickConcatVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
result, err := QuickConcatVideoCut(subCtx, taskClone.Files, taskClone.Offset, taskClone.Length, taskClone.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
@ -96,11 +113,27 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
}
// 步骤三:删除临时文件
for _, file := range task.Files {
for _, file := range taskClone.Files {
if err := os.Remove(file.Url); err != nil {
log.Printf("删除临时文件失败: %v", err)
}
}
if result {
outfile, err := os.Stat(taskClone.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件不存在")
result = false
} else {
span.SetAttributes(attribute.String("file.name", outfile.Name()))
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
if outfile.Size() < minimalSize {
span.SetAttributes(attribute.String("error", "文件大小过小"))
span.SetStatus(codes.Error, "文件大小过小")
result = false
}
}
}
if result {
span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功")
} else {
@ -110,20 +143,41 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
}
func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
subCtx, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
defer span.End()
// 多文件,方法二:使用计算资源编码
result, err := SlowVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
result, err := SlowVideoCut(subCtx, task.Files, task.Offset, task.Offset, task.OutputFile)
if err != nil {
return false
}
if result {
outfile, err := os.Stat(task.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件不存在")
result = false
} else {
span.SetAttributes(attribute.String("file.name", outfile.Name()))
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
if outfile.Size() < minimalSize {
span.SetAttributes(attribute.String("error", "文件大小过小"))
span.SetStatus(codes.Error, "文件大小过小")
result = false
}
}
}
if result {
span.SetStatus(codes.Ok, "FFMPEG多文件转码成功")
} else {
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
}
return result
}
func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
_, span := tracer.Start(ctx, "runFfmpegForSingleFile")
subCtx, span := tracer.Start(ctx, "runFfmpegForSingleFile")
defer span.End()
result, err := QuickVideoCut(ctx, task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile)
result, err := QuickVideoCut(subCtx, task.Files[0].Url, task.Offset, task.Length, task.OutputFile)
if err != nil {
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
return false
@ -136,6 +190,27 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
}
span.SetAttributes(attribute.String("file.name", task.OutputFile))
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
if result {
outfile, err := os.Stat(task.OutputFile)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件不存在")
result = false
} else {
span.SetAttributes(attribute.String("file.name", outfile.Name()))
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
if outfile.Size() < minimalSize {
span.SetAttributes(attribute.String("error", "文件大小过小"))
span.SetStatus(codes.Error, "文件大小过小")
result = false
}
}
}
if result {
span.SetStatus(codes.Ok, "FFMPEG单个文件裁切成功")
} else {
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
}
return result
}
@ -147,6 +222,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
}
// 按照 Create 的值升序排序
sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Unix() <= fileList[j].StartTime.Unix()
})
// 如果片段在中间断开时间过长
if len(fileList) > 1 {
@ -156,7 +235,7 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
lastFile = &file
continue
}
if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 {
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
// 片段断开
span.SetStatus(codes.Error, "FFMPEG片段断开")
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
@ -176,19 +255,19 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
// 构造FfmpegTaskPo
ffmpegTask := &dto.FfmpegTask{
Files: fileList,
Length: int(endDt.Sub(beginDt).Seconds()),
Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()),
Length: endDt.Sub(beginDt).Seconds(),
Offset: beginDt.Sub(fileList[0].StartTime).Seconds(),
OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"),
}
span.SetAttributes(attribute.Int("task.files", len(ffmpegTask.Files)))
span.SetAttributes(attribute.Int("task.offset", ffmpegTask.Offset))
span.SetAttributes(attribute.Int("task.length", ffmpegTask.Length))
span.SetAttributes(attribute.String("task.files", ToJson(ffmpegTask.Files)))
span.SetAttributes(attribute.Float64("task.offset", ffmpegTask.Offset))
span.SetAttributes(attribute.Float64("task.length", ffmpegTask.Length))
span.SetStatus(codes.Ok, "FFMPEG任务构造成功")
return ffmpegTask, nil
}
func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
_, span := tracer.Start(ctx, "convertMp4ToTs")
subCtx, span := tracer.Start(ctx, "convertMp4ToTs")
defer span.End()
ffmpegCmd := []string{
FfmpegExec,
@ -200,11 +279,11 @@ func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (boo
"-f", "mpegts",
outFileName,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
_, span := tracer.Start(ctx, "convertHevcToTs")
subCtx, span := tracer.Start(ctx, "convertHevcToTs")
defer span.End()
ffmpegCmd := []string{
FfmpegExec,
@ -216,11 +295,11 @@ func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bo
"-f", "mpegts",
outFileName,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64, outputFile string) (bool, error) {
_, span := tracer.Start(ctx, "QuickVideoCut")
func QuickVideoCut(ctx context.Context, inputFile string, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "QuickVideoCut")
defer span.End()
ffmpegCmd := []string{
FfmpegExec,
@ -230,17 +309,17 @@ func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64,
"-c:v", "copy",
"-an",
"-reset_timestamps", "1",
"-ss", strconv.FormatInt(offset, 10),
"-t", strconv.FormatInt(length, 10),
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64),
"-fflags", "+genpts",
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
_, span := tracer.Start(ctx, "QuickConcatVideoCut")
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "QuickConcatVideoCut")
defer span.End()
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
tmpFileObj, err := os.Create(tmpFile)
@ -272,16 +351,16 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
"-i", tmpFile,
"-c:v", "copy",
"-an",
"-ss", strconv.FormatInt(offset, 10),
"-t", strconv.FormatInt(length, 10),
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64),
"-f", "mp4",
outputFile,
}
return handleFfmpegProcess(ctx, ffmpegCmd)
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
_, span := tracer.Start(ctx, "SlowVideoCut")
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length float64, outputFile string) (bool, error) {
subCtx, span := tracer.Start(ctx, "SlowVideoCut")
defer span.End()
ffmpegCmd := []string{
FfmpegExec,
@ -305,19 +384,19 @@ func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int
"-map", "[v]",
"-preset:v", "fast",
"-an",
"-ss", strconv.FormatInt(offset, 10),
"-t", strconv.FormatInt(length, 10),
"-ss", strconv.FormatFloat(offset, 'f', 2, 64),
"-t", strconv.FormatFloat(length, 'f', 2, 64),
"-f", "mp4",
outputFile,
)
return handleFfmpegProcess(ctx, ffmpegCmd)
return handleFfmpegProcess(subCtx, ffmpegCmd)
}
func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) {
_, span := tracer.Start(ctx, "handleFfmpegProcess")
defer span.End()
span.SetAttributes(attribute.String("ffmpeg.cmd", strings.Join(ffmpegCmd, " ")))
span.SetAttributes(attribute.String("ffmpeg.cmd", ToJson(ffmpegCmd)))
startTime := time.Now()
defer func() {
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
@ -371,7 +450,7 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
"-of", "default=noprint_wrappers=1:nokey=1",
filePath,
}
span.SetAttributes(attribute.String("ffprobe.cmd", strings.Join(ffprobeCmd, " ")))
span.SetAttributes(attribute.String("ffprobe.cmd", ToJson(ffprobeCmd)))
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
var out bytes.Buffer
cmd.Stdout = &out
@ -382,12 +461,11 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
span.SetStatus(codes.Error, "failed to get video duration")
return 0, fmt.Errorf("failed to get video duration: %w", err)
}
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String()))
span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
durationStr := strings.TrimSpace(out.String())
duration, err := strconv.ParseFloat(durationStr, 64)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "failed to parse video duration")
return 0, fmt.Errorf("failed to parse video duration: %w", err)
}
span.SetAttributes(attribute.Float64("video.duration", duration))

View File

@ -2,11 +2,16 @@ package util
import (
"ZhenTuLocalPassiveAdapter/dto"
"context"
"go.opentelemetry.io/otel/attribute"
"sort"
"time"
)
func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.File {
func FilterAndSortFiles(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time) []dto.File {
_, span := tracer.Start(ctx, "FilterAndSortFiles")
defer span.End()
span.SetAttributes(attribute.Int("files.count", len(fileList)))
var filteredFiles []dto.File
for _, file := range fileList {
@ -33,10 +38,10 @@ func FilterAndSortFiles(fileList []dto.File, beginDt, endDt time.Time) []dto.Fil
}
}
// 按照 DiffMs 的值序排序
// 按照 Create 的值序排序
sort.Slice(filteredFiles, func(i, j int) bool {
return filteredFiles[i].DiffMs > filteredFiles[j].DiffMs
return filteredFiles[i].StartTime.Unix() <= filteredFiles[j].StartTime.Unix()
})
span.SetAttributes(attribute.String("files.filtered", ToJson(filteredFiles)))
return filteredFiles
}

11
util/json.go Normal file
View File

@ -0,0 +1,11 @@
package util
import "encoding/json"
func ToJson(v interface{}) string {
b, err := json.Marshal(v)
if err != nil {
return ""
}
return string(b)
}