From 5722dd8e5afbb03af7f4c5d7182cd70f147efda4 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 9 Feb 2026 23:33:55 +0800 Subject: [PATCH] =?UTF-8?q?feat(core):=20=E6=B7=BB=E5=8A=A0=E6=96=AD?= =?UTF-8?q?=E8=BF=9E=E6=A3=80=E6=B5=8B=E5=92=8C=E5=91=BD=E4=BB=A4=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ConnError 类型用于区分连接级错误和应用层错误 - 在 sync_task 中将网络请求错误包装为 ConnError - 添加 DisconnectActionConfig 配置结构支持断连操作 - 在配置文件中增加 disconnectAction 配置项 - 实现 executeDisconnectCommand 函数支持跨平台命令执行 - 在主循环中添加断连检测逻辑和阈值判断 - 支持服务器连接恢复时重置断连状态 - 添加详细的日志记录用于断连状态追踪 --- api/sync_task.go | 10 ++++++- config.yaml | 4 +++ config/dto.go | 19 +++++++++----- main.go | 68 +++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 91 insertions(+), 10 deletions(-) diff --git a/api/sync_task.go b/api/sync_task.go index 1c4a2be..a122155 100644 --- a/api/sync_task.go +++ b/api/sync_task.go @@ -12,6 +12,14 @@ import ( "net/http" ) +// ConnError 表示与服务器的连接级错误(非 HTTP 应用层错误) +type ConnError struct { + Err error +} + +func (e *ConnError) Error() string { return e.Err.Error() } +func (e *ConnError) Unwrap() error { return e.Err } + func SyncTask() ([]dto.Task, error) { url := config.Config.Api.BaseUrl + "/sync" requestBody := map[string]interface{}{ @@ -32,7 +40,7 @@ func SyncTask() ([]dto.Task, error) { resp, err := GetAPIClient().Do(req) if err != nil { logger.Error("发送请求失败", zap.Error(err)) - return nil, err + return nil, &ConnError{Err: err} } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) diff --git a/config.yaml b/config.yaml index c41fb2f..2d53042 100644 --- a/config.yaml +++ b/config.yaml @@ -28,3 +28,7 @@ viid: serverUrl: "http://127.0.0.1:18083" scenicId: 3975985126059413504 port: 8080 +disconnectAction: + enabled: false + thresholdMinutes: 5 + command: "" diff --git a/config/dto.go b/config/dto.go index 90bc436..f50a986 100644 --- a/config/dto.go +++ b/config/dto.go @@ -58,10 +58,17 @@ type ViidConfig struct { Port int `mapstructure:"port"` } -type MainConfig struct { - Api ApiConfig `mapstructure:"api"` - Record RecordConfig `mapstructure:"record"` - Devices []DeviceMapping `mapstructure:"devices"` - FileName FileNameConfig `mapstructure:"fileName"` - Viid ViidConfig `mapstructure:"viid"` +type DisconnectActionConfig struct { + Enabled bool `mapstructure:"enabled"` + ThresholdMinutes int `mapstructure:"thresholdMinutes"` + Command string `mapstructure:"command"` +} + +type MainConfig struct { + Api ApiConfig `mapstructure:"api"` + Record RecordConfig `mapstructure:"record"` + Devices []DeviceMapping `mapstructure:"devices"` + FileName FileNameConfig `mapstructure:"fileName"` + Viid ViidConfig `mapstructure:"viid"` + DisconnectAction DisconnectActionConfig `mapstructure:"disconnectAction"` } diff --git a/main.go b/main.go index cdeaf17..f87a210 100644 --- a/main.go +++ b/main.go @@ -8,9 +8,12 @@ import ( "ZhenTuLocalPassiveAdapter/logger" "ZhenTuLocalPassiveAdapter/telemetry" "context" + "errors" "fmt" "os" + "os/exec" "os/signal" + "runtime" "syscall" "time" @@ -70,10 +73,35 @@ func startTask(device config.DeviceMapping, task dto.Task) { zap.String("deviceNo", task.DeviceNo)) } +func executeDisconnectCommand(command string) { + go func() { + var cmd *exec.Cmd + if runtime.GOOS == "windows" { + cmd = exec.Command("cmd", "/C", command) + } else { + cmd = exec.Command("sh", "-c", command) + } + output, err := cmd.CombinedOutput() + if err != nil { + logger.Error("断连命令执行失败", + zap.String("command", command), + zap.String("output", string(output)), + zap.Error(err)) + return + } + logger.Info("断连命令执行成功", + zap.String("command", command), + zap.String("output", string(output))) + }() +} + func runTaskLoop(ctx context.Context) { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() + var firstFailureTime time.Time + commandExecuted := false + for { select { case <-ctx.Done(): @@ -81,7 +109,43 @@ func runTaskLoop(ctx context.Context) { case <-ticker.C: // 执行任务 tasks, err := api.SyncTask() - if err == nil { + if err != nil { + var connErr *api.ConnError + if errors.As(err, &connErr) { + if config.Config.DisconnectAction.Enabled { + // 连接级错误:开始/继续计时 + if firstFailureTime.IsZero() { + firstFailureTime = time.Now() + logger.Warn("服务器连接失败,开始计时", + zap.Error(err)) + } + threshold := time.Duration(config.Config.DisconnectAction.ThresholdMinutes) * time.Minute + if !commandExecuted && time.Since(firstFailureTime) >= threshold && config.Config.DisconnectAction.Command != "" { + logger.Warn("服务器持续不可达,执行断连命令", + zap.Duration("duration", time.Since(firstFailureTime)), + zap.String("command", config.Config.DisconnectAction.Command)) + executeDisconnectCommand(config.Config.DisconnectAction.Command) + commandExecuted = true + } + } + } else { + // 非连接错误(服务器可达,但业务/解析出错):重置断连状态 + if !firstFailureTime.IsZero() { + logger.Info("服务器连接已恢复", + zap.Duration("disconnectDuration", time.Since(firstFailureTime))) + firstFailureTime = time.Time{} + commandExecuted = false + } + logger.Error("同步任务失败", zap.Error(err)) + } + } else { + // 服务器可达:重置状态 + if !firstFailureTime.IsZero() { + logger.Info("服务器连接已恢复", + zap.Duration("disconnectDuration", time.Since(firstFailureTime))) + firstFailureTime = time.Time{} + commandExecuted = false + } for _, task := range tasks { logger.Info("开始处理任务", zap.String("taskID", task.TaskID), @@ -97,8 +161,6 @@ func runTaskLoop(ctx context.Context) { } } } - } else { - logger.Error("同步任务失败", zap.Error(err)) } } }