feat(core): 添加视频连续性检查功能

- 实现了连续性检查循环,每5分钟执行一次检查
- 添加了跨天跨小时的文件列表获取功能
- 实现了单个设备和所有设备的连续性检查逻辑
- 添加了连续性检查结果上报API接口
- 实现了检查结果的数据结构定义和转换功能
- 配置了9点到18点的工作时间检查范围
- 添加了详细的日志记录和OpenTelemetry追踪支持
This commit is contained in:
2025-12-30 10:38:13 +08:00
parent 10e39a506c
commit 27dfda32fa
4 changed files with 554 additions and 0 deletions

188
api/continuity_report.go Normal file
View File

@@ -0,0 +1,188 @@
package api
import (
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
const (
// 连续性上报接口地址
continuityReportURL = "https://zhentuai.com/api/device/video-continuity/report"
// 间隔阈值(毫秒)
maxAllowedGapMs = 2000
)
// ReportContinuityCheck 上报连续性检查结果
func ReportContinuityCheck(ctx context.Context, results []dto.ContinuityCheckResult) error {
_, span := tracer.Start(ctx, "ReportContinuityCheck")
defer span.End()
span.SetAttributes(attribute.Int("results.count", len(results)))
// 统计不连续的设备数量
discontinuousCount := 0
for _, r := range results {
if !r.IsContinuous {
discontinuousCount++
}
}
span.SetAttributes(attribute.Int("discontinuous.count", discontinuousCount))
// 逐个设备上报
var lastErr error
for _, result := range results {
if err := reportSingleDevice(ctx, result); err != nil {
logger.Error("上报设备连续性检查结果失败",
zap.String("deviceNo", result.DeviceNo),
zap.Error(err))
lastErr = err
}
}
return lastErr
}
// reportSingleDevice 上报单个设备的连续性检查结果
func reportSingleDevice(ctx context.Context, result dto.ContinuityCheckResult) error {
_, span := tracer.Start(ctx, "reportSingleDevice")
defer span.End()
span.SetAttributes(
attribute.String("device.no", result.DeviceNo),
attribute.Bool("continuous", result.IsContinuous),
)
// 转换为接口请求格式
request := convertToReportRequest(result)
// 序列化请求体
jsonData, err := json.Marshal(request)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "序列化JSON失败")
logger.Error("序列化连续性检查请求失败", zap.Error(err))
return err
}
span.SetAttributes(attribute.String("request.body", string(jsonData)))
// 创建请求
req, err := http.NewRequestWithContext(ctx, "POST", continuityReportURL, bytes.NewBuffer(jsonData))
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
logger.Error("创建连续性检查上报请求失败", zap.Error(err))
return err
}
req.Header.Set("Content-Type", "application/json")
span.SetAttributes(
attribute.String("http.url", continuityReportURL),
attribute.String("http.method", "POST"),
)
// 发送请求
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
logger.Error("发送连续性检查上报请求失败", zap.Error(err))
return err
}
defer resp.Body.Close()
span.SetAttributes(
attribute.String("http.status", resp.Status),
attribute.Int("http.status_code", resp.StatusCode),
)
// 读取响应体
body, err := io.ReadAll(resp.Body)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败")
logger.Error("读取连续性检查上报响应失败", zap.Error(err))
return err
}
span.SetAttributes(attribute.String("response.body", string(body)))
// 解析响应
var response dto.ContinuityReportResponse
if err := json.Unmarshal(body, &response); err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "解析响应失败")
logger.Error("解析连续性检查上报响应失败",
zap.Error(err),
zap.String("body", string(body)))
return err
}
// 检查业务响应码
if response.Code != 0 {
errMsg := fmt.Sprintf("上报失败: code=%d, msg=%s", response.Code, response.Msg)
span.SetAttributes(attribute.String("error", errMsg))
span.SetStatus(codes.Error, "上报失败")
logger.Warn("连续性检查上报业务失败",
zap.String("deviceNo", result.DeviceNo),
zap.Int("code", response.Code),
zap.String("msg", response.Msg))
return fmt.Errorf(errMsg)
}
span.SetStatus(codes.Ok, "上报成功")
if result.IsContinuous {
logger.Debug("设备连续性检查上报成功",
zap.String("deviceNo", result.DeviceNo),
zap.Int("fileCount", result.FileCount))
} else {
logger.Info("设备连续性检查上报成功(不连续)",
zap.String("deviceNo", result.DeviceNo),
zap.Int("gapCount", len(result.Gaps)))
}
return nil
}
// convertToReportRequest 将内部检查结果转换为接口请求格式
func convertToReportRequest(result dto.ContinuityCheckResult) dto.ContinuityReportRequest {
// 时间格式:ISO 8601
const timeFormat = "2006-01-02T15:04:05"
request := dto.ContinuityReportRequest{
DeviceNo: result.DeviceNo,
StartTime: result.RangeStart.Format(timeFormat),
EndTime: result.RangeEnd.Format(timeFormat),
Support: true, // 始终支持
Continuous: result.IsContinuous,
TotalVideos: result.FileCount,
TotalDurationMs: result.TotalDurationMs,
MaxAllowedGapMs: maxAllowedGapMs,
}
// 转换间隙列表
if len(result.Gaps) > 0 {
request.Gaps = make([]dto.ContinuityReportGap, 0, len(result.Gaps))
for _, gap := range result.Gaps {
request.Gaps = append(request.Gaps, dto.ContinuityReportGap{
BeforeFileName: gap.PreviousFileName,
AfterFileName: gap.NextFileName,
GapMs: int64(gap.GapSeconds * 1000),
GapStartTime: gap.PreviousEndTime.Format(timeFormat),
GapEndTime: gap.NextStartTime.Format(timeFormat),
})
}
}
return request
}

306
core/continuity.go Normal file
View File

@@ -0,0 +1,306 @@
package core
import (
"ZhenTuLocalPassiveAdapter/api"
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/fs"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"fmt"
"path"
"sort"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
var continuityTracer = otel.Tracer("continuity")
const (
// 检查间隔:5分钟
checkInterval = 5 * time.Minute
// 检查起点:前12分钟
rangeStartMinutes = 12
// 检查终点:前2分钟
rangeEndMinutes = 2
// 间隔阈值:超过2秒视为断开
gapThresholdSeconds = 2.0
// 检查时间窗口:9点到18点
checkStartHour = 9
checkEndHour = 18
)
// RunContinuityCheckLoop 启动连续性检查循环
func RunContinuityCheckLoop(ctx context.Context) {
ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
logger.Info("连续性检查循环已启动",
zap.Duration("interval", checkInterval),
zap.Int("rangeStartMinutes", rangeStartMinutes),
zap.Int("rangeEndMinutes", rangeEndMinutes),
zap.Int("checkStartHour", checkStartHour),
zap.Int("checkEndHour", checkEndHour))
for {
select {
case <-ctx.Done():
logger.Info("连续性检查循环已停止")
return
case <-ticker.C:
now := time.Now()
// 只在9点到18点之间执行检查
currentHour := now.Hour()
if currentHour < checkStartHour || currentHour >= checkEndHour {
continue
}
startTime := now.Add(-time.Duration(rangeStartMinutes) * time.Minute)
endTime := now.Add(-time.Duration(rangeEndMinutes) * time.Minute)
results := CheckAllDevicesContinuity(ctx, startTime, endTime)
// 上报结果
if len(results) > 0 {
if err := api.ReportContinuityCheck(ctx, results); err != nil {
logger.Error("上报连续性检查结果失败", zap.Error(err))
}
}
}
}
}
// CheckAllDevicesContinuity 检查所有设备的连续性
func CheckAllDevicesContinuity(ctx context.Context, startTime, endTime time.Time) []dto.ContinuityCheckResult {
subCtx, span := continuityTracer.Start(ctx, "CheckAllDevicesContinuity")
defer span.End()
span.SetAttributes(
attribute.String("range.start", startTime.Format("2006-01-02 15:04:05")),
attribute.String("range.end", endTime.Format("2006-01-02 15:04:05")),
attribute.Int("devices.count", len(config.Config.Devices)),
)
var results []dto.ContinuityCheckResult
for _, device := range config.Config.Devices {
result := CheckDeviceContinuity(subCtx, device, startTime, endTime)
results = append(results, result)
if !result.IsContinuous {
logger.Warn("设备视频片段不连续",
zap.String("deviceNo", device.DeviceNo),
zap.String("deviceName", device.Name),
zap.Int("gapCount", len(result.Gaps)),
zap.String("errorMessage", result.ErrorMessage))
}
}
span.SetAttributes(attribute.Int("results.count", len(results)))
span.SetStatus(codes.Ok, "检查完成")
return results
}
// CheckDeviceContinuity 检查单个设备的连续性
func CheckDeviceContinuity(ctx context.Context, device config.DeviceMapping, startTime, endTime time.Time) dto.ContinuityCheckResult {
subCtx, span := continuityTracer.Start(ctx, "CheckDeviceContinuity")
defer span.End()
span.SetAttributes(
attribute.String("device.no", device.DeviceNo),
attribute.String("device.name", device.Name),
)
result := dto.ContinuityCheckResult{
DeviceNo: device.DeviceNo,
DeviceName: device.Name,
CheckTime: time.Now(),
RangeStart: startTime,
RangeEnd: endTime,
IsContinuous: true,
}
adapter := fs.GetAdapter()
// 获取文件列表(处理跨天跨小时)
fileList, err := getFileListForCheck(subCtx, adapter, device, startTime, endTime)
if err != nil {
span.SetStatus(codes.Error, "获取文件列表失败")
result.IsContinuous = false
result.ErrorMessage = fmt.Sprintf("获取文件列表失败: %v", err)
return result
}
// 过滤和排序
files := util.FilterAndSortFiles(subCtx, fileList, startTime, endTime)
result.FileCount = len(files)
if len(files) == 0 {
span.SetStatus(codes.Error, "时间范围内无文件")
result.IsContinuous = false
result.ErrorMessage = "时间范围内无视频文件"
return result
}
// 记录实际覆盖范围
coverageStart := files[0].StartTime
coverageEnd := files[len(files)-1].EndTime
result.CoverageStart = &coverageStart
result.CoverageEnd = &coverageEnd
// 按开始时间排序(FilterAndSortFiles已排序,但确保顺序正确)
sort.Slice(files, func(i, j int) bool {
return files[i].StartTime.Before(files[j].StartTime)
})
// 计算总时长(毫秒)
var totalDurationMs int64
for _, file := range files {
duration := file.EndTime.Sub(file.StartTime).Milliseconds()
if duration > 0 {
totalDurationMs += duration
}
}
result.TotalDurationMs = totalDurationMs
// 检查首个文件是否覆盖检查范围起点
if files[0].StartTime.After(startTime) {
gap := files[0].StartTime.Sub(startTime).Seconds()
if gap > gapThresholdSeconds {
result.IsContinuous = false
result.Gaps = append(result.Gaps, dto.GapInfo{
PreviousFileName: "(检查范围起点)",
PreviousEndTime: startTime,
NextFileName: files[0].Name,
NextStartTime: files[0].StartTime,
GapSeconds: gap,
})
}
}
// 检查文件间隔
if len(files) > 1 {
var lastFile *dto.File
for i := range files {
file := &files[i]
if lastFile == nil {
lastFile = file
continue
}
gap := file.StartTime.Sub(lastFile.EndTime).Seconds()
if gap > gapThresholdSeconds {
result.IsContinuous = false
result.Gaps = append(result.Gaps, dto.GapInfo{
PreviousFileName: lastFile.Name,
PreviousEndTime: lastFile.EndTime,
NextFileName: file.Name,
NextStartTime: file.StartTime,
GapSeconds: gap,
})
}
lastFile = file
}
}
// 检查末个文件是否覆盖检查范围终点
if files[len(files)-1].EndTime.Before(endTime) {
gap := endTime.Sub(files[len(files)-1].EndTime).Seconds()
if gap > gapThresholdSeconds {
result.IsContinuous = false
result.Gaps = append(result.Gaps, dto.GapInfo{
PreviousFileName: files[len(files)-1].Name,
PreviousEndTime: files[len(files)-1].EndTime,
NextFileName: "(检查范围终点)",
NextStartTime: endTime,
GapSeconds: gap,
})
}
}
if result.IsContinuous {
span.SetStatus(codes.Ok, "连续")
} else {
span.SetStatus(codes.Error, "不连续")
span.SetAttributes(attribute.Int("gaps.count", len(result.Gaps)))
}
return result
}
// getFileListForCheck 获取用于检查的文件列表(处理跨天、跨小时)
func getFileListForCheck(ctx context.Context, adapter fs.Adapter, device config.DeviceMapping, startTime, endTime time.Time) ([]dto.File, error) {
_, span := continuityTracer.Start(ctx, "getFileListForCheck")
defer span.End()
var allFiles []dto.File
seen := make(map[string]struct{})
storageType := config.Config.Record.Storage.Type
// 处理跨天情况:遍历从 startTime 到 endTime 涉及的每一天
currentDate := startTime
for !currentDate.After(endTime) {
dateDirPath := path.Join(device.Name,
currentDate.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02"))
// 确定当天需要查询的小时范围
dayStartHour := 0
dayEndHour := 23
if isSameDate(currentDate, startTime) {
dayStartHour = startTime.Hour()
}
if isSameDate(currentDate, endTime) {
dayEndHour = endTime.Hour()
}
// 对于 S3/OSS,按小时前缀查询更高效
if storageType == "s3" || storageType == "alioss" {
for hour := dayStartHour; hour <= dayEndHour; hour++ {
hourPrefix := fmt.Sprintf("%02d", hour)
dirPathWithHour := path.Join(dateDirPath, hourPrefix)
files, err := adapter.GetFileList(ctx, dirPathWithHour, currentDate)
if err != nil {
// 某个小时目录不存在时跳过,不视为错误
continue
}
for _, f := range files {
key := f.Path + "/" + f.Name
if _, ok := seen[key]; !ok {
seen[key] = struct{}{}
allFiles = append(allFiles, f)
}
}
}
} else {
// 本地存储直接按天目录获取
files, err := adapter.GetFileList(ctx, dateDirPath, currentDate)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
// 某天目录不存在时继续检查其他天
} else {
for _, f := range files {
key := f.Path + "/" + f.Name
if _, ok := seen[key]; !ok {
seen[key] = struct{}{}
allFiles = append(allFiles, f)
}
}
}
}
// 移到下一天
currentDate = currentDate.AddDate(0, 0, 1)
currentDate = time.Date(currentDate.Year(), currentDate.Month(), currentDate.Day(),
0, 0, 0, 0, currentDate.Location())
}
return allFiles, nil
}

57
dto/continuity.go Normal file
View File

@@ -0,0 +1,57 @@
package dto
import "time"
// ContinuityCheckResult 连续性检查结果(内部使用)
type ContinuityCheckResult struct {
DeviceNo string `json:"deviceNo"` // 设备编号
DeviceName string `json:"deviceName"` // 设备名称
CheckTime time.Time `json:"checkTime"` // 检查执行时间
RangeStart time.Time `json:"rangeStart"` // 检查范围起始
RangeEnd time.Time `json:"rangeEnd"` // 检查范围结束
IsContinuous bool `json:"isContinuous"` // 是否连续
Gaps []GapInfo `json:"gaps,omitempty"` // 间隔详情
CoverageStart *time.Time `json:"coverageStart,omitempty"` // 实际覆盖起始时间
CoverageEnd *time.Time `json:"coverageEnd,omitempty"` // 实际覆盖结束时间
FileCount int `json:"fileCount"` // 文件数量
TotalDurationMs int64 `json:"totalDurationMs"` // 总时长(毫秒)
ErrorMessage string `json:"errorMessage,omitempty"` // 错误信息
}
// GapInfo 间隔信息(内部使用)
type GapInfo struct {
PreviousFileName string `json:"previousFileName"` // 前一个文件名
PreviousEndTime time.Time `json:"previousEndTime"` // 前一个文件结束时间
NextFileName string `json:"nextFileName"` // 后一个文件名
NextStartTime time.Time `json:"nextStartTime"` // 后一个文件开始时间
GapSeconds float64 `json:"gapSeconds"` // 间隔秒数
}
// ContinuityReportRequest 连续性检查上报请求
type ContinuityReportRequest struct {
DeviceNo string `json:"deviceNo"` // 设备编号
StartTime string `json:"startTime"` // 检查开始时间 (ISO 8601)
EndTime string `json:"endTime"` // 检查结束时间 (ISO 8601)
Support bool `json:"support"` // 是否支持连续性检查
Continuous bool `json:"continuous"` // 视频是否连续
TotalVideos int `json:"totalVideos"` // 视频总数
TotalDurationMs int64 `json:"totalDurationMs"` // 总时长(毫秒)
MaxAllowedGapMs int64 `json:"maxAllowedGapMs,omitempty"` // 允许的最大间隙(毫秒)
Gaps []ContinuityReportGap `json:"gaps,omitempty"` // 间隙列表
}
// ContinuityReportGap 连续性检查上报间隙信息
type ContinuityReportGap struct {
BeforeFileName string `json:"beforeFileName"` // 前一个文件名
AfterFileName string `json:"afterFileName"` // 后一个文件名
GapMs int64 `json:"gapMs"` // 间隙时长(毫秒)
GapStartTime string `json:"gapStartTime"` // 间隙开始时间 (ISO 8601)
GapEndTime string `json:"gapEndTime"` // 间隙结束时间 (ISO 8601)
}
// ContinuityReportResponse 连续性检查上报响应
type ContinuityReportResponse struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data any `json:"data"`
}

View File

@@ -171,6 +171,9 @@ func main() {
// Start Task Loop
go runTaskLoop(ctx)
// Start Continuity Check Loop
go core.RunContinuityCheckLoop(ctx)
// Wait for signal
<-sigChan
logger.Info("Received shutdown signal")