// VptPassiveAdapter (forked repository) — Go source file, 333 lines, 8.8 KiB.
package main
|
|
|
|
import (
|
|
"ZhenTuLocalPassiveAdapter/api"
|
|
"ZhenTuLocalPassiveAdapter/config"
|
|
"ZhenTuLocalPassiveAdapter/core"
|
|
"ZhenTuLocalPassiveAdapter/dto"
|
|
"ZhenTuLocalPassiveAdapter/logger"
|
|
"ZhenTuLocalPassiveAdapter/telemetry"
|
|
"ZhenTuLocalPassiveAdapter/util"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"runtime"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// tracer is the OpenTelemetry tracer, obtained under the name "vpt", that the
// functions in this file use to start their spans.
var tracer = otel.Tracer("vpt")
|
|
|
|
func startTask(device config.DeviceMapping, task dto.Task) {
|
|
ctx, span := tracer.Start(context.Background(), "startTask")
|
|
defer span.End()
|
|
span.SetAttributes(attribute.String("deviceNo", device.DeviceNo))
|
|
span.SetAttributes(attribute.String("taskId", task.TaskID))
|
|
span.SetAttributes(attribute.String("scenicId", task.ScenicID))
|
|
span.SetAttributes(attribute.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")))
|
|
span.SetAttributes(attribute.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
|
|
fo, err := core.HandleTask(ctx, device, task)
|
|
if err != nil {
|
|
span.SetStatus(codes.Error, "处理任务失败")
|
|
logger.Error("处理任务失败",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo),
|
|
zap.Error(err))
|
|
api.ReportTaskFailure(ctx, task.TaskID)
|
|
return
|
|
}
|
|
span.SetAttributes(attribute.String("fileUrl", fo.URL))
|
|
logger.Info("处理任务成功",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo))
|
|
|
|
// 复制主文件用于异步生成预览,避免阻塞主上传流程
|
|
copyFile := strings.TrimSuffix(fo.URL, ".mp4") + "_copy.mp4"
|
|
copyErr := copyFileOnDisk(fo.URL, copyFile)
|
|
if copyErr != nil {
|
|
logger.Warn("复制文件用于预览失败",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.Error(copyErr))
|
|
}
|
|
|
|
err = api.UploadTaskFile(ctx, task, *fo)
|
|
if err != nil {
|
|
span.SetStatus(codes.Error, "上传文件失败")
|
|
logger.Error("上传文件失败",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo),
|
|
zap.Error(err))
|
|
api.ReportTaskFailure(ctx, task.TaskID)
|
|
if copyErr == nil {
|
|
os.Remove(copyFile)
|
|
}
|
|
return
|
|
}
|
|
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
|
if !result {
|
|
span.SetStatus(codes.Error, "上报任务成功失败")
|
|
logger.Error("上报任务成功失败",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo),
|
|
zap.Error(err))
|
|
if copyErr == nil {
|
|
os.Remove(copyFile)
|
|
}
|
|
return
|
|
}
|
|
span.SetStatus(codes.Ok, "上传文件成功")
|
|
logger.Info("上传文件成功",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo))
|
|
|
|
// 异步:从副本压缩720p预览 → 上传 → 上报
|
|
if copyErr == nil {
|
|
go uploadPreview(task.TaskID, copyFile)
|
|
}
|
|
}
|
|
|
|
func uploadPreview(taskID string, copyFile string) {
|
|
ctx, span := tracer.Start(context.Background(), "uploadPreview")
|
|
defer span.End()
|
|
defer os.Remove(copyFile)
|
|
|
|
baseName := strings.TrimSuffix(copyFile, "_copy.mp4")
|
|
|
|
type previewSpec struct {
|
|
resolution string
|
|
height int
|
|
}
|
|
specs := []previewSpec{
|
|
{"720p", 720},
|
|
{"1080p", 1080},
|
|
}
|
|
|
|
for _, spec := range specs {
|
|
previewFile := fmt.Sprintf("%s_preview_%s.mp4", baseName, spec.resolution)
|
|
ok, _ := util.CompressVideo(ctx, copyFile, previewFile, spec.height)
|
|
if !ok {
|
|
logger.Error("生成预览文件失败",
|
|
zap.String("taskID", taskID),
|
|
zap.String("resolution", spec.resolution))
|
|
continue
|
|
}
|
|
|
|
err := api.UploadPreviewFile(ctx, taskID, previewFile, spec.resolution)
|
|
if err != nil {
|
|
logger.Error("上传预览文件失败",
|
|
zap.String("taskID", taskID),
|
|
zap.String("resolution", spec.resolution),
|
|
zap.Error(err))
|
|
os.Remove(previewFile)
|
|
continue
|
|
}
|
|
if !api.ReportPreviewSuccess(ctx, taskID, spec.resolution) {
|
|
logger.Error("上报预览成功失败",
|
|
zap.String("taskID", taskID),
|
|
zap.String("resolution", spec.resolution))
|
|
continue
|
|
}
|
|
logger.Info("预览上传成功",
|
|
zap.String("taskID", taskID),
|
|
zap.String("resolution", spec.resolution))
|
|
}
|
|
|
|
span.SetStatus(codes.Ok, "预览处理完成")
|
|
}
|
|
|
|
// copyFileOnDisk copies the file at src to dst, creating dst or truncating it
// if it already exists.
//
// It returns the first error from opening src, creating dst, copying the
// data, or closing dst. Once dst has been created, any later failure removes
// it again so that no partial copy is left on disk.
func copyFileOnDisk(src, dst string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Write errors can surface only at Close, so its result must not be
		// dropped; an earlier copy error takes precedence.
		if closeErr := out.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			os.Remove(dst)
		}
	}()

	_, err = io.Copy(out, in)
	return err
}
|
|
|
|
func executeDisconnectCommand(command string) {
|
|
go func() {
|
|
var cmd *exec.Cmd
|
|
if runtime.GOOS == "windows" {
|
|
cmd = exec.Command("cmd", "/C", command)
|
|
} else {
|
|
cmd = exec.Command("sh", "-c", command)
|
|
}
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
logger.Error("断连命令执行失败",
|
|
zap.String("command", command),
|
|
zap.String("output", string(output)),
|
|
zap.Error(err))
|
|
return
|
|
}
|
|
logger.Info("断连命令执行成功",
|
|
zap.String("command", command),
|
|
zap.String("output", string(output)))
|
|
}()
|
|
}
|
|
|
|
func runTaskLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
var firstFailureTime time.Time
|
|
commandExecuted := false
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// 执行任务
|
|
tasks, err := api.SyncTask()
|
|
if err != nil {
|
|
var connErr *api.ConnError
|
|
if errors.As(err, &connErr) {
|
|
if config.Config.DisconnectAction.Enabled {
|
|
// 连接级错误:开始/继续计时
|
|
if firstFailureTime.IsZero() {
|
|
firstFailureTime = time.Now()
|
|
logger.Warn("服务器连接失败,开始计时",
|
|
zap.Error(err))
|
|
}
|
|
threshold := time.Duration(config.Config.DisconnectAction.ThresholdMinutes) * time.Minute
|
|
if !commandExecuted && time.Since(firstFailureTime) >= threshold && config.Config.DisconnectAction.Command != "" {
|
|
logger.Warn("服务器持续不可达,执行断连命令",
|
|
zap.Duration("duration", time.Since(firstFailureTime)),
|
|
zap.String("command", config.Config.DisconnectAction.Command))
|
|
executeDisconnectCommand(config.Config.DisconnectAction.Command)
|
|
commandExecuted = true
|
|
}
|
|
}
|
|
} else {
|
|
// 非连接错误(服务器可达,但业务/解析出错):重置断连状态
|
|
if !firstFailureTime.IsZero() {
|
|
logger.Info("服务器连接已恢复",
|
|
zap.Duration("disconnectDuration", time.Since(firstFailureTime)))
|
|
firstFailureTime = time.Time{}
|
|
commandExecuted = false
|
|
}
|
|
logger.Error("同步任务失败", zap.Error(err))
|
|
}
|
|
} else {
|
|
// 服务器可达:重置状态
|
|
if !firstFailureTime.IsZero() {
|
|
logger.Info("服务器连接已恢复",
|
|
zap.Duration("disconnectDuration", time.Since(firstFailureTime)))
|
|
firstFailureTime = time.Time{}
|
|
commandExecuted = false
|
|
}
|
|
for _, task := range tasks {
|
|
logger.Info("开始处理任务",
|
|
zap.String("taskID", task.TaskID),
|
|
zap.String("deviceNo", task.DeviceNo),
|
|
zap.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")),
|
|
zap.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
|
|
// 处理任务
|
|
for _, device := range config.Config.Devices {
|
|
if device.DeviceNo == task.DeviceNo {
|
|
// 处理任务
|
|
go startTask(device, task)
|
|
break // 提前返回,避免不必要的循环
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func startViidServer() {
|
|
if !config.Config.Viid.Enabled {
|
|
return
|
|
}
|
|
|
|
gin.SetMode(gin.ReleaseMode)
|
|
r := gin.Default()
|
|
|
|
// Register Routes
|
|
api.RegisterVIIDRoutes(r)
|
|
|
|
addr := fmt.Sprintf(":%d", config.Config.Viid.Port)
|
|
logger.Info("VIID Server starting", zap.String("addr", addr))
|
|
go func() {
|
|
if err := r.Run(addr); err != nil {
|
|
logger.Error("VIID Server failed", zap.Error(err))
|
|
}
|
|
}()
|
|
}
|
|
|
|
func main() {
|
|
// 初始化日志
|
|
err := logger.Init()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer logger.Sync()
|
|
|
|
err = config.LoadConfig()
|
|
if err != nil {
|
|
logger.Fatal("加载配置文件失败", zap.Error(err))
|
|
return
|
|
}
|
|
ctx := context.Background()
|
|
shutdown, err := telemetry.InitTelemetry(ctx)
|
|
if err != nil {
|
|
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
|
|
return
|
|
}
|
|
defer shutdown(ctx)
|
|
|
|
if config.Config.Record.Storage.Type == "local" {
|
|
_, err = os.Stat(config.Config.Record.Storage.Path)
|
|
if err != nil {
|
|
logger.Error("录像文件夹配置失败", zap.Error(err))
|
|
return
|
|
} else {
|
|
logger.Info("录像文件夹配置有效")
|
|
}
|
|
} else {
|
|
logger.Info("录像文件夹配置为OSS")
|
|
}
|
|
|
|
// Start VIID Server
|
|
startViidServer()
|
|
|
|
// Context for graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Handle Signals
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Start Task Loop
|
|
go runTaskLoop(ctx)
|
|
|
|
// Start Continuity Check Loop
|
|
go core.RunContinuityCheckLoop(ctx)
|
|
|
|
// Wait for signal
|
|
<-sigChan
|
|
logger.Info("Received shutdown signal")
|
|
cancel()
|
|
logger.Info("Shutdown complete")
|
|
}
|