From 27dfda32faec2b05c59035c2dcc0257036384710 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 30 Dec 2025 10:38:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(core):=20=E6=B7=BB=E5=8A=A0=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E8=BF=9E=E7=BB=AD=E6=80=A7=E6=A3=80=E6=9F=A5=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现了连续性检查循环,每5分钟执行一次检查 - 添加了跨天跨小时的文件列表获取功能 - 实现了单个设备和所有设备的连续性检查逻辑 - 添加了连续性检查结果上报API接口 - 实现了检查结果的数据结构定义和转换功能 - 配置了9点到18点的工作时间检查范围 - 添加了详细的日志记录和OpenTelemetry追踪支持 --- api/continuity_report.go | 188 ++++++++++++++++++++++++ core/continuity.go | 306 +++++++++++++++++++++++++++++++++++++++ dto/continuity.go | 57 ++++++++ main.go | 3 + 4 files changed, 554 insertions(+) create mode 100644 api/continuity_report.go create mode 100644 core/continuity.go create mode 100644 dto/continuity.go diff --git a/api/continuity_report.go b/api/continuity_report.go new file mode 100644 index 0000000..5e9b325 --- /dev/null +++ b/api/continuity_report.go @@ -0,0 +1,188 @@ +package api + +import ( + "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/logger" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.uber.org/zap" +) + +const ( + // 连续性上报接口地址 + continuityReportURL = "https://zhentuai.com/api/device/video-continuity/report" + // 间隔阈值(毫秒) + maxAllowedGapMs = 2000 +) + +// ReportContinuityCheck 上报连续性检查结果 +func ReportContinuityCheck(ctx context.Context, results []dto.ContinuityCheckResult) error { + _, span := tracer.Start(ctx, "ReportContinuityCheck") + defer span.End() + + span.SetAttributes(attribute.Int("results.count", len(results))) + + // 统计不连续的设备数量 + discontinuousCount := 0 + for _, r := range results { + if !r.IsContinuous { + discontinuousCount++ + } + } + span.SetAttributes(attribute.Int("discontinuous.count", discontinuousCount)) + + // 逐个设备上报 + var lastErr error + for _, result := range results { + if err := reportSingleDevice(ctx, result); err != nil { + logger.Error("上报设备连续性检查结果失败", + zap.String("deviceNo", result.DeviceNo), + zap.Error(err)) + lastErr = err + } + } + + return lastErr +} + +// reportSingleDevice 上报单个设备的连续性检查结果 +func reportSingleDevice(ctx context.Context, result dto.ContinuityCheckResult) error { + _, span := tracer.Start(ctx, "reportSingleDevice") + defer span.End() + + span.SetAttributes( + attribute.String("device.no", result.DeviceNo), + attribute.Bool("continuous", result.IsContinuous), + ) + + // 转换为接口请求格式 + request := convertToReportRequest(result) + + // 序列化请求体 + jsonData, err := json.Marshal(request) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "序列化JSON失败") + logger.Error("序列化连续性检查请求失败", zap.Error(err)) + return err + } + + span.SetAttributes(attribute.String("request.body", string(jsonData))) + + // 创建请求 + req, err := http.NewRequestWithContext(ctx, "POST", continuityReportURL, bytes.NewBuffer(jsonData)) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") + logger.Error("创建连续性检查上报请求失败", zap.Error(err)) + return err + } + req.Header.Set("Content-Type", "application/json") + + span.SetAttributes( + attribute.String("http.url", continuityReportURL), + attribute.String("http.method", "POST"), + ) + + // 发送请求 + resp, err := GetAPIClient().Do(req) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") + logger.Error("发送连续性检查上报请求失败", zap.Error(err)) + return err + } + defer resp.Body.Close() + + span.SetAttributes( + attribute.String("http.status", resp.Status), + attribute.Int("http.status_code", resp.StatusCode), + ) + + // 读取响应体 + body, err := io.ReadAll(resp.Body) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "读取响应体失败") + logger.Error("读取连续性检查上报响应失败", zap.Error(err)) + return err + } + + span.SetAttributes(attribute.String("response.body", string(body))) + + // 解析响应 + var response dto.ContinuityReportResponse + if err := json.Unmarshal(body, &response); err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "解析响应失败") + logger.Error("解析连续性检查上报响应失败", + zap.Error(err), + zap.String("body", string(body))) + return err + } + + // 检查业务响应码 + if response.Code != 0 { + errMsg := fmt.Sprintf("上报失败: code=%d, msg=%s", response.Code, response.Msg) + span.SetAttributes(attribute.String("error", errMsg)) + span.SetStatus(codes.Error, "上报失败") + logger.Warn("连续性检查上报业务失败", + zap.String("deviceNo", result.DeviceNo), + zap.Int("code", response.Code), + zap.String("msg", response.Msg)) + return fmt.Errorf(errMsg) + } + + span.SetStatus(codes.Ok, "上报成功") + if result.IsContinuous { + logger.Debug("设备连续性检查上报成功", + zap.String("deviceNo", result.DeviceNo), + zap.Int("fileCount", result.FileCount)) + } else { + logger.Info("设备连续性检查上报成功(不连续)", + zap.String("deviceNo", result.DeviceNo), + zap.Int("gapCount", len(result.Gaps))) + } + + return nil +} + +// convertToReportRequest 将内部检查结果转换为接口请求格式 +func convertToReportRequest(result dto.ContinuityCheckResult) dto.ContinuityReportRequest { + // 时间格式:ISO 8601 + const timeFormat = "2006-01-02T15:04:05" + + request := dto.ContinuityReportRequest{ + DeviceNo: result.DeviceNo, + StartTime: result.RangeStart.Format(timeFormat), + EndTime: result.RangeEnd.Format(timeFormat), + Support: true, // 始终支持 + Continuous: result.IsContinuous, + TotalVideos: result.FileCount, + TotalDurationMs: result.TotalDurationMs, + MaxAllowedGapMs: maxAllowedGapMs, + } + + // 转换间隙列表 + if len(result.Gaps) > 0 { + request.Gaps = make([]dto.ContinuityReportGap, 0, len(result.Gaps)) + for _, gap := range result.Gaps { + request.Gaps = append(request.Gaps, dto.ContinuityReportGap{ + BeforeFileName: gap.PreviousFileName, + AfterFileName: gap.NextFileName, + GapMs: int64(gap.GapSeconds * 1000), + GapStartTime: gap.PreviousEndTime.Format(timeFormat), + GapEndTime: gap.NextStartTime.Format(timeFormat), + }) + } + } + + return request +} diff --git a/core/continuity.go b/core/continuity.go new file mode 100644 index 0000000..4a41cb9 --- /dev/null +++ b/core/continuity.go @@ -0,0 +1,306 @@ +package core + +import ( + "ZhenTuLocalPassiveAdapter/api" + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/fs" + "ZhenTuLocalPassiveAdapter/logger" + "ZhenTuLocalPassiveAdapter/util" + "context" + "fmt" + "path" + "sort" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.uber.org/zap" +) + +var continuityTracer = otel.Tracer("continuity") + +const ( + // 检查间隔:5分钟 + checkInterval = 5 * time.Minute + // 检查起点:前12分钟 + rangeStartMinutes = 12 + // 检查终点:前2分钟 + rangeEndMinutes = 2 + // 间隔阈值:超过2秒视为断开 + gapThresholdSeconds = 2.0 + // 检查时间窗口:9点到18点 + checkStartHour = 9 + checkEndHour = 18 +) + +// RunContinuityCheckLoop 启动连续性检查循环 +func RunContinuityCheckLoop(ctx context.Context) { + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + logger.Info("连续性检查循环已启动", + zap.Duration("interval", checkInterval), + zap.Int("rangeStartMinutes", rangeStartMinutes), + zap.Int("rangeEndMinutes", rangeEndMinutes), + zap.Int("checkStartHour", checkStartHour), + zap.Int("checkEndHour", checkEndHour)) + + for { + select { + case <-ctx.Done(): + logger.Info("连续性检查循环已停止") + return + case <-ticker.C: + now := time.Now() + + // 只在9点到18点之间执行检查 + currentHour := now.Hour() + if currentHour < checkStartHour || currentHour >= checkEndHour { + continue + } + + startTime := now.Add(-time.Duration(rangeStartMinutes) * time.Minute) + endTime := now.Add(-time.Duration(rangeEndMinutes) * time.Minute) + + results := CheckAllDevicesContinuity(ctx, startTime, endTime) + + // 上报结果 + if len(results) > 0 { + if err := api.ReportContinuityCheck(ctx, results); err != nil { + logger.Error("上报连续性检查结果失败", zap.Error(err)) + } + } + } + } +} + +// CheckAllDevicesContinuity 检查所有设备的连续性 +func CheckAllDevicesContinuity(ctx context.Context, startTime, endTime time.Time) []dto.ContinuityCheckResult { + subCtx, span := continuityTracer.Start(ctx, "CheckAllDevicesContinuity") + defer span.End() + + span.SetAttributes( + attribute.String("range.start", startTime.Format("2006-01-02 15:04:05")), + attribute.String("range.end", endTime.Format("2006-01-02 15:04:05")), + attribute.Int("devices.count", len(config.Config.Devices)), + ) + + var results []dto.ContinuityCheckResult + + for _, device := range config.Config.Devices { + result := CheckDeviceContinuity(subCtx, device, startTime, endTime) + results = append(results, result) + + if !result.IsContinuous { + logger.Warn("设备视频片段不连续", + zap.String("deviceNo", device.DeviceNo), + zap.String("deviceName", device.Name), + zap.Int("gapCount", len(result.Gaps)), + zap.String("errorMessage", result.ErrorMessage)) + } + } + + span.SetAttributes(attribute.Int("results.count", len(results))) + span.SetStatus(codes.Ok, "检查完成") + return results +} + +// CheckDeviceContinuity 检查单个设备的连续性 +func CheckDeviceContinuity(ctx context.Context, device config.DeviceMapping, startTime, endTime time.Time) dto.ContinuityCheckResult { + subCtx, span := continuityTracer.Start(ctx, "CheckDeviceContinuity") + defer span.End() + + span.SetAttributes( + attribute.String("device.no", device.DeviceNo), + attribute.String("device.name", device.Name), + ) + + result := dto.ContinuityCheckResult{ + DeviceNo: device.DeviceNo, + DeviceName: device.Name, + CheckTime: time.Now(), + RangeStart: startTime, + RangeEnd: endTime, + IsContinuous: true, + } + + adapter := fs.GetAdapter() + + // 获取文件列表(处理跨天跨小时) + fileList, err := getFileListForCheck(subCtx, adapter, device, startTime, endTime) + if err != nil { + span.SetStatus(codes.Error, "获取文件列表失败") + result.IsContinuous = false + result.ErrorMessage = fmt.Sprintf("获取文件列表失败: %v", err) + return result + } + + // 过滤和排序 + files := util.FilterAndSortFiles(subCtx, fileList, startTime, endTime) + result.FileCount = len(files) + + if len(files) == 0 { + span.SetStatus(codes.Error, "时间范围内无文件") + result.IsContinuous = false + result.ErrorMessage = "时间范围内无视频文件" + return result + } + + // 记录实际覆盖范围 + coverageStart := files[0].StartTime + coverageEnd := files[len(files)-1].EndTime + result.CoverageStart = &coverageStart + result.CoverageEnd = &coverageEnd + + // 按开始时间排序(FilterAndSortFiles已排序,但确保顺序正确) + sort.Slice(files, func(i, j int) bool { + return files[i].StartTime.Before(files[j].StartTime) + }) + + // 计算总时长(毫秒) + var totalDurationMs int64 + for _, file := range files { + duration := file.EndTime.Sub(file.StartTime).Milliseconds() + if duration > 0 { + totalDurationMs += duration + } + } + result.TotalDurationMs = totalDurationMs + + // 检查首个文件是否覆盖检查范围起点 + if files[0].StartTime.After(startTime) { + gap := files[0].StartTime.Sub(startTime).Seconds() + if gap > gapThresholdSeconds { + result.IsContinuous = false + result.Gaps = append(result.Gaps, dto.GapInfo{ + PreviousFileName: "(检查范围起点)", + PreviousEndTime: startTime, + NextFileName: files[0].Name, + NextStartTime: files[0].StartTime, + GapSeconds: gap, + }) + } + } + + // 检查文件间隔 + if len(files) > 1 { + var lastFile *dto.File + for i := range files { + file := &files[i] + if lastFile == nil { + lastFile = file + continue + } + + gap := file.StartTime.Sub(lastFile.EndTime).Seconds() + if gap > gapThresholdSeconds { + result.IsContinuous = false + result.Gaps = append(result.Gaps, dto.GapInfo{ + PreviousFileName: lastFile.Name, + PreviousEndTime: lastFile.EndTime, + NextFileName: file.Name, + NextStartTime: file.StartTime, + GapSeconds: gap, + }) + } + lastFile = file + } + } + + // 检查末个文件是否覆盖检查范围终点 + if files[len(files)-1].EndTime.Before(endTime) { + gap := endTime.Sub(files[len(files)-1].EndTime).Seconds() + if gap > gapThresholdSeconds { + result.IsContinuous = false + result.Gaps = append(result.Gaps, dto.GapInfo{ + PreviousFileName: files[len(files)-1].Name, + PreviousEndTime: files[len(files)-1].EndTime, + NextFileName: "(检查范围终点)", + NextStartTime: endTime, + GapSeconds: gap, + }) + } + } + + if result.IsContinuous { + span.SetStatus(codes.Ok, "连续") + } else { + span.SetStatus(codes.Error, "不连续") + span.SetAttributes(attribute.Int("gaps.count", len(result.Gaps))) + } + + return result +} + +// getFileListForCheck 获取用于检查的文件列表(处理跨天、跨小时) +func getFileListForCheck(ctx context.Context, adapter fs.Adapter, device config.DeviceMapping, startTime, endTime time.Time) ([]dto.File, error) { + _, span := continuityTracer.Start(ctx, "getFileListForCheck") + defer span.End() + + var allFiles []dto.File + seen := make(map[string]struct{}) + + storageType := config.Config.Record.Storage.Type + + // 处理跨天情况:遍历从 startTime 到 endTime 涉及的每一天 + currentDate := startTime + for !currentDate.After(endTime) { + dateDirPath := path.Join(device.Name, + currentDate.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")) + + // 确定当天需要查询的小时范围 + dayStartHour := 0 + dayEndHour := 23 + + if isSameDate(currentDate, startTime) { + dayStartHour = startTime.Hour() + } + if isSameDate(currentDate, endTime) { + dayEndHour = endTime.Hour() + } + + // 对于 S3/OSS,按小时前缀查询更高效 + if storageType == "s3" || storageType == "alioss" { + for hour := dayStartHour; hour <= dayEndHour; hour++ { + hourPrefix := fmt.Sprintf("%02d", hour) + dirPathWithHour := path.Join(dateDirPath, hourPrefix) + files, err := adapter.GetFileList(ctx, dirPathWithHour, currentDate) + if err != nil { + // 某个小时目录不存在时跳过,不视为错误 + continue + } + for _, f := range files { + key := f.Path + "/" + f.Name + if _, ok := seen[key]; !ok { + seen[key] = struct{}{} + allFiles = append(allFiles, f) + } + } + } + } else { + // 本地存储直接按天目录获取 + files, err := adapter.GetFileList(ctx, dateDirPath, currentDate) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + // 某天目录不存在时继续检查其他天 + } else { + for _, f := range files { + key := f.Path + "/" + f.Name + if _, ok := seen[key]; !ok { + seen[key] = struct{}{} + allFiles = append(allFiles, f) + } + } + } + } + + // 移到下一天 + currentDate = currentDate.AddDate(0, 0, 1) + currentDate = time.Date(currentDate.Year(), currentDate.Month(), currentDate.Day(), + 0, 0, 0, 0, currentDate.Location()) + } + + return allFiles, nil +} diff --git a/dto/continuity.go b/dto/continuity.go new file mode 100644 index 0000000..6f50e9d --- /dev/null +++ b/dto/continuity.go @@ -0,0 +1,57 @@ +package dto + +import "time" + +// ContinuityCheckResult 连续性检查结果(内部使用) +type ContinuityCheckResult struct { + DeviceNo string `json:"deviceNo"` // 设备编号 + DeviceName string `json:"deviceName"` // 设备名称 + CheckTime time.Time `json:"checkTime"` // 检查执行时间 + RangeStart time.Time `json:"rangeStart"` // 检查范围起始 + RangeEnd time.Time `json:"rangeEnd"` // 检查范围结束 + IsContinuous bool `json:"isContinuous"` // 是否连续 + Gaps []GapInfo `json:"gaps,omitempty"` // 间隔详情 + CoverageStart *time.Time `json:"coverageStart,omitempty"` // 实际覆盖起始时间 + CoverageEnd *time.Time `json:"coverageEnd,omitempty"` // 实际覆盖结束时间 + FileCount int `json:"fileCount"` // 文件数量 + TotalDurationMs int64 `json:"totalDurationMs"` // 总时长(毫秒) + ErrorMessage string `json:"errorMessage,omitempty"` // 错误信息 +} + +// GapInfo 间隔信息(内部使用) +type GapInfo struct { + PreviousFileName string `json:"previousFileName"` // 前一个文件名 + PreviousEndTime time.Time `json:"previousEndTime"` // 前一个文件结束时间 + NextFileName string `json:"nextFileName"` // 后一个文件名 + NextStartTime time.Time `json:"nextStartTime"` // 后一个文件开始时间 + GapSeconds float64 `json:"gapSeconds"` // 间隔秒数 +} + +// ContinuityReportRequest 连续性检查上报请求 +type ContinuityReportRequest struct { + DeviceNo string `json:"deviceNo"` // 设备编号 + StartTime string `json:"startTime"` // 检查开始时间 (ISO 8601) + EndTime string `json:"endTime"` // 检查结束时间 (ISO 8601) + Support bool `json:"support"` // 是否支持连续性检查 + Continuous bool `json:"continuous"` // 视频是否连续 + TotalVideos int `json:"totalVideos"` // 视频总数 + TotalDurationMs int64 `json:"totalDurationMs"` // 总时长(毫秒) + MaxAllowedGapMs int64 `json:"maxAllowedGapMs,omitempty"` // 允许的最大间隙(毫秒) + Gaps []ContinuityReportGap `json:"gaps,omitempty"` // 间隙列表 +} + +// ContinuityReportGap 连续性检查上报间隙信息 +type ContinuityReportGap struct { + BeforeFileName string `json:"beforeFileName"` // 前一个文件名 + AfterFileName string `json:"afterFileName"` // 后一个文件名 + GapMs int64 `json:"gapMs"` // 间隙时长(毫秒) + GapStartTime string `json:"gapStartTime"` // 间隙开始时间 (ISO 8601) + GapEndTime string `json:"gapEndTime"` // 间隙结束时间 (ISO 8601) +} + +// ContinuityReportResponse 连续性检查上报响应 +type ContinuityReportResponse struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data any `json:"data"` +} diff --git a/main.go b/main.go index 283a417..cdeaf17 100644 --- a/main.go +++ b/main.go @@ -171,6 +171,9 @@ func main() { // Start Task Loop go runTaskLoop(ctx) + // Start Continuity Check Loop + go core.RunContinuityCheckLoop(ctx) + // Wait for signal <-sigChan logger.Info("Received shutdown signal")