package core import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/fs" "ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/util" "context" "fmt" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.uber.org/zap" "path" "time" ) var tracer = otel.Tracer("task") func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) { subCtx, span := tracer.Start(ctx, "HandleTask") defer span.End() adapter := fs.GetAdapter() span.SetAttributes(attribute.String("task.id", task.TaskID)) span.SetAttributes(attribute.String("task", util.ToJson(task))) span.SetAttributes(attribute.String("device.no", device.DeviceNo)) span.SetAttributes(attribute.String("device.name", device.Name)) dateDirPath := path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")) fileList, usedTimePrefix, err := getFileListForTask(subCtx, adapter, dateDirPath, task) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "获取文件列表失败") logger.Error("获取文件列表失败", zap.String("deviceNo", device.DeviceNo), zap.Error(err)) return nil, err } files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) if len(files) == 0 { if usedTimePrefix { span.SetAttributes(attribute.Bool("fileList.fallback", true)) fallbackFileList, err := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "获取文件列表失败") return nil, err } fileList = fallbackFileList files = util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) } if len(files) == 0 { span.SetStatus(codes.Error, "没有找到文件") return nil, fmt.Errorf("没有找到文件") } } span.SetAttributes(attribute.Int("fileCount", len(files))) // 如果过滤后的文件列表无法覆盖任务开始时间,说明“按小时前缀”可能漏掉了前一小时的尾巴片段,降级为按天目录列举 if usedTimePrefix && files[0].StartTime.After(task.StartTime) { span.SetAttributes(attribute.Bool("fileList.fallback", true)) fallbackFileList, err := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "获取文件列表失败") return nil, err } fileList = fallbackFileList files = util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) if len(files) == 0 { span.SetStatus(codes.Error, "没有找到文件") return nil, fmt.Errorf("没有找到文件") } } constructTask, err := util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task) if err != nil { if usedTimePrefix { span.SetAttributes(attribute.Bool("fileList.fallback", true)) fallbackFileList, fallbackErr := adapter.GetFileList(subCtx, dateDirPath, task.StartTime) if fallbackErr != nil { span.SetAttributes(attribute.String("error", fallbackErr.Error())) span.SetStatus(codes.Error, "获取文件列表失败") return nil, fallbackErr } files = util.FilterAndSortFiles(subCtx, fallbackFileList, task.StartTime, task.EndTime) constructTask, err = util.CheckFileCoverageAndConstructTask(subCtx, files, task.StartTime, task.EndTime, task) } if err == nil { goto runFfmpeg } span.SetAttributes(attribute.String("error", err.Error())) span.SetStatus(codes.Error, "文件片段检查失败") logger.Error("文件片段检查失败", zap.String("deviceNo", device.DeviceNo), zap.Error(err)) return nil, err } runFfmpeg: ok := util.RunFfmpegTask(subCtx, constructTask) if !ok { span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败")) span.SetStatus(codes.Error, "ffmpeg任务执行失败") return nil, fmt.Errorf("ffmpeg任务执行失败") } return &dto.FileObject{ CreateTime: task.EndTime, EndTime: task.EndTime, NeedDownload: true, URL: constructTask.OutputFile, }, nil } func getFileListForTask(ctx context.Context, adapter fs.Adapter, dateDirPath string, task dto.Task) ([]dto.File, bool, error) { storageType := config.Config.Record.Storage.Type if storageType != "s3" && storageType != "alioss" { fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) return fileList, false, err } lookBack := time.Duration(config.Config.Record.Duration) * time.Second if lookBack < time.Minute { lookBack = time.Minute } earliestStart := task.StartTime.Add(-lookBack) if !isSameDate(earliestStart, task.StartTime) { earliestStart = task.StartTime } if !isSameDate(task.EndTime, task.StartTime) { // 约束:按业务约定不跨天;如果出现跨天,直接回退到按天目录列举(但仍只列举开始日期目录) fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) return fileList, false, err } startHour := earliestStart.Hour() endHour := task.EndTime.Hour() if endHour < startHour { // 理论上只有跨天才会出现;按业务约定不跨天,直接回退 fileList, err := adapter.GetFileList(ctx, dateDirPath, task.StartTime) return fileList, false, err } seen := make(map[string]struct{}, (endHour-startHour+1)*512) var result []dto.File for hour := startHour; hour <= endHour; hour++ { hourPrefix := fmt.Sprintf("%02d", hour) dirPathWithHourPrefix := path.Join(dateDirPath, hourPrefix) fileList, err := adapter.GetFileList(ctx, dirPathWithHourPrefix, task.StartTime) if err != nil { return nil, true, err } for _, file := range fileList { key := file.Path + "/" + file.Name if _, ok := seen[key]; ok { continue } seen[key] = struct{}{} result = append(result, file) } } return result, true, nil } func isSameDate(a, b time.Time) bool { return a.Year() == b.Year() && a.Month() == b.Month() && a.Day() == b.Day() }