Files
VptPassiveAdapter/core/continuity.go
Jerry Yan 12d34c5f79 refactor(continuity): 优化连续性检查逻辑和时间格式
- 将时间格式从 ISO 8601 修改为标准日期时间格式
- 修复连续性检查中 Gaps 字段的空数组初始化问题
- 重构连续性检查循环逻辑,启动时立即执行一次检查
- 提取连续性检查逻辑到独立的 performContinuityCheck 函数
- 优化检查时间范围的判断逻辑
2025-12-30 10:50:34 +08:00

315 lines
8.9 KiB
Go

package core
import (
"ZhenTuLocalPassiveAdapter/api"
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/fs"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"fmt"
"path"
"sort"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
// continuityTracer is the OpenTelemetry tracer, registered under the name
// "continuity", that the continuity-check functions in this file use to start spans.
var continuityTracer = otel.Tracer("continuity")
// Tuning parameters for the periodic continuity check.
const (
// checkInterval is the period of the check loop's ticker: 5 minutes.
checkInterval = 5 * time.Minute
// rangeStartMinutes: the checked range begins this many minutes before the
// current time (12 minutes ago).
rangeStartMinutes = 12
// rangeEndMinutes: the checked range ends this many minutes before the
// current time (2 minutes ago).
rangeEndMinutes = 2
// gapThresholdSeconds: a gap longer than this many seconds is treated as a
// break in the recording.
gapThresholdSeconds = 2.0
// checkStartHour and checkEndHour bound the hours of the day in which checks
// run: from hour 9 (inclusive) up to hour 18 (exclusive), as reported by
// time.Now().Hour().
checkStartHour = 9
checkEndHour = 18
)
// RunContinuityCheckLoop drives the periodic continuity check.
//
// It performs one check right away and then another on every tick of a
// checkInterval ticker. The function blocks until ctx is cancelled, at which
// point it logs the shutdown and returns; the ticker is stopped on exit.
func RunContinuityCheckLoop(ctx context.Context) {
	tick := time.NewTicker(checkInterval)
	defer tick.Stop()

	logger.Info(
		"连续性检查循环已启动",
		zap.Duration("interval", checkInterval),
		zap.Int("rangeStartMinutes", rangeStartMinutes),
		zap.Int("rangeEndMinutes", rangeEndMinutes),
		zap.Int("checkStartHour", checkStartHour),
		zap.Int("checkEndHour", checkEndHour),
	)

	// First run happens immediately instead of waiting a full interval.
	performContinuityCheck(ctx)

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			performContinuityCheck(ctx)
		case <-done:
			logger.Info("连续性检查循环已停止")
			return
		}
	}
}
// performContinuityCheck runs a single continuity check and reports the outcome.
//
// Nothing happens when the current hour is outside
// [checkStartHour, checkEndHour). Otherwise every configured device is checked
// over the range from rangeStartMinutes ago to rangeEndMinutes ago, and a
// non-empty result set is sent through api.ReportContinuityCheck. A reporting
// failure is logged, not returned.
func performContinuityCheck(ctx context.Context) {
	now := time.Now()

	hour := now.Hour()
	withinWindow := hour >= checkStartHour && hour < checkEndHour
	if !withinWindow {
		return
	}

	lookbackStart := time.Duration(rangeStartMinutes) * time.Minute
	lookbackEnd := time.Duration(rangeEndMinutes) * time.Minute
	rangeFrom := now.Add(-lookbackStart)
	rangeTo := now.Add(-lookbackEnd)

	checked := CheckAllDevicesContinuity(ctx, rangeFrom, rangeTo)
	if len(checked) == 0 {
		// Nothing to report.
		return
	}

	err := api.ReportContinuityCheck(ctx, checked)
	if err == nil {
		return
	}
	logger.Error("上报连续性检查结果失败", zap.Error(err))
}
// CheckAllDevicesContinuity runs CheckDeviceContinuity for every device in
// config.Config.Devices over [startTime, endTime] and returns one result per
// device, in configuration order.
//
// A warning is logged for each device whose result is not continuous. The
// whole pass is wrapped in a "CheckAllDevicesContinuity" span that records the
// range, the device count and the result count.
func CheckAllDevicesContinuity(ctx context.Context, startTime, endTime time.Time) []dto.ContinuityCheckResult {
	const timeLayout = "2006-01-02 15:04:05"

	subCtx, span := continuityTracer.Start(ctx, "CheckAllDevicesContinuity")
	defer span.End()

	span.SetAttributes(
		attribute.String("range.start", startTime.Format(timeLayout)),
		attribute.String("range.end", endTime.Format(timeLayout)),
		attribute.Int("devices.count", len(config.Config.Devices)),
	)

	var collected []dto.ContinuityCheckResult
	for _, dev := range config.Config.Devices {
		outcome := CheckDeviceContinuity(subCtx, dev, startTime, endTime)
		collected = append(collected, outcome)

		if outcome.IsContinuous {
			continue
		}
		logger.Warn(
			"设备视频片段不连续",
			zap.String("deviceNo", dev.DeviceNo),
			zap.String("deviceName", dev.Name),
			zap.Int("gapCount", len(outcome.Gaps)),
			zap.String("errorMessage", outcome.ErrorMessage),
		)
	}

	span.SetAttributes(attribute.Int("results.count", len(collected)))
	span.SetStatus(codes.Ok, "检查完成")
	return collected
}
// CheckDeviceContinuity checks whether the video files recorded for one device
// cover [startTime, endTime] without interruption.
//
// The device's files are fetched with getFileListForCheck, narrowed with
// util.FilterAndSortFiles and ordered by start time. A gap is recorded whenever
// more than gapThresholdSeconds separate:
//   - startTime and the start of the earliest file,
//   - the furthest end time covered so far and the start of the next file,
//   - the furthest end time covered by any file and endTime.
//
// Gaps are measured against the furthest end time seen so far rather than the
// immediately preceding file, so a short segment that is nested inside (or
// overlaps) a longer one does not produce a false gap.
//
// The returned result always carries the device identity, the check time and
// the requested range. IsContinuous is false with ErrorMessage set when the
// listing fails or no file falls in the range; it is false with Gaps populated
// when gaps were found. FileCount, TotalDurationMs (sum of the positive file
// durations) and CoverageStart/CoverageEnd are filled in once files exist.
func CheckDeviceContinuity(ctx context.Context, device config.DeviceMapping, startTime, endTime time.Time) dto.ContinuityCheckResult {
	subCtx, span := continuityTracer.Start(ctx, "CheckDeviceContinuity")
	defer span.End()
	span.SetAttributes(
		attribute.String("device.no", device.DeviceNo),
		attribute.String("device.name", device.Name),
	)

	result := dto.ContinuityCheckResult{
		DeviceNo:     device.DeviceNo,
		DeviceName:   device.Name,
		CheckTime:    time.Now(),
		RangeStart:   startTime,
		RangeEnd:     endTime,
		IsContinuous: true,
	}

	adapter := fs.GetAdapter()

	// Fetch the candidate files (the helper handles ranges crossing hours/days).
	fileList, err := getFileListForCheck(subCtx, adapter, device, startTime, endTime)
	if err != nil {
		span.SetStatus(codes.Error, "获取文件列表失败")
		result.IsContinuous = false
		result.ErrorMessage = fmt.Sprintf("获取文件列表失败: %v", err)
		return result
	}

	files := util.FilterAndSortFiles(subCtx, fileList, startTime, endTime)
	result.FileCount = len(files)
	if len(files) == 0 {
		span.SetStatus(codes.Error, "时间范围内无文件")
		result.IsContinuous = false
		result.ErrorMessage = "时间范围内无视频文件"
		return result
	}

	// Enforce start-time order before anything reads the first/last element,
	// so the coverage bounds and the gap scan never depend on the helper's order.
	sort.Slice(files, func(i, j int) bool {
		return files[i].StartTime.Before(files[j].StartTime)
	})

	// addGap records a gap between prevEnd and nextStart when it exceeds the
	// threshold. Zero or negative distances (touching/overlapping) are ignored.
	addGap := func(prevName string, prevEnd time.Time, nextName string, nextStart time.Time) {
		gap := nextStart.Sub(prevEnd).Seconds()
		if gap <= gapThresholdSeconds {
			return
		}
		result.IsContinuous = false
		result.Gaps = append(result.Gaps, dto.GapInfo{
			PreviousFileName: prevName,
			PreviousEndTime:  prevEnd,
			NextFileName:     nextName,
			NextStartTime:    nextStart,
			GapSeconds:       gap,
		})
	}

	// Leading edge: does the earliest file reach back to the range start?
	addGap("(检查范围起点)", startTime, files[0].Name, files[0].StartTime)

	// Single pass: accumulate the total duration and scan for gaps, tracking
	// the file whose end time reaches furthest.
	var totalDurationMs int64
	furthest := &files[0]
	for i := range files {
		file := &files[i]
		if d := file.EndTime.Sub(file.StartTime).Milliseconds(); d > 0 {
			totalDurationMs += d
		}
		if i == 0 {
			continue
		}
		addGap(furthest.Name, furthest.EndTime, file.Name, file.StartTime)
		if file.EndTime.After(furthest.EndTime) {
			furthest = file
		}
	}
	result.TotalDurationMs = totalDurationMs

	// Trailing edge: does the coverage reach the range end?
	addGap(furthest.Name, furthest.EndTime, "(检查范围终点)", endTime)

	// Actual coverage: earliest start to furthest end.
	coverageStart := files[0].StartTime
	coverageEnd := furthest.EndTime
	result.CoverageStart = &coverageStart
	result.CoverageEnd = &coverageEnd

	if result.IsContinuous {
		span.SetStatus(codes.Ok, "连续")
	} else {
		span.SetStatus(codes.Error, "不连续")
		span.SetAttributes(attribute.Int("gaps.count", len(result.Gaps)))
	}
	return result
}
// getFileListForCheck gathers the files that may fall inside
// [startTime, endTime] for one device, handling ranges that cross hour or day
// boundaries.
//
// For every calendar day touched by the range it lists the directory
// "<device.Name>/<date>", where the date is formatted with the configured
// DateSeparator. When the storage type is "s3" or "alioss" the listing is done
// per hour sub-directory ("<date>/<HH>") for just the hours the range touches
// on that day; otherwise the whole day directory is listed once. Files are
// de-duplicated on Path + "/" + Name.
//
// Listing failures are best-effort: the error is noted on the span and the
// directory is skipped, since a missing hour/day directory is not treated as a
// failure. The only error returned is ctx.Err() when ctx is cancelled or
// expired while iterating; the returned slice is nil in that case.
func getFileListForCheck(ctx context.Context, adapter fs.Adapter, device config.DeviceMapping, startTime, endTime time.Time) ([]dto.File, error) {
	// Keep the span's context so spans started by the adapter nest under it.
	subCtx, span := continuityTracer.Start(ctx, "getFileListForCheck")
	defer span.End()

	var allFiles []dto.File
	seen := make(map[string]struct{})

	// collect appends the files not seen before.
	collect := func(files []dto.File) {
		for _, f := range files {
			key := f.Path + "/" + f.Name
			if _, ok := seen[key]; ok {
				continue
			}
			seen[key] = struct{}{}
			allFiles = append(allFiles, f)
		}
	}

	// noteError records a skipped listing failure on the span.
	noteError := func(err error) {
		span.SetAttributes(attribute.String("error", err.Error()))
	}

	storageType := config.Config.Record.Storage.Type
	listByHour := storageType == "s3" || storageType == "alioss"
	sep := config.Config.FileName.DateSeparator
	dateLayout := "2006" + sep + "01" + sep + "02"

	// Walk every calendar day from startTime to endTime.
	currentDate := startTime
	for !currentDate.After(endTime) {
		if err := ctx.Err(); err != nil {
			noteError(err)
			return nil, err
		}

		dateDirPath := path.Join(device.Name, currentDate.Format(dateLayout))

		// Hours to query on this day: the full day unless it is the first or
		// last day of the range.
		dayStartHour := 0
		dayEndHour := 23
		if isSameDate(currentDate, startTime) {
			dayStartHour = startTime.Hour()
		}
		if isSameDate(currentDate, endTime) {
			dayEndHour = endTime.Hour()
		}

		if listByHour {
			// Object storage: querying by hour prefix is more efficient.
			for hour := dayStartHour; hour <= dayEndHour; hour++ {
				if err := ctx.Err(); err != nil {
					noteError(err)
					return nil, err
				}
				dirPathWithHour := path.Join(dateDirPath, fmt.Sprintf("%02d", hour))
				files, err := adapter.GetFileList(subCtx, dirPathWithHour, currentDate)
				if err != nil {
					// A missing hour directory is skipped, not treated as a failure.
					noteError(err)
					continue
				}
				collect(files)
			}
		} else {
			// Local storage: list the day directory directly.
			files, err := adapter.GetFileList(subCtx, dateDirPath, currentDate)
			if err != nil {
				// A missing day directory must not stop the other days.
				noteError(err)
			} else {
				collect(files)
			}
		}

		// Advance to midnight of the next day.
		next := currentDate.AddDate(0, 0, 1)
		currentDate = time.Date(next.Year(), next.Month(), next.Day(),
			0, 0, 0, 0, next.Location())
	}
	return allFiles, nil
}