From 10e39a506c7016a81ea48f25082244f219c966d1 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 29 Dec 2025 18:39:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(task):=20=E4=BC=98=E5=8C=96=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E5=88=97=E8=A1=A8=E8=8E=B7=E5=8F=96=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=B9=B6=E6=B7=BB=E5=8A=A0=E7=BC=93=E5=AD=98=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现按时间前缀获取文件列表,支持小时级目录检索 - 添加降级机制,当时间前缀方式无法找到文件时回退到按天目录 - 在适配器层添加单例模式和客户端连接池管理 - 为S3和AliOSS适配器添加文件列表缓存功能 - 修复跨天任务处理逻辑,约束业务不支持跨天操作 - 优化文件去重逻辑,避免重复处理相同文件 - 添加详细的链路追踪和错误处理机制 --- core/task.go | 114 +++++++++++++++++++++++++++++++++++++++--- fs/adapter.go | 29 +++++++---- fs/ali_adapter.go | 30 +++++++++-- fs/file_list_cache.go | 22 -------- fs/s3_adapter.go | 30 +++++++++-- 5 files changed, 178 insertions(+), 47 deletions(-) diff --git a/core/task.go b/core/task.go index b6c7ad0..ebb81a1 100644 --- a/core/task.go +++ b/core/task.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.uber.org/zap" "path" + "time" ) var tracer = otel.Tracer("task") @@ -25,11 +26,8 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) span.SetAttributes(attribute.String("task", util.ToJson(task))) span.SetAttributes(attribute.String("device.no", device.DeviceNo)) span.SetAttributes(attribute.String("device.name", device.Name)) - fileList, err := adapter.GetFileList( - subCtx, - path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")), - task.StartTime, - ) + dateDirPath := path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")) + fileList, usedTimePrefix, err := getFileListForTask(subCtx, adapter, dateDirPath, task) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "获取文件列表失败") @@ -38,14 +36,60 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) zap.Error(err)) return nil, err } + files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) if len(files) == 0 { - span.SetStatus(codes.Error, "没有找到文件") - return nil, fmt.Errorf("没有找到文件") + if usedTimePrefix { + span.SetAttributes(attribute.Bool("fileList.fallback", true)) + fallbackFileList, err := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "获取文件列表失败") + return nil, err + } + fileList = fallbackFileList + files = util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) + } + if len(files) == 0 { + span.SetStatus(codes.Error, "没有找到文件") + return nil, fmt.Errorf("没有找到文件") + } } span.SetAttributes(attribute.Int("fileCount", len(files))) + + // 如果过滤后的文件列表无法覆盖任务开始时间,说明“按小时前缀”可能漏掉了前一小时的尾巴片段,降级为按天目录列举 + if usedTimePrefix && files[0].StartTime.After(task.StartTime) { + span.SetAttributes(attribute.Bool("fileList.fallback", true)) + fallbackFileList, err := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "获取文件列表失败") + return nil, err + } + fileList = fallbackFileList + files = util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) + if len(files) == 0 { + span.SetStatus(codes.Error, "没有找到文件") + return nil, fmt.Errorf("没有找到文件") + } + } + constructTask, err := util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task) if err != nil { + if usedTimePrefix { + span.SetAttributes(attribute.Bool("fileList.fallback", true)) + fallbackFileList, fallbackErr := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) + if fallbackErr != nil { + span.SetAttributes(attribute.String("error", fallbackErr.Error())) + span.SetStatus(codes.Error, "获取文件列表失败") + return nil, fallbackErr + } + files = util.FilterAndSortFiles(subCtx, fallbackFileList, task.StartTime, task.EndTime) + constructTask, err = util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task) + } + if err == nil { + goto runFfmpeg + } span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "文件片段检查失败") logger.Error("文件片段检查失败", @@ -53,6 +97,8 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) zap.Error(err)) return nil, err } + +runFfmpeg: ok := util.RunFfmpegTask(subCtx, constructTask) if !ok { span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败")) @@ -66,3 +112,57 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) URL: constructTask.OutputFile, }, nil } + +func getFileListForTask(ctx context.Context, adapter fs.Adapter, dateDirPath string, task dto.Task) ([]dto.File, bool, error) { + storageType := config.Config.Record.Storage.Type + if storageType != "s3" && storageType != "alioss" { + fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) + return fileList, false, err + } + + lookBack := time.Duration(config.Config.Record.Duration) * time.Second + if lookBack < time.Minute { + lookBack = time.Minute + } + earliestStart := task.StartTime.Add(-lookBack) + if !isSameDate(earliestStart, task.StartTime) { + earliestStart = task.StartTime + } + if !isSameDate(task.EndTime, task.StartTime) { + // 约束:按业务约定不跨天;如果出现跨天,直接回退到按天目录列举(但仍只列举开始日期目录) + fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) + return fileList, false, err + } + + startHour := earliestStart.Hour() + endHour := task.EndTime.Hour() + if endHour < startHour { + // 理论上只有跨天才会出现;按业务约定不跨天,直接回退 + fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) + return fileList, false, err + } + + seen := make(map[string]struct{}, (endHour-startHour+1)*512) + var result []dto.File + for hour := startHour; hour <= endHour; hour++ { + hourPrefix := fmt.Sprintf("%02d", hour) + dirPathWithHourPrefix := path.Join(dateDirPath, hourPrefix) + fileList, err := adapter.GetFileList(ctx, dirPathWithHourPrefix, task.StartTime) + if err != nil { + return nil, true, err + } + for _, file := range fileList { + key := file.Path + "/" + file.Name + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + result = append(result, file) + } + } + return result, true, nil +} + +func isSameDate(a, b time.Time) bool { + return a.Year() == b.Year() && a.Month() == b.Month() && a.Day() == b.Day() +} diff --git a/fs/adapter.go b/fs/adapter.go index 4095048..fcc8996 100644 --- a/fs/adapter.go +++ b/fs/adapter.go @@ -4,6 +4,7 @@ import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/dto" "context" + "sync" "time" ) @@ -11,18 +12,28 @@ type Adapter interface { GetFileList(ctx context.Context, path string, relDt time.Time) ([]dto.File, error) } +var ( + adapterOnce sync.Once + adapterInstance Adapter +) + func GetAdapter() Adapter { - if config.Config.Record.Storage.Type == "s3" { - return &S3Adapter{ - StorageConfig: config.Config.Record.Storage, + adapterOnce.Do(func() { + if config.Config.Record.Storage.Type == "s3" { + adapterInstance = &S3Adapter{ + StorageConfig: config.Config.Record.Storage, + } + return } - } else if config.Config.Record.Storage.Type == "alioss" { - return &AliOSSAdapter{ - StorageConfig: config.Config.Record.Storage, + if config.Config.Record.Storage.Type == "alioss" { + adapterInstance = &AliOSSAdapter{ + StorageConfig: config.Config.Record.Storage, + } + return } - } else { - return &LocalAdapter{ + adapterInstance = &LocalAdapter{ config.Config.Record.Storage, } - } + }) + return adapterInstance } diff --git a/fs/ali_adapter.go b/fs/ali_adapter.go index 666cf3a..be219bd 100644 --- a/fs/ali_adapter.go +++ b/fs/ali_adapter.go @@ -9,6 +9,7 @@ import ( "fmt" "path" "sort" + "sync" "time" "github.com/aliyun/aliyun-oss-go-sdk/oss" @@ -18,25 +19,44 @@ import ( ) type AliOSSAdapter struct { - StorageConfig config.StorageConfig - ossClient *oss.Client + StorageConfig config.StorageConfig + fileListCacheOnce sync.Once + fileListCache *fileListCache + + clientOnce sync.Once + clientErr error + ossClient *oss.Client } func (a *AliOSSAdapter) getClient() (*oss.Client, error) { - if a.ossClient == nil { + a.clientOnce.Do(func() { client, err := oss.New( a.StorageConfig.AliOSS.Endpoint, a.StorageConfig.AliOSS.AccessKeyId, a.StorageConfig.AliOSS.AccessKeySecret, ) if err != nil { - return nil, fmt.Errorf("创建阿里云OSS客户端失败: %w", err) + a.clientErr = fmt.Errorf("创建阿里云OSS客户端失败: %w", err) + return } a.ossClient = client + }) + if a.clientErr != nil { + return nil, a.clientErr + } + if a.ossClient == nil { + return nil, fmt.Errorf("阿里云OSS客户端未初始化") } return a.ossClient, nil } +func (a *AliOSSAdapter) getFileListCache() *fileListCache { + a.fileListCacheOnce.Do(func() { + a.fileListCache = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) + }) + return a.fileListCache +} + func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) { _, span := tracer.Start(ctx, "GetFileList_alioss") defer span.End() @@ -50,7 +70,7 @@ func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt t } cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) - fileListCache := getAliOssFileListCache() + fileListCache := a.getFileListCache() if cachedFiles, ok := fileListCache.Get(cacheKey); ok { logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) span.SetAttributes(attribute.Bool("cache.hit", true)) diff --git a/fs/file_list_cache.go b/fs/file_list_cache.go index 99ef1e9..bcacaf4 100644 --- a/fs/file_list_cache.go +++ b/fs/file_list_cache.go @@ -18,28 +18,6 @@ const ( fileListCacheCleanupInterval = 1 * time.Minute ) -var ( - s3FileListCacheOnce sync.Once - s3FileListCacheInstance *fileListCache - - aliOssFileListCacheOnce sync.Once - aliOssFileListCacheInstance *fileListCache -) - -func getS3FileListCache() *fileListCache { - s3FileListCacheOnce.Do(func() { - s3FileListCacheInstance = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) - }) - return s3FileListCacheInstance -} - -func getAliOssFileListCache() *fileListCache { - aliOssFileListCacheOnce.Do(func() { - aliOssFileListCacheInstance = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) - }) - return aliOssFileListCacheInstance -} - func getFileListCacheTTL() time.Duration { ttlSeconds := config.Config.Record.Cache.FileListTTLSeconds if ttlSeconds <= 0 { diff --git a/fs/s3_adapter.go b/fs/s3_adapter.go index dcec818..47c12c6 100644 --- a/fs/s3_adapter.go +++ b/fs/s3_adapter.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" "path" "sort" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -20,12 +21,17 @@ import ( ) type S3Adapter struct { - StorageConfig config.StorageConfig - s3Client *s3.Client + StorageConfig config.StorageConfig + fileListCacheOnce sync.Once + fileListCache *fileListCache + + clientOnce sync.Once + clientErr error + s3Client *s3.Client } func (s *S3Adapter) getClient() (*s3.Client, error) { - if s.s3Client == nil { + s.clientOnce.Do(func() { const defaultRegion = "us-east-1" resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { return aws.Endpoint{ @@ -42,10 +48,26 @@ func (s *S3Adapter) getClient() (*s3.Client, error) { EndpointResolver: resolver, } s.s3Client = s3.NewFromConfig(cfg) + if s.s3Client == nil { + s.clientErr = fmt.Errorf("创建S3客户端失败") + } + }) + if s.clientErr != nil { + return nil, s.clientErr + } + if s.s3Client == nil { + return nil, fmt.Errorf("S3客户端未初始化") } return s.s3Client, nil } +func (s *S3Adapter) getFileListCache() *fileListCache { + s.fileListCacheOnce.Do(func() { + s.fileListCache = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) + }) + return s.fileListCache +} + func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) { _, span := tracer.Start(ctx, "GetFileList_s3") defer span.End() @@ -59,7 +81,7 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time. } cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) - fileListCache := getS3FileListCache() + fileListCache := s.getFileListCache() if cachedFiles, ok := fileListCache.Get(cacheKey); ok { logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) span.SetAttributes(attribute.Bool("cache.hit", true))