package main import ( "ZhenTuLocalPassiveAdapter/api" "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/core" "ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/telemetry" "context" "errors" "fmt" "os" "os/exec" "os/signal" "runtime" "syscall" "time" "github.com/gin-gonic/gin" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.uber.org/zap" ) var tracer = otel.Tracer("vpt") func startTask(device config.DeviceMapping, task dto.Task) { ctx, span := tracer.Start(context.Background(), "startTask") defer span.End() span.SetAttributes(attribute.String("deviceNo", device.DeviceNo)) span.SetAttributes(attribute.String("taskId", task.TaskID)) span.SetAttributes(attribute.String("scenicId", task.ScenicID)) span.SetAttributes(attribute.String("startTime", task.StartTime.Format("2006-01-02 15:04:05"))) span.SetAttributes(attribute.String("endTime", task.EndTime.Format("2006-01-02 15:04:05"))) fo, err := core.HandleTask(ctx, device, task) if err != nil { span.SetStatus(codes.Error, "处理任务失败") logger.Error("处理任务失败", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo), zap.Error(err)) api.ReportTaskFailure(ctx, task.TaskID) return } span.SetAttributes(attribute.String("fileUrl", fo.URL)) logger.Info("处理任务成功", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo)) err = api.UploadTaskFile(ctx, task, *fo) if err != nil { span.SetStatus(codes.Error, "上传文件失败") logger.Error("上传文件失败", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo), zap.Error(err)) api.ReportTaskFailure(ctx, task.TaskID) return } result := api.ReportTaskSuccess(ctx, task.TaskID, fo) if !result { span.SetStatus(codes.Error, "上报任务成功失败") logger.Error("上报任务成功失败", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo), zap.Error(err)) return } span.SetStatus(codes.Ok, "上传文件成功") logger.Info("上传文件成功", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo)) } func executeDisconnectCommand(command string) { go func() { var cmd *exec.Cmd if runtime.GOOS == "windows" { cmd = exec.Command("cmd", "/C", command) } else { cmd = exec.Command("sh", "-c", command) } output, err := cmd.CombinedOutput() if err != nil { logger.Error("断连命令执行失败", zap.String("command", command), zap.String("output", string(output)), zap.Error(err)) return } logger.Info("断连命令执行成功", zap.String("command", command), zap.String("output", string(output))) }() } func runTaskLoop(ctx context.Context) { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() var firstFailureTime time.Time commandExecuted := false for { select { case <-ctx.Done(): return case <-ticker.C: // 执行任务 tasks, err := api.SyncTask() if err != nil { var connErr *api.ConnError if errors.As(err, &connErr) { if config.Config.DisconnectAction.Enabled { // 连接级错误:开始/继续计时 if firstFailureTime.IsZero() { firstFailureTime = time.Now() logger.Warn("服务器连接失败,开始计时", zap.Error(err)) } threshold := time.Duration(config.Config.DisconnectAction.ThresholdMinutes) * time.Minute if !commandExecuted && time.Since(firstFailureTime) >= threshold && config.Config.DisconnectAction.Command != "" { logger.Warn("服务器持续不可达,执行断连命令", zap.Duration("duration", time.Since(firstFailureTime)), zap.String("command", config.Config.DisconnectAction.Command)) executeDisconnectCommand(config.Config.DisconnectAction.Command) commandExecuted = true } } } else { // 非连接错误(服务器可达,但业务/解析出错):重置断连状态 if !firstFailureTime.IsZero() { logger.Info("服务器连接已恢复", zap.Duration("disconnectDuration", time.Since(firstFailureTime))) firstFailureTime = time.Time{} commandExecuted = false } logger.Error("同步任务失败", zap.Error(err)) } } else { // 服务器可达:重置状态 if !firstFailureTime.IsZero() { logger.Info("服务器连接已恢复", zap.Duration("disconnectDuration", time.Since(firstFailureTime))) firstFailureTime = time.Time{} commandExecuted = false } for _, task := range tasks { logger.Info("开始处理任务", zap.String("taskID", task.TaskID), zap.String("deviceNo", task.DeviceNo), zap.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")), zap.String("endTime", task.EndTime.Format("2006-01-02 15:04:05"))) // 处理任务 for _, device := range config.Config.Devices { if device.DeviceNo == task.DeviceNo { // 处理任务 go startTask(device, task) break // 提前返回,避免不必要的循环 } } } } } } } func startViidServer() { if !config.Config.Viid.Enabled { return } gin.SetMode(gin.ReleaseMode) r := gin.Default() // Register Routes api.RegisterVIIDRoutes(r) addr := fmt.Sprintf(":%d", config.Config.Viid.Port) logger.Info("VIID Server starting", zap.String("addr", addr)) go func() { if err := r.Run(addr); err != nil { logger.Error("VIID Server failed", zap.Error(err)) } }() } func main() { // 初始化日志 err := logger.Init() if err != nil { panic(err) } defer logger.Sync() err = config.LoadConfig() if err != nil { logger.Fatal("加载配置文件失败", zap.Error(err)) return } ctx := context.Background() shutdown, err := telemetry.InitTelemetry(ctx) if err != nil { logger.Fatal("Failed to initialize telemetry", zap.Error(err)) return } defer shutdown(ctx) if config.Config.Record.Storage.Type == "local" { _, err = os.Stat(config.Config.Record.Storage.Path) if err != nil { logger.Error("录像文件夹配置失败", zap.Error(err)) return } else { logger.Info("录像文件夹配置有效") } } else { logger.Info("录像文件夹配置为OSS") } // Start VIID Server startViidServer() // Context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Handle Signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Start Task Loop go runTaskLoop(ctx) // Start Continuity Check Loop go core.RunContinuityCheckLoop(ctx) // Wait for signal <-sigChan logger.Info("Received shutdown signal") cancel() logger.Info("Shutdown complete") }