Compare commits

..

3 Commits

Author SHA1 Message Date
4b1eb11986 日志 2025-08-04 10:49:24 +08:00
84ccaa56de 优化http连接池 2025-08-03 14:20:17 +08:00
838430ee2f 30秒缓存 2025-06-03 09:49:42 +08:00
12 changed files with 240 additions and 86 deletions

40
api/http_client.go Normal file
View File

@@ -0,0 +1,40 @@
package api
import (
"net/http"
"time"
)
var (
// 通用HTTP客户端,带短超时时间,适用于API调用
apiClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
Timeout: 5 * time.Second,
}
// 文件上传专用HTTP客户端,超时时间较长
uploadClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 50,
MaxIdleConnsPerHost: 5,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
Timeout: 60 * time.Second, // 文件上传需要更长的超时时间
}
)
// GetAPIClient 获取API调用客户端
func GetAPIClient() *http.Client {
return apiClient
}
// GetUploadClient 获取文件上传客户端
func GetUploadClient() *http.Client {
return uploadClient
}

View File

@@ -2,13 +2,14 @@ package api
import ( import (
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"io" "io"
"log"
"net/http" "net/http"
"os" "os"
) )
@@ -20,7 +21,7 @@ func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) err
if err != nil { if err != nil {
return err return err
} }
log.Printf("开始上传文件, URL:【%s】\n", url) logger.Info("开始上传文件", zap.String("url", url))
if err := OssUpload(subCtx, url, file.URL); err != nil { if err := OssUpload(subCtx, url, file.URL); err != nil {
return err return err
} }
@@ -58,8 +59,7 @@ func OssUpload(ctx context.Context, url, filePath string) error {
} }
req.Header.Set("Content-Type", "video/mp4") req.Header.Set("Content-Type", "video/mp4")
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes))) req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes)))
client := &http.Client{} resp, err := GetUploadClient().Do(req)
resp, err := client.Do(req)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败") span.SetStatus(codes.Error, "发送请求失败")

View File

@@ -3,13 +3,13 @@ package api
import ( import (
"ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"go.uber.org/zap"
"io" "io"
"log"
"net/http" "net/http"
"time"
) )
func SyncTask() ([]dto.Task, error) { func SyncTask() ([]dto.Task, error) {
@@ -20,27 +20,24 @@ func SyncTask() ([]dto.Task, error) {
} }
jsonData, err := json.Marshal(requestBody) jsonData, err := json.Marshal(requestBody)
if err != nil { if err != nil {
log.Println("Error marshaling JSON:", err) logger.Error("序列化JSON失败", zap.Error(err))
return nil, err return nil, err
} }
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil { if err != nil {
log.Println("Error creating request:", err) logger.Error("创建请求失败", zap.Error(err))
return nil, err return nil, err
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
client := &http.Client{ resp, err := GetAPIClient().Do(req)
Timeout: 5 * time.Second, // 设置超时时间为5秒
}
resp, err := client.Do(req)
if err != nil { if err != nil {
log.Println("Error sending request:", err) logger.Error("发送请求失败", zap.Error(err))
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Println("Error reading response body:", err) logger.Error("读取响应体失败", zap.Error(err))
return nil, err return nil, err
} }
@@ -48,12 +45,13 @@ func SyncTask() ([]dto.Task, error) {
var response dto.TaskListResponse var response dto.TaskListResponse
err = json.Unmarshal(body, &response) err = json.Unmarshal(body, &response)
if err != nil { if err != nil {
log.Println("->:", string(body)) logger.Error("解析响应体失败",
log.Println("Error unmarshaling response body:", err) zap.String("responseBody", string(body)),
zap.Error(err))
return nil, err return nil, err
} }
if response.Code != 200 { if response.Code != 200 {
log.Println("Error response code:", response.Code) logger.Error("响应错误码", zap.Int("code", response.Code))
return nil, fmt.Errorf(response.Msg) return nil, fmt.Errorf(response.Msg)
} }
return response.Data, nil return response.Data, nil

View File

@@ -3,13 +3,14 @@ package api
import ( import (
"ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"io" "io"
"log"
"net/http" "net/http"
) )
@@ -23,15 +24,14 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败") span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err) logger.Error("创建请求失败", zap.Error(err))
return "", err return "", err
} }
client := &http.Client{} resp, err := GetAPIClient().Do(req)
resp, err := client.Do(req)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败") span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err) logger.Error("发送请求失败", zap.Error(err))
return "", err return "", err
} }
span.SetAttributes(attribute.String("http.status", resp.Status)) span.SetAttributes(attribute.String("http.status", resp.Status))
@@ -41,7 +41,7 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败") span.SetStatus(codes.Error, "读取响应体失败")
log.Println("Error reading response body:", err) logger.Error("读取响应体失败", zap.Error(err))
return "", err return "", err
} }
return string(body), nil return string(body), nil
@@ -58,16 +58,15 @@ func ReportTaskFailure(ctx context.Context, taskId string) bool {
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败") span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err) logger.Error("创建请求失败", zap.Error(err))
return false return false
} }
client := &http.Client{} resp, err := GetAPIClient().Do(req)
resp, err := client.Do(req)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败") span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err) logger.Error("发送请求失败", zap.Error(err))
return false return false
} }
defer resp.Body.Close() defer resp.Body.Close()
@@ -94,7 +93,7 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "序列化JSON失败") span.SetStatus(codes.Error, "序列化JSON失败")
log.Println("Error marshaling JSON:", err) logger.Error("序列化JSON失败", zap.Error(err))
return false return false
} }
@@ -102,17 +101,16 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败") span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err) logger.Error("创建请求失败", zap.Error(err))
return false return false
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
client := &http.Client{} resp, err := GetAPIClient().Do(req)
resp, err := client.Do(req)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败") span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err) logger.Error("发送请求失败", zap.Error(err))
return false return false
} }
defer resp.Body.Close() defer resp.Body.Close()

View File

@@ -1,8 +1,9 @@
package config package config
import ( import (
"ZhenTuLocalPassiveAdapter/logger"
"github.com/spf13/viper" "github.com/spf13/viper"
"log" "go.uber.org/zap"
) )
var Config MainConfig var Config MainConfig
@@ -14,13 +15,13 @@ func LoadConfig() error {
// 读取配置文件 // 读取配置文件
if err := viper.ReadInConfig(); err != nil { if err := viper.ReadInConfig(); err != nil {
log.Fatalf("Error reading config file, %s", err) logger.Fatal("读取配置文件失败", zap.Error(err))
return err return err
} }
// 反序列化配置到结构体 // 反序列化配置到结构体
if err := viper.Unmarshal(&Config); err != nil { if err := viper.Unmarshal(&Config); err != nil {
log.Fatalf("Unable to decode into struct, %v", err) logger.Fatal("解析配置失败", zap.Error(err))
return err return err
} }
return nil return nil

View File

@@ -4,13 +4,14 @@ import (
"ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/fs" "ZhenTuLocalPassiveAdapter/fs"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util" "ZhenTuLocalPassiveAdapter/util"
"context" "context"
"fmt" "fmt"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"log" "go.uber.org/zap"
"path" "path"
) )
@@ -32,7 +33,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "获取文件列表失败") span.SetStatus(codes.Error, "获取文件列表失败")
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err) logger.Error("获取文件列表失败",
zap.String("deviceNo", device.DeviceNo),
zap.Error(err))
return nil, err return nil, err
} }
files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime) files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime)
@@ -45,7 +48,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件片段检查失败") span.SetStatus(codes.Error, "文件片段检查失败")
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err) logger.Error("文件片段检查失败",
zap.String("deviceNo", device.DeviceNo),
zap.Error(err))
return nil, err return nil, err
} }
ok := util.RunFfmpegTask(subCtx, constructTask) ok := util.RunFfmpegTask(subCtx, constructTask)

View File

@@ -3,11 +3,12 @@ package fs
import ( import (
"ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util" "ZhenTuLocalPassiveAdapter/util"
"context" "context"
"fmt" "fmt"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
"log" "go.uber.org/zap"
"path" "path"
"sort" "sort"
"sync" "sync"
@@ -63,9 +64,9 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
if cachedInterface, ok := s3Cache.Load(cacheKey); ok { if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem) cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now())) logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) { if time.Now().Before(cachedItem.expires) {
log.Println("获取已缓存列表", cacheKey) logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("cache.hit", true)) span.SetAttributes(attribute.Bool("cache.hit", true))
return cachedItem.data, nil return cachedItem.data, nil
} }
@@ -83,9 +84,9 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if cachedInterface, ok := s3Cache.Load(cacheKey); ok { if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem) cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now())) logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) { if time.Now().Before(cachedItem.expires) {
log.Println("过锁后获取已缓存列表", cacheKey) logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("s3Cache.hit", true)) span.SetAttributes(attribute.Bool("s3Cache.hit", true))
return cachedItem.data, nil return cachedItem.data, nil
} }
@@ -144,7 +145,7 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "生成预签名URL失败") span.SetStatus(codes.Error, "生成预签名URL失败")
log.Println("Error presigning GetObject request:", err) logger.Error("生成预签名URL失败", zap.Error(err))
continue continue
} }
fileList = append(fileList, dto.File{ fileList = append(fileList, dto.File{
@@ -171,10 +172,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
cacheItem := cacheItem{ cacheItem := cacheItem{
data: fileList, data: fileList,
expires: time.Now().Add(10 * time.Second), expires: time.Now().Add(30 * time.Second),
} }
s3Cache.Store(cacheKey, cacheItem) s3Cache.Store(cacheKey, cacheItem)
log.Println("缓存文件列表", cacheKey) logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey))
return fileList, nil return fileList, nil
} }

2
go.mod
View File

@@ -47,6 +47,7 @@ require (
go.opentelemetry.io/otel/trace v1.35.0 // indirect go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.35.0 // indirect golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.31.0 // indirect golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect golang.org/x/text v0.23.0 // indirect
@@ -54,5 +55,6 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/grpc v1.71.0 // indirect google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

4
go.sum
View File

@@ -95,6 +95,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
@@ -112,5 +114,7 @@ google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

74
logger/logger.go Normal file
View File

@@ -0,0 +1,74 @@
package logger
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
var Logger *zap.Logger
func Init() error {
config := zap.NewProductionConfig()
config.OutputPaths = []string{"stdout"}
config.ErrorOutputPaths = []string{"stderr"}
// 配置日志轮换
lumberJackLogger := &lumberjack.Logger{
Filename: "logs/app.log",
MaxSize: 10, // MB
MaxBackups: 3,
MaxAge: 30, // days
Compress: true,
}
// 创建写入器
w := zapcore.AddSync(lumberJackLogger)
// 设置日志级别
level := zap.NewAtomicLevelAt(zap.InfoLevel)
// 创建编码器配置
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.TimeKey = "time"
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.MessageKey = "msg"
encoderConfig.CallerKey = "caller"
// 创建控制台编码器(文本格式)
encoder := zapcore.NewConsoleEncoder(encoderConfig)
// 创建核心
core := zapcore.NewCore(encoder, w, level)
// 创建日志记录器
Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
return nil
}
func Info(message string, fields ...zap.Field) {
Logger.Info(message, fields...)
}
func Error(message string, fields ...zap.Field) {
Logger.Error(message, fields...)
}
func Warn(message string, fields ...zap.Field) {
Logger.Warn(message, fields...)
}
func Debug(message string, fields ...zap.Field) {
Logger.Debug(message, fields...)
}
func Fatal(message string, fields ...zap.Field) {
Logger.Fatal(message, fields...)
}
func Sync() error {
return Logger.Sync()
}

64
main.go
View File

@@ -5,12 +5,13 @@ import (
"ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/core" "ZhenTuLocalPassiveAdapter/core"
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/telemetry" "ZhenTuLocalPassiveAdapter/telemetry"
"context" "context"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"log" "go.uber.org/zap"
"os" "os"
"time" "time"
) )
@@ -28,63 +29,72 @@ func startTask(device config.DeviceMapping, task dto.Task) {
fo, err := core.HandleTask(ctx, device, task) fo, err := core.HandleTask(ctx, device, task)
if err != nil { if err != nil {
span.SetStatus(codes.Error, "处理任务失败") span.SetStatus(codes.Error, "处理任务失败")
log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) logger.Error("处理任务失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID) api.ReportTaskFailure(ctx, task.TaskID)
return return
} }
span.SetAttributes(attribute.String("fileUrl", fo.URL)) span.SetAttributes(attribute.String("fileUrl", fo.URL))
log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) logger.Info("处理任务成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
err = api.UploadTaskFile(ctx, task, *fo) err = api.UploadTaskFile(ctx, task, *fo)
if err != nil { if err != nil {
span.SetStatus(codes.Error, "上传文件失败") span.SetStatus(codes.Error, "上传文件失败")
log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) logger.Error("上传文件失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID) api.ReportTaskFailure(ctx, task.TaskID)
return return
} }
result := api.ReportTaskSuccess(ctx, task.TaskID, fo) result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
if !result { if !result {
span.SetStatus(codes.Error, "上报任务成功失败") span.SetStatus(codes.Error, "上报任务成功失败")
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) logger.Error("上报任务成功失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
return return
} }
span.SetStatus(codes.Ok, "上传文件成功") span.SetStatus(codes.Ok, "上传文件成功")
log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) logger.Info("上传文件成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
} }
func main() { func main() {
err := config.LoadConfig() // 初始化日志
err := logger.Init()
if err != nil { if err != nil {
log.Println("加载配置文件失败:", err) panic(err)
}
defer logger.Sync()
err = config.LoadConfig()
if err != nil {
logger.Fatal("加载配置文件失败", zap.Error(err))
return return
} }
// 日志文件路径
logFilePath := "app.log"
ctx := context.Background() ctx := context.Background()
shutdown, err := telemetry.InitTelemetry(ctx) shutdown, err := telemetry.InitTelemetry(ctx)
if err != nil { if err != nil {
log.Fatalf("Failed to initialize telemetry: %v", err) logger.Fatal("Failed to initialize telemetry", zap.Error(err))
return return
} }
defer shutdown(ctx) defer shutdown(ctx)
// 创建或打开日志文件
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
}
defer logFile.Close()
// 设置日志输出到文件
log.SetOutput(logFile)
if config.Config.Record.Storage.Type == "local" { if config.Config.Record.Storage.Type == "local" {
_, err = os.Stat(config.Config.Record.Storage.Path) _, err = os.Stat(config.Config.Record.Storage.Path)
if err != nil { if err != nil {
log.Println("录像文件夹配置失败", err) logger.Error("录像文件夹配置失败", zap.Error(err))
return return
} else { } else {
log.Println("录像文件夹配置有效") logger.Info("录像文件夹配置有效")
} }
} else { } else {
log.Println("录像文件夹配置为OSS") logger.Info("录像文件夹配置为OSS")
} }
// 每两秒定时执行 // 每两秒定时执行
for { for {
@@ -92,7 +102,11 @@ func main() {
tasks, err := api.SyncTask() tasks, err := api.SyncTask()
if err == nil { if err == nil {
for _, task := range tasks { for _, task := range tasks {
log.Printf("开始处理任务, TaskID:【%s】,DeviceNo: %s,开始时间: %s,结束时间: %s\n", task.TaskID, task.DeviceNo, task.StartTime, task.EndTime) logger.Info("开始处理任务",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")),
zap.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
// 处理任务 // 处理任务
for _, device := range config.Config.Devices { for _, device := range config.Config.Devices {
if device.DeviceNo == task.DeviceNo { if device.DeviceNo == task.DeviceNo {
@@ -103,7 +117,7 @@ func main() {
} }
} }
} else { } else {
log.Println("同步任务失败:", err) logger.Error("同步任务失败", zap.Error(err))
} }
// 等待两秒 // 等待两秒
<-time.After(2 * time.Second) <-time.After(2 * time.Second)

View File

@@ -2,12 +2,13 @@ package util
import ( import (
"ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"log" "go.uber.org/zap"
"math/rand" "math/rand"
"os" "os"
"os/exec" "os/exec"
@@ -36,7 +37,7 @@ func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
} }
// 先尝试方法1 // 先尝试方法1
if !result { if !result {
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码") logger.Warn("FFMPEG简易方法失败,尝试复杂方法转码")
// 不行再尝试方法二 // 不行再尝试方法二
result = runFfmpegForMultipleFile2(subCtx, task) result = runFfmpegForMultipleFile2(subCtx, task)
} }
@@ -80,7 +81,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts") tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
result, err := convertMp4ToTs(subCtx, *file, tmpFile) result, err := convertMp4ToTs(subCtx, *file, tmpFile)
if err != nil { if err != nil {
log.Printf("转码出错: %v", err) logger.Error("转码出错", zap.Error(err))
mu.Lock() mu.Lock()
notOk = true notOk = true
mu.Unlock() mu.Unlock()
@@ -115,7 +116,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
// 步骤三:删除临时文件 // 步骤三:删除临时文件
for _, file := range taskClone.Files { for _, file := range taskClone.Files {
if err := os.Remove(file.Url); err != nil { if err := os.Remove(file.Url); err != nil {
log.Printf("删除临时文件失败: %v", err) logger.Error("删除临时文件失败", zap.Error(err))
} }
} }
if result { if result {
@@ -185,7 +186,7 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
stat, err := os.Stat(task.OutputFile) stat, err := os.Stat(task.OutputFile)
if err != nil { if err != nil {
span.SetStatus(codes.Error, "文件不存在") span.SetStatus(codes.Error, "文件不存在")
log.Printf("文件不存在:%s", task.OutputFile) logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
return false return false
} }
span.SetAttributes(attribute.String("file.name", task.OutputFile)) span.SetAttributes(attribute.String("file.name", task.OutputFile))
@@ -219,7 +220,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
defer span.End() defer span.End()
if fileList == nil || len(fileList) == 0 { if fileList == nil || len(fileList) == 0 {
span.SetStatus(codes.Error, "无法根据要求找到对应录制片段") span.SetStatus(codes.Error, "无法根据要求找到对应录制片段")
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt) logger.Error("无法根据要求找到对应录制片段",
zap.String("taskID", task.TaskID),
zap.String("beginTime", beginDt.Format("2006-01-02 15:04:05")),
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
return nil, fmt.Errorf("无法根据要求找到对应录制片段") return nil, fmt.Errorf("无法根据要求找到对应录制片段")
} }
// 按照 Create 的值升序排序 // 按照 Create 的值升序排序
@@ -238,7 +242,11 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 { if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
// 片段断开 // 片段断开
span.SetStatus(codes.Error, "FFMPEG片段断开") span.SetStatus(codes.Error, "FFMPEG片段断开")
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds()) logger.Error("分析FFMPEG任务失败,文件片段中间断开超过2秒",
zap.String("taskID", task.TaskID),
zap.String("lastFile", lastFile.Name),
zap.String("currentFile", file.Name),
zap.Float64("gapSeconds", file.StartTime.Sub(lastFile.EndTime).Seconds()))
return nil, fmt.Errorf("片段断开") return nil, fmt.Errorf("片段断开")
} }
lastFile = &file lastFile = &file
@@ -248,7 +256,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
// 通过文件列表构造的任务仍然是缺失的 // 通过文件列表构造的任务仍然是缺失的
if fileList[len(fileList)-1].EndTime.Before(endDt) { if fileList[len(fileList)-1].EndTime.Before(endDt) {
span.SetStatus(codes.Error, "FFMPEG片段断开") span.SetStatus(codes.Error, "FFMPEG片段断开")
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt) logger.Error("分析FFMPEG任务失败,无法完整覆盖时间点",
zap.String("taskID", task.TaskID),
zap.String("lastFile", fileList[len(fileList)-1].Name),
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
return nil, fmt.Errorf("片段断开") return nil, fmt.Errorf("片段断开")
} }
@@ -326,7 +337,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建临时文件失败") span.SetStatus(codes.Error, "创建临时文件失败")
log.Printf("创建临时文件失败:%s", tmpFile) logger.Error("创建临时文件失败", zap.String("tmpFile", tmpFile))
return false, err return false, err
} }
defer os.Remove(tmpFile) defer os.Remove(tmpFile)
@@ -337,7 +348,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "写入临时文件失败") span.SetStatus(codes.Error, "写入临时文件失败")
log.Printf("写入临时文件失败:%s", tmpFile) logger.Error("写入临时文件失败", zap.String("tmpFile", tmpFile))
return false, err return false, err
} }
} }
@@ -401,7 +412,7 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
defer func() { defer func() {
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds()))) span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
}() }()
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " ")) logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...) cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer var stderr bytes.Buffer
@@ -411,7 +422,9 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败") span.SetStatus(codes.Error, "FFMPEG执行命令失败")
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err return false, err
} }
defer cmd.Process.Kill() defer cmd.Process.Kill()
@@ -425,17 +438,21 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
case <-time.After(1 * time.Minute): case <-time.After(1 * time.Minute):
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " ")) logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out") return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done: case err := <-done:
if err != nil { if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败") span.SetStatus(codes.Error, "FFMPEG执行命令失败")
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err return false, err
} }
endTime := time.Now() endTime := time.Now()
log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " ")) logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil return true, nil
} }
} }