Compare commits

...

15 Commits

Author SHA1 Message Date
b23794587f feat(storage): 添加阿里云OSS存储支持
- 在StorageConfig中新增AliOSS字段以配置阿里云OSS参数
- 新增AliOSSConfig结构体定义阿里云OSS相关配置项
- 在fs包中实现AliOSSAdapter适配器用于操作阿里云OSS
- 实现GetFileList方法从阿里云OSS获取并缓存文件列表
- 添加定时清理过期缓存的功能
- 更新adapter.go根据存储类型选择对应的适配器实例
2025-12-03 15:50:09 +08:00
a678829f59 feat(api): 支持OSS文件上传时指定Content-Type
- 在UploadFileToOSS函数中新增contentType参数
- 设置请求头Content-Length和Content-Type
- 为图片上传指定image/jpeg类型
- 增加上传失败时的日志记录
- 引入zap日志库支持结构化日志输出
2025-11-24 18:09:38 +08:00
f10b68e487 refactor(api): 移除上传文件时的Content-Type参数
- 删除UploadFileToOSS函数中的contentType参数
- 更新所有调用UploadFileToOSS的地方,移除传递的Content-Type值
- 简化上传逻辑,不再手动设置HTTP请求头中的Content-Type
- 依赖服务器端自动检测文件类型进行处理
2025-11-24 17:56:05 +08:00
67968abcf3 feat(api): 新增VIID接口处理人脸上传功能
- 实现人脸数据结构定义与解析
- 添加系统注册、保活、注销代理逻辑
- 支持系统时间获取接口
- 处理人脸图片Base64解码及缩略图生成
- 异步执行人脸数据上传流程
- 提供成功与错误响应封装方法
2025-11-24 16:18:09 +08:00
11f508342d feat(config): 更新配置文件并新增自定义时间类型
- 修改 config.yaml 中的 API 地址、存储配置及设备信息
- 新增 Viid 配置项支持新功能模块
- 在 go.mod 和 go.sum 中更新依赖包版本,引入新的第三方库
- 添加 model/custom_time.go 文件实现自定义时间类型的 JSON 序列化与反序列化
- 调整 DTO 结构体以适配新的配置项
- 升级 OTLP 导出器相关依赖并移除旧的标准输出导出器
- 引入 Gin 框架及相关中间件提升服务性能
- 更新 Protobuf 和 gRPC 相关依赖至最新版本
- 增加 zap 日志库和 lumberjack 日志轮转支持
- 添加 sonic、json-iterator 等高性能 JSON 处理库优化数据解析效率
- 引入 testify 断言库增强测试代码可读性
- 更新 sync 包版本提高并发安全性
- 添加 mock 工具支持单元测试模拟对象
- 引入 validator/v10 实现请求参数校验功能
- 更新 crypto、net、text 等标准库依赖确保安全性和兼容性
- 增加 mimetype 库用于文件类型识别
- 引入 quic-go 支持 HTTP/3 协议通信
- 添加 base64x
2025-11-24 16:12:31 +08:00
4b1eb11986 日志 2025-08-04 10:49:24 +08:00
84ccaa56de 优化http连接池 2025-08-03 14:20:17 +08:00
838430ee2f 30秒缓存 2025-06-03 09:49:42 +08:00
5dfe6d6356 s3 优化缓存逻辑,添加缓存自清理逻辑 2025-04-21 15:08:02 +08:00
509b829c5b s3 修复缓存键避免被逻辑修改 2025-04-21 14:44:23 +08:00
e6f93a4d37 s3 避免缓存击穿 2025-04-21 14:37:51 +08:00
2971c5f52d s3 添加缓存避免延迟爆炸 2025-04-21 14:02:56 +08:00
3d7c88de5f s3 一次性10000个 2025-04-13 18:43:45 +08:00
f9256895b7 去除错误 2025-04-13 18:32:20 +08:00
104930c413 优化stopTime为空时逻辑爆炸的问题 2025-04-13 16:07:29 +08:00
22 changed files with 1499 additions and 148 deletions

40
api/http_client.go Normal file

@@ -0,0 +1,40 @@
package api
import (
"net/http"
"time"
)
var (
// 通用HTTP客户端,带短超时时间,适用于API调用
apiClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
Timeout: 5 * time.Second,
}
// 文件上传专用HTTP客户端,超时时间较长
uploadClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 50,
MaxIdleConnsPerHost: 5,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
Timeout: 60 * time.Second, // 文件上传需要更长的超时时间
}
)
// GetAPIClient 获取API调用客户端
func GetAPIClient() *http.Client {
return apiClient
}
// GetUploadClient 获取文件上传客户端
func GetUploadClient() *http.Client {
return uploadClient
}

@@ -2,13 +2,14 @@ package api
import (
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"io"
"log"
"net/http"
"os"
)
@@ -20,7 +21,7 @@ func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) err
if err != nil {
return err
}
log.Printf("开始上传文件, URL:【%s】\n", url)
logger.Info("开始上传文件", zap.String("url", url))
if err := OssUpload(subCtx, url, file.URL); err != nil {
return err
}
@@ -58,8 +59,7 @@ func OssUpload(ctx context.Context, url, filePath string) error {
}
req.Header.Set("Content-Type", "video/mp4")
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes)))
client := &http.Client{}
resp, err := client.Do(req)
resp, err := GetUploadClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")

@@ -3,13 +3,13 @@ package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"encoding/json"
"fmt"
"go.uber.org/zap"
"io"
"log"
"net/http"
"time"
)
func SyncTask() ([]dto.Task, error) {
@@ -20,27 +20,24 @@ func SyncTask() ([]dto.Task, error) {
}
jsonData, err := json.Marshal(requestBody)
if err != nil {
log.Println("Error marshaling JSON:", err)
logger.Error("序列化JSON失败", zap.Error(err))
return nil, err
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
log.Println("Error creating request:", err)
logger.Error("创建请求失败", zap.Error(err))
return nil, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{
Timeout: 5 * time.Second, // 设置超时时间为5秒
}
resp, err := client.Do(req)
resp, err := GetAPIClient().Do(req)
if err != nil {
log.Println("Error sending request:", err)
logger.Error("发送请求失败", zap.Error(err))
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("Error reading response body:", err)
logger.Error("读取响应体失败", zap.Error(err))
return nil, err
}
@@ -48,12 +45,13 @@ func SyncTask() ([]dto.Task, error) {
var response dto.TaskListResponse
err = json.Unmarshal(body, &response)
if err != nil {
log.Println("->:", string(body))
log.Println("Error unmarshaling response body:", err)
logger.Error("解析响应体失败",
zap.String("responseBody", string(body)),
zap.Error(err))
return nil, err
}
if response.Code != 200 {
log.Println("Error response code:", response.Code)
logger.Error("响应错误码", zap.Int("code", response.Code))
return nil, fmt.Errorf(response.Msg)
}
return response.Data, nil

@@ -3,13 +3,14 @@ package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"encoding/json"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
"io"
"log"
"net/http"
)
@@ -23,15 +24,14 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err)
logger.Error("创建请求失败", zap.Error(err))
return "", err
}
client := &http.Client{}
resp, err := client.Do(req)
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err)
logger.Error("发送请求失败", zap.Error(err))
return "", err
}
span.SetAttributes(attribute.String("http.status", resp.Status))
@@ -41,7 +41,7 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "读取响应体失败")
log.Println("Error reading response body:", err)
logger.Error("读取响应体失败", zap.Error(err))
return "", err
}
return string(body), nil
@@ -58,16 +58,15 @@ func ReportTaskFailure(ctx context.Context, taskId string) bool {
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err)
logger.Error("创建请求失败", zap.Error(err))
return false
}
client := &http.Client{}
resp, err := client.Do(req)
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err)
logger.Error("发送请求失败", zap.Error(err))
return false
}
defer resp.Body.Close()
@@ -94,7 +93,7 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "序列化JSON失败")
log.Println("Error marshaling JSON:", err)
logger.Error("序列化JSON失败", zap.Error(err))
return false
}
@@ -102,17 +101,16 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建请求失败")
log.Println("Error creating request:", err)
logger.Error("创建请求失败", zap.Error(err))
return false
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
resp, err := GetAPIClient().Do(req)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "发送请求失败")
log.Println("Error sending request:", err)
logger.Error("发送请求失败", zap.Error(err))
return false
}
defer resp.Body.Close()

215
api/viid_client.go Normal file

@@ -0,0 +1,215 @@
package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"go.uber.org/zap"
)
// VIID Request/Response Structures
type DeviceIdObject struct {
DeviceID string `json:"DeviceID"`
}
type RegisterRequest struct {
RegisterObject DeviceIdObject `json:"RegisterObject"`
}
type KeepaliveRequest struct {
KeepaliveObject DeviceIdObject `json:"KeepaliveObject"`
}
type UnRegisterRequest struct {
UnRegisterObject DeviceIdObject `json:"UnRegisterObject"`
}
type VIIDBaseResponse struct {
ResponseStatusObject ResponseStatusObject `json:"ResponseStatusObject"`
}
type ResponseStatusObject struct {
ID string `json:"Id"`
RequestURL string `json:"RequestURL"`
StatusCode string `json:"StatusCode"`
StatusString string `json:"StatusString"`
LocalTime string `json:"LocalTime"`
}
type SystemTimeResponse struct {
ResponseStatusObject struct {
LocalTime string `json:"LocalTime"` // yyyyMMddHHmmss
} `json:"ResponseStatusObject"`
}
// VIID Client Methods
func ProxyRequest(ctx context.Context, method, path string, body io.Reader, header http.Header) (*http.Response, error) {
url := fmt.Sprintf("%s%s", config.Config.Viid.ServerUrl, path)
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, err
}
// Copy headers
for k, v := range header {
req.Header[k] = v
}
return GetAPIClient().Do(req)
}
// Upload Proxy Structures
type UploadConfig struct {
TaskID int64 `json:"taskId"`
FaceUploadURL string `json:"faceUploadUrl"`
ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
SourceUploadURL string `json:"sourceUploadUrl"`
ExpiresAt time.Time `json:"expiresAt"`
}
type ProxyResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Success bool `json:"success"`
Data *UploadConfig `json:"data"` // Data can be UploadConfig or string depending on endpoint
}
type FacePositionInfo struct {
LeftTopX int `json:"leftTopX"`
LeftTopY int `json:"leftTopY"`
RightBtmX int `json:"rightBtmX"`
RightBtmY int `json:"rightBtmY"`
ImgWidth int `json:"imgWidth"`
ImgHeight int `json:"imgHeight"`
ShotTime string `json:"shotTime"` // yyyyMMddHHmmss
}
type SubmitResultRequest struct {
TaskID int64 `json:"taskId"`
FacePosition FacePositionInfo `json:"facePosition"`
}
type SubmitFailureRequest struct {
TaskID int64 `json:"taskId"`
ErrorCode string `json:"errorCode"`
ErrorMessage string `json:"errorMessage"`
}
// Upload Proxy Methods
func GetUploadConfig(ctx context.Context, scenicId int64, deviceNo string) (*UploadConfig, error) {
url := fmt.Sprintf("%s/proxy/VIID/upload-config?scenicId=%d&deviceNo=%s", config.Config.Viid.ServerUrl, scenicId, deviceNo)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := GetAPIClient().Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result ProxyResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
if !result.Success {
return nil, fmt.Errorf("get upload config failed: %s", result.Message)
}
return result.Data, nil
}
func SubmitResult(ctx context.Context, taskID int64, facePos FacePositionInfo) error {
url := fmt.Sprintf("%s/proxy/VIID/submit-result", config.Config.Viid.ServerUrl)
reqData := SubmitResultRequest{
TaskID: taskID,
FacePosition: facePos,
}
jsonData, _ := json.Marshal(reqData)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := GetAPIClient().Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
var result struct {
Code int `json:"code"`
Message string `json:"message"`
Success bool `json:"success"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}
if !result.Success {
return fmt.Errorf("submit result failed: %s", result.Message)
}
return nil
}
func SubmitFailure(ctx context.Context, taskID int64, errorCode, errorMessage string) error {
url := fmt.Sprintf("%s/proxy/VIID/submit-failure", config.Config.Viid.ServerUrl)
reqData := SubmitFailureRequest{
TaskID: taskID,
ErrorCode: errorCode,
ErrorMessage: errorMessage,
}
jsonData, _ := json.Marshal(reqData)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := GetAPIClient().Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}
func UploadFileToOSS(ctx context.Context, uploadUrl string, data []byte, contentType string) error {
req, err := http.NewRequestWithContext(ctx, "PUT", uploadUrl, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
req.Header.Set("Content-Type", contentType)
resp, err := GetUploadClient().Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
logger.Error("oss upload failed", zap.String("body", string(body)))
return fmt.Errorf("oss upload failed: %d", resp.StatusCode)
}
return nil
}

248
api/viid_handler.go Normal file

@@ -0,0 +1,248 @@
package api
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"encoding/base64"
"io"
"net/http"
"time"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
// Request Structs for Handler
type FaceUploadRequest struct {
FaceListObject FaceListObject `json:"FaceListObject"`
}
type FaceListObject struct {
FaceObject []FaceObject `json:"FaceObject"`
}
type FaceObject struct {
FaceID string `json:"FaceID"`
DeviceID string `json:"DeviceID"`
ShotTime string `json:"ShotTime"`
FaceAppearTime string `json:"FaceAppearTime"`
SubImageList SubImageList `json:"SubImageList"`
// Position info
LeftTopX *int `json:"LeftTopX,omitempty"`
LeftTopY *int `json:"LeftTopY,omitempty"`
RightBtmX *int `json:"RightBtmX,omitempty"`
RightBtmY *int `json:"RightBtmY,omitempty"`
}
type SubImageList struct {
SubImageInfoObject []SubImageInfoObject `json:"SubImageInfoObject"`
}
type SubImageInfoObject struct {
ImageID string `json:"ImageID"`
Type string `json:"Type"` // "11"=Face, "14"=Source
FileFormat string `json:"FileFormat"`
Data string `json:"Data"` // Base64
Width *int `json:"Width,omitempty"`
Height *int `json:"Height,omitempty"`
}
// Register Routes
func RegisterVIIDRoutes(r *gin.Engine) {
viid := r.Group("/VIID")
{
sys := viid.Group("/System")
{
sys.POST("/Register", HandleRegister)
sys.POST("/Keepalive", HandleKeepalive)
sys.POST("/UnRegister", HandleUnRegister)
sys.GET("/Time", HandleSystemTime)
}
viid.POST("/Faces", HandleUploadFaces)
}
}
// HTTP Handlers
func HandleRegister(c *gin.Context) {
proxyHelper(c, "/VIID/System/Register")
}
func HandleKeepalive(c *gin.Context) {
proxyHelper(c, "/VIID/System/Keepalive")
}
func HandleUnRegister(c *gin.Context) {
proxyHelper(c, "/VIID/System/UnRegister")
}
func proxyHelper(c *gin.Context, path string) {
resp, err := ProxyRequest(c.Request.Context(), c.Request.Method, path, c.Request.Body, c.Request.Header)
if err != nil {
logger.Error("Proxy request failed", zap.String("path", path), zap.Error(err))
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
return
}
defer resp.Body.Close()
// Copy Headers
for k, v := range resp.Header {
c.Writer.Header()[k] = v
}
c.Status(resp.StatusCode)
io.Copy(c.Writer, resp.Body)
}
func HandleSystemTime(c *gin.Context) {
now := time.Now()
timeStr := now.Format("20060102150405") // VIID format
// Determine TimeZone string (e.g., "Asia/Shanghai")
// Simple fixed string or system local
timeZone := "Local"
loc, err := time.LoadLocation("Local")
if err == nil {
timeZone = loc.String()
}
resp := map[string]interface{}{
"SystemTimeObject": map[string]string{
"VIIDServerID": "00000000000000000001", // Placeholder
"TimeMode": "2", // 2: Atomic Clock / NTP
"LocalTime": timeStr,
"TimeZone": timeZone,
},
}
c.JSON(http.StatusOK, resp)
}
func HandleUploadFaces(c *gin.Context) {
var req FaceUploadRequest
if err := c.ShouldBindJSON(&req); err != nil {
logger.Error("Invalid JSON in UploadFaces", zap.Error(err))
sendErrorResponse(c, "/VIID/Faces", "", "Invalid JSON")
return
}
var lastFaceID string
for _, face := range req.FaceListObject.FaceObject {
lastFaceID = face.FaceID
deviceID := face.DeviceID
// 1. Parse Images
var faceImg, srcImg []byte
var imgWidth, imgHeight int
for _, subImg := range face.SubImageList.SubImageInfoObject {
data, err := base64.StdEncoding.DecodeString(subImg.Data)
if err != nil {
logger.Error("Base64 decode failed", zap.String("faceId", face.FaceID), zap.Error(err))
continue
}
if subImg.Type == "11" { // Face
faceImg = data
} else if subImg.Type == "14" { // Source
srcImg = data
if subImg.Width != nil {
imgWidth = *subImg.Width
}
if subImg.Height != nil {
imgHeight = *subImg.Height
}
}
}
// 2. Construct Position Info
pos := FacePositionInfo{
ImgWidth: imgWidth,
ImgHeight: imgHeight,
ShotTime: parseShotTime(face.ShotTime, face.FaceAppearTime),
}
if face.LeftTopX != nil {
pos.LeftTopX = *face.LeftTopX
}
if face.LeftTopY != nil {
pos.LeftTopY = *face.LeftTopY
}
if face.RightBtmX != nil {
pos.RightBtmX = *face.RightBtmX
}
if face.RightBtmY != nil {
pos.RightBtmY = *face.RightBtmY
}
// 3. Generate Thumbnail (if Source Image and Coordinates exist)
var thumbImg []byte
if len(srcImg) > 0 && pos.LeftTopX > 0 && pos.LeftTopY > 0 { // Basic validation
// Use Source Image to generate thumbnail
// The thumbnail is 1/2 original size, centered on face
var err error
thumbImg, err = util.GenerateThumbnailFromCoords(srcImg, pos.LeftTopX, pos.LeftTopY, pos.RightBtmX, pos.RightBtmY)
if err != nil {
logger.Error("Failed to generate thumbnail", zap.String("faceId", face.FaceID), zap.Error(err))
// Continue without thumbnail? or fail?
// Usually continue, but log error.
}
}
// 4. Execute Upload Flow
scenicId := config.Config.Viid.ScenicId
go func(fID, dID string, fData, tData, sData []byte, p FacePositionInfo) {
// Create a detached context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
if err := UploadFaceData(ctx, scenicId, dID, fData, tData, sData, p); err != nil {
logger.Error("Failed to process face upload", zap.String("faceId", fID), zap.Error(err))
}
}(face.FaceID, deviceID, faceImg, thumbImg, srcImg, pos)
}
// Respond Success
sendSuccessResponse(c, "/VIID/Faces", lastFaceID, "OK")
}
// Helpers
func sendErrorResponse(c *gin.Context, url, id, msg string) {
resp := VIIDBaseResponse{
ResponseStatusObject: ResponseStatusObject{
ID: id,
RequestURL: url,
StatusCode: "1",
StatusString: msg,
LocalTime: time.Now().Format("20060102150405"),
},
}
c.JSON(http.StatusBadRequest, resp)
}
func sendSuccessResponse(c *gin.Context, url, id, msg string) {
resp := VIIDBaseResponse{
ResponseStatusObject: ResponseStatusObject{
ID: id,
RequestURL: url,
StatusCode: "0",
StatusString: msg,
LocalTime: time.Now().Format("20060102150405"),
},
}
c.JSON(http.StatusOK, resp)
}
func parseShotTime(t1, t2 string) string {
if t1 != "" {
return t1
}
if t2 != "" {
return t2
}
return time.Now().Format("20060102150405")
}

70
api/viid_upload.go Normal file

@@ -0,0 +1,70 @@
package api
import (
"ZhenTuLocalPassiveAdapter/logger"
"context"
"fmt"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// UploadFaceData implements the coordinated upload flow: Get Config -> Upload -> Notify
func UploadFaceData(ctx context.Context, scenicId int64, deviceNo string, faceImg, thumbImg, srcImg []byte, facePos FacePositionInfo) error {
// 1. Get Upload Config
uploadConfig, err := GetUploadConfig(ctx, scenicId, deviceNo)
if err != nil {
logger.Error("获取上传配置失败", zap.String("deviceNo", deviceNo), zap.Error(err))
return err
}
logger.Info("获取到VIID任务ID", zap.Int64("taskId", uploadConfig.TaskID))
// 2. Parallel Upload to OSS
g, subCtx := errgroup.WithContext(ctx)
// Upload Face Image
g.Go(func() error {
if len(faceImg) > 0 {
if err := UploadFileToOSS(subCtx, uploadConfig.FaceUploadURL, faceImg, "image/jpeg"); err != nil {
return fmt.Errorf("upload face image failed: %w", err)
}
}
return nil
})
// Upload Thumbnail Image
g.Go(func() error {
if len(thumbImg) > 0 {
if err := UploadFileToOSS(subCtx, uploadConfig.ThumbnailUploadURL, thumbImg, "image/jpeg"); err != nil {
return fmt.Errorf("upload thumbnail image failed: %w", err)
}
}
return nil
})
// Upload Source Image
g.Go(func() error {
if len(srcImg) > 0 {
if err := UploadFileToOSS(subCtx, uploadConfig.SourceUploadURL, srcImg, "image/jpeg"); err != nil {
return fmt.Errorf("upload source image failed: %w", err)
}
}
return nil
})
if err := g.Wait(); err != nil {
// Report failure
SubmitFailure(ctx, uploadConfig.TaskID, "UPLOAD_FAILED", err.Error())
logger.Error("文件上传失败", zap.Int64("taskId", uploadConfig.TaskID), zap.Error(err))
return err
}
// 3. Submit Result
if err := SubmitResult(ctx, uploadConfig.TaskID, facePos); err != nil {
logger.Error("提交结果失败", zap.Int64("taskId", uploadConfig.TaskID), zap.Error(err))
return err
}
logger.Info("VIID任务完成", zap.Int64("taskId", uploadConfig.TaskID))
return nil
}

@@ -1,24 +1,27 @@
api:
# baseUrl: "https://zhentuai.com/vpt/v1/scenic/3946669713328836608"
baseUrl: "http://127.0.0.1:8030/vpt/v1/scenic/3946669713328836608"
baseUrl: "https://zhentuai.com/vpt/v1/scenic/3975985126059413504"
record:
storage:
path: "/root/opt/"
type: "s3"
s3:
region: us-east-1
endpoint: http://192.168.55.101:9000
endpoint: http://127.0.0.1:9000
bucket: opt
prefix:
akId: 5vzfDiMztKO6VLvygoeX
akSec: Ot77u2kdVTm8zfQgExFrsm7xlGecxsiR6jk1idXM
duration: 60
akId: Idi2MBaWH2F0LFIWGdDY
akSec: Idi2MBaWH2F0LFIWGdDY
duration: 30
devices:
- deviceNo: "34020000001322200001"
name: "192.168.55.201"
path: "/root/opt/34020000001322200001/"
- deviceNo: "44020000001322500001"
name: "ppda-010268-zymyj"
fileName:
timeSplit: "_"
dateSeparator: ""
fileExt: "ts"
unFinExt: "ts"
unFinExt: "ts"
viid:
enabled: true
serverUrl: "http://127.0.0.1:18083"
scenicId: 3975985126059413504
port: 8080

@@ -10,9 +10,10 @@ type RecordConfig struct {
}
type StorageConfig struct {
Type string `mapstructure:"type"`
Path string `mapstructure:"path"`
S3 S3Config `mapstructure:"s3"`
Type string `mapstructure:"type"`
Path string `mapstructure:"path"`
S3 S3Config `mapstructure:"s3"`
AliOSS AliOSSConfig `mapstructure:"aliOss"`
}
type S3Config struct {
@@ -24,6 +25,14 @@ type S3Config struct {
AkSec string `mapstructure:"akSec"`
}
type AliOSSConfig struct {
Endpoint string `mapstructure:"endpoint"`
Bucket string `mapstructure:"bucket"`
Prefix string `mapstructure:"prefix"`
AccessKeyId string `mapstructure:"accessKeyId"`
AccessKeySecret string `mapstructure:"accessKeySecret"`
}
type DeviceMapping struct {
DeviceNo string `mapstructure:"deviceNo" json:"deviceNo"`
Name string `mapstructure:"name" json:"name"`
@@ -36,9 +45,17 @@ type FileNameConfig struct {
UnfinishedFileExt string `mapstructure:"unFinExt"`
}
type ViidConfig struct {
Enabled bool `mapstructure:"enabled"`
ServerUrl string `mapstructure:"serverUrl"`
ScenicId int64 `mapstructure:"scenicId"`
Port int `mapstructure:"port"`
}
type MainConfig struct {
Api ApiConfig `mapstructure:"api"`
Record RecordConfig `mapstructure:"record"`
Devices []DeviceMapping `mapstructure:"devices"`
FileName FileNameConfig `mapstructure:"fileName"`
Viid ViidConfig `mapstructure:"viid"`
}

@@ -1,8 +1,9 @@
package config
import (
"ZhenTuLocalPassiveAdapter/logger"
"github.com/spf13/viper"
"log"
"go.uber.org/zap"
)
var Config MainConfig
@@ -14,13 +15,13 @@ func LoadConfig() error {
// 读取配置文件
if err := viper.ReadInConfig(); err != nil {
log.Fatalf("Error reading config file, %s", err)
logger.Fatal("读取配置文件失败", zap.Error(err))
return err
}
// 反序列化配置到结构体
if err := viper.Unmarshal(&Config); err != nil {
log.Fatalf("Unable to decode into struct, %v", err)
logger.Fatal("解析配置失败", zap.Error(err))
return err
}
return nil

@@ -4,13 +4,14 @@ import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/fs"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"log"
"go.uber.org/zap"
"path"
)
@@ -32,7 +33,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "获取文件列表失败")
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
logger.Error("获取文件列表失败",
zap.String("deviceNo", device.DeviceNo),
zap.Error(err))
return nil, err
}
files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime)
@@ -45,7 +48,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件片段检查失败")
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
logger.Error("文件片段检查失败",
zap.String("deviceNo", device.DeviceNo),
zap.Error(err))
return nil, err
}
ok := util.RunFfmpegTask(subCtx, constructTask)

@@ -16,6 +16,10 @@ func GetAdapter() Adapter {
return &S3Adapter{
StorageConfig: config.Config.Record.Storage,
}
} else if config.Config.Record.Storage.Type == "alioss" {
return &AliOSSAdapter{
StorageConfig: config.Config.Record.Storage,
}
} else {
return &LocalAdapter{
config.Config.Record.Storage,

203
fs/ali_adapter.go Normal file

@@ -0,0 +1,203 @@
package fs
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"fmt"
"path"
"sort"
"sync"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.uber.org/zap"
)
var aliOssCache sync.Map
type AliOSSAdapter struct {
StorageConfig config.StorageConfig
ossClient *oss.Client
}
func (a *AliOSSAdapter) getClient() (*oss.Client, error) {
if a.ossClient == nil {
client, err := oss.New(
a.StorageConfig.AliOSS.Endpoint,
a.StorageConfig.AliOSS.AccessKeyId,
a.StorageConfig.AliOSS.AccessKeySecret,
)
if err != nil {
return nil, fmt.Errorf("创建阿里云OSS客户端失败: %w", err)
}
a.ossClient = client
}
return a.ossClient, nil
}
func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
_, span := tracer.Start(ctx, "GetFileList_alioss")
defer span.End()
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
if a.StorageConfig.AliOSS.Bucket == "" {
span.SetAttributes(attribute.String("error", "未配置阿里云OSS存储桶"))
span.SetStatus(codes.Error, "未配置阿里云OSS存储桶")
return nil, fmt.Errorf("未配置阿里云OSS存储桶")
}
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
if cachedInterface, ok := aliOssCache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) {
logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("cache.hit", true))
return cachedItem.data, nil
}
}
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
mutex, _ := aliOssCache.LoadOrStore(mutexKey, &sync.Mutex{})
lock := mutex.(*sync.Mutex)
defer func() {
// 解锁后删除锁(避免内存泄漏)
aliOssCache.Delete(mutexKey)
lock.Unlock()
}()
lock.Lock()
if cachedInterface, ok := aliOssCache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) {
logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("aliOssCache.hit", true))
return cachedItem.data, nil
}
}
client, err := a.getClient()
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建阿里云OSS客户端失败")
return nil, err
}
bucket, err := client.Bucket(a.StorageConfig.AliOSS.Bucket)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "获取存储桶失败")
return nil, fmt.Errorf("获取存储桶失败: %w", err)
}
var fileList []dto.File
prefix := path.Join(a.StorageConfig.AliOSS.Prefix, dirPath)
marker := ""
for {
lsRes, err := bucket.ListObjects(
oss.Prefix(prefix),
oss.Marker(marker),
oss.MaxKeys(1000),
)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "文件列表读取失败")
return nil, fmt.Errorf("文件列表读取失败: %w", err)
}
for _, object := range lsRes.Objects {
key := object.Key
if !util.IsVideoFile(path.Base(key)) {
continue
}
startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt)
if err != nil {
continue
}
if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
}
// 生成预签名URL(有效期10分钟)
signedURL, err := bucket.SignURL(key, oss.HTTPGet, 600)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "生成预签名URL失败")
logger.Error("生成预签名URL失败", zap.Error(err))
continue
}
fileList = append(fileList, dto.File{
BasePath: a.StorageConfig.AliOSS.Bucket,
Name: path.Base(key),
Path: path.Dir(key),
Url: signedURL,
StartTime: startTime,
EndTime: stopTime,
})
}
if !lsRes.IsTruncated {
break
}
marker = lsRes.NextMarker
}
span.SetAttributes(attribute.Int("file.count", len(fileList)))
sort.Slice(fileList, func(i, j int) bool {
return fileList[i].StartTime.Before(fileList[j].StartTime)
})
span.SetStatus(codes.Ok, "文件读取成功")
cacheItem := cacheItem{
data: fileList,
expires: time.Now().Add(30 * time.Second),
}
aliOssCache.Store(cacheKey, cacheItem)
logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey))
return fileList, nil
}
// 添加定时清理缓存的初始化函数
func init() {
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanupAliOssCache()
}
}
}()
}
// 添加缓存清理函数
func cleanupAliOssCache() {
var keysToDelete []interface{}
aliOssCache.Range(func(key, value interface{}) bool {
item, ok := value.(cacheItem)
if !ok {
return true
}
if time.Now().After(item.expires) {
keysToDelete = append(keysToDelete, key)
}
return true
})
for _, key := range keysToDelete {
aliOssCache.Delete(key)
}
}

@@ -26,6 +26,9 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
span.SetStatus(codes.Error, "未配置存储路径")
return nil, fmt.Errorf("未配置存储路径")
}
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
// 读取文件夹下目录
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
if err != nil {
@@ -51,7 +54,10 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
if err != nil {
continue
}
if startTime.Equal(stopTime) || stopTime.IsZero() {
if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
// 如果文件名没有时间戳,则认为该文件是未录制完成的
// 尝试读取一下视频信息
duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))

@@ -3,13 +3,15 @@ package fs
import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/util"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/credentials"
"log"
"go.uber.org/zap"
"path"
"sort"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -18,6 +20,8 @@ import (
"go.opentelemetry.io/otel/codes"
)
var s3Cache sync.Map
type S3Adapter struct {
StorageConfig config.StorageConfig
s3Client *s3.Client
@@ -49,15 +53,49 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
_, span := tracer.Start(ctx, "GetFileList_s3")
defer span.End()
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
if s.StorageConfig.S3.Bucket == "" {
span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
span.SetStatus(codes.Error, "未配置S3存储桶")
return nil, fmt.Errorf("未配置S3存储桶")
}
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) {
logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("cache.hit", true))
return cachedItem.data, nil
}
}
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{})
lock := mutex.(*sync.Mutex)
defer func() {
// 解锁后删除锁(避免内存泄漏)
s3Cache.Delete(mutexKey)
lock.Unlock()
}()
lock.Lock()
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
if time.Now().Before(cachedItem.expires) {
logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey))
span.SetAttributes(attribute.Bool("s3Cache.hit", true))
return cachedItem.data, nil
}
}
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(s.StorageConfig.S3.Bucket),
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
Bucket: aws.String(s.StorageConfig.S3.Bucket),
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
MaxKeys: aws.Int32(1000),
}
client, err := s.getClient()
@@ -91,7 +129,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if err != nil {
continue
}
if startTime.Equal(stopTime) || stopTime.IsZero() {
if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
}
presignClient := s3.NewPresignClient(client)
@@ -104,7 +145,7 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "生成预签名URL失败")
log.Println("Error presigning GetObject request:", err)
logger.Error("生成预签名URL失败", zap.Error(err))
continue
}
fileList = append(fileList, dto.File{
@@ -128,5 +169,47 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
return fileList[i].StartTime.Before(fileList[j].StartTime)
})
span.SetStatus(codes.Ok, "文件读取成功")
cacheItem := cacheItem{
data: fileList,
expires: time.Now().Add(30 * time.Second),
}
s3Cache.Store(cacheKey, cacheItem)
logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey))
return fileList, nil
}
type cacheItem struct {
data []dto.File
expires time.Time
}
// 添加定时清理缓存的初始化函数
func init() {
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanupCache()
}
}
}()
}
// 添加缓存清理函数
func cleanupCache() {
var keysToDelete []interface{}
s3Cache.Range(func(key, value interface{}) bool {
item := value.(cacheItem)
if time.Now().After(item.expires) {
keysToDelete = append(keysToDelete, key)
}
return true
})
for _, key := range keysToDelete {
s3Cache.Delete(key)
}
}

53
go.mod

@@ -1,19 +1,23 @@
module ZhenTuLocalPassiveAdapter
go 1.23.0
go 1.24.0
toolchain go1.23.6
toolchain go1.24.11
require (
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/credentials v1.17.62
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2
github.com/gin-gonic/gin v1.11.0
github.com/spf13/viper v1.20.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0
go.opentelemetry.io/otel/sdk v1.35.0
go.opentelemetry.io/otel/sdk/metric v1.35.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.16.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
require (
@@ -26,33 +30,58 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
github.com/aws/smithy-go v1.22.3 // indirect
github.com/bytedance/sonic v1.14.0 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect
github.com/sagikazarmark/locafero v0.8.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.14.0 // indirect
github.com/spf13/cast v1.7.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/mock v0.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/arch v0.20.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.34.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
google.golang.org/protobuf v1.36.9 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

105
go.sum

@@ -1,3 +1,5 @@
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
@@ -22,35 +24,79 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 h1:jIiopHEV22b4yQP2q36Y0OmwLbsxN
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc=
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk=
github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/sagikazarmark/locafero v0.8.0 h1:mXaMVw7IqxNBxfv3LdWt9MDmcWDQ1fagDH918lOdVaQ=
@@ -65,10 +111,21 @@ github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.20.0 h1:zrxIyR3RQIOsarIrgL8+sAvALXul9jeEPa06Y0Ph6vY=
github.com/spf13/viper v1.20.0/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA=
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
@@ -77,10 +134,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0f
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 h1:PB3Zrjs1sG1GBX51SXyTSoOTqcDglmsk7nT6tkKPb/k=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0/go.mod h1:U2R3XyVPzn0WX7wOIypPuptulsMcPDPs/oiSVOMVnHY=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 h1:T0Ec2E+3YZf5bgTNQVet8iTDW7oIk03tXHq+wkwIDnE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0/go.mod h1:30v2gqH+vYGJsesLWFov8u47EpYTcIQcBjKpI6pJThg=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
@@ -93,24 +146,44 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

74
logger/logger.go Normal file

@@ -0,0 +1,74 @@
package logger
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
var Logger *zap.Logger
func Init() error {
config := zap.NewProductionConfig()
config.OutputPaths = []string{"stdout"}
config.ErrorOutputPaths = []string{"stderr"}
// 配置日志轮换
lumberJackLogger := &lumberjack.Logger{
Filename: "logs/app.log",
MaxSize: 10, // MB
MaxBackups: 3,
MaxAge: 30, // days
Compress: true,
}
// 创建写入器
w := zapcore.AddSync(lumberJackLogger)
// 设置日志级别
level := zap.NewAtomicLevelAt(zap.InfoLevel)
// 创建编码器配置
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.TimeKey = "time"
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.MessageKey = "msg"
encoderConfig.CallerKey = "caller"
// 创建控制台编码器(文本格式)
encoder := zapcore.NewConsoleEncoder(encoderConfig)
// 创建核心
core := zapcore.NewCore(encoder, w, level)
// 创建日志记录器
Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
return nil
}
func Info(message string, fields ...zap.Field) {
Logger.Info(message, fields...)
}
func Error(message string, fields ...zap.Field) {
Logger.Error(message, fields...)
}
func Warn(message string, fields ...zap.Field) {
Logger.Warn(message, fields...)
}
func Debug(message string, fields ...zap.Field) {
Logger.Debug(message, fields...)
}
func Fatal(message string, fields ...zap.Field) {
Logger.Fatal(message, fields...)
}
func Sync() error {
return Logger.Sync()
}

160
main.go

@@ -5,14 +5,20 @@ import (
"ZhenTuLocalPassiveAdapter/config"
"ZhenTuLocalPassiveAdapter/core"
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"ZhenTuLocalPassiveAdapter/telemetry"
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"log"
"os"
"time"
"go.uber.org/zap"
)
var tracer = otel.Tracer("vpt")
@@ -28,84 +34,146 @@ func startTask(device config.DeviceMapping, task dto.Task) {
fo, err := core.HandleTask(ctx, device, task)
if err != nil {
span.SetStatus(codes.Error, "处理任务失败")
log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
logger.Error("处理任务失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID)
return
}
span.SetAttributes(attribute.String("fileUrl", fo.URL))
log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
logger.Info("处理任务成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
err = api.UploadTaskFile(ctx, task, *fo)
if err != nil {
span.SetStatus(codes.Error, "上传文件失败")
log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
logger.Error("上传文件失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
api.ReportTaskFailure(ctx, task.TaskID)
return
}
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
if !result {
span.SetStatus(codes.Error, "上报任务成功失败")
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
logger.Error("上报任务成功失败",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.Error(err))
return
}
span.SetStatus(codes.Ok, "上传文件成功")
log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
logger.Info("上传文件成功",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo))
}
func runTaskLoop(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 执行任务
tasks, err := api.SyncTask()
if err == nil {
for _, task := range tasks {
logger.Info("开始处理任务",
zap.String("taskID", task.TaskID),
zap.String("deviceNo", task.DeviceNo),
zap.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")),
zap.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
// 处理任务
for _, device := range config.Config.Devices {
if device.DeviceNo == task.DeviceNo {
// 处理任务
go startTask(device, task)
break // 提前返回,避免不必要的循环
}
}
}
} else {
logger.Error("同步任务失败", zap.Error(err))
}
}
}
}
func startViidServer() {
if !config.Config.Viid.Enabled {
return
}
gin.SetMode(gin.ReleaseMode)
r := gin.Default()
// Register Routes
api.RegisterVIIDRoutes(r)
addr := fmt.Sprintf(":%d", config.Config.Viid.Port)
logger.Info("VIID Server starting", zap.String("addr", addr))
go func() {
if err := r.Run(addr); err != nil {
logger.Error("VIID Server failed", zap.Error(err))
}
}()
}
func main() {
err := config.LoadConfig()
// 初始化日志
err := logger.Init()
if err != nil {
log.Println("加载配置文件失败:", err)
panic(err)
}
defer logger.Sync()
err = config.LoadConfig()
if err != nil {
logger.Fatal("加载配置文件失败", zap.Error(err))
return
}
// 日志文件路径
logFilePath := "app.log"
ctx := context.Background()
shutdown, err := telemetry.InitTelemetry(ctx)
if err != nil {
log.Fatalf("Failed to initialize telemetry: %v", err)
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
return
}
defer shutdown(ctx)
// 创建或打开日志文件
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
}
defer logFile.Close()
// 设置日志输出到文件
log.SetOutput(logFile)
if config.Config.Record.Storage.Type == "local" {
_, err = os.Stat(config.Config.Record.Storage.Path)
if err != nil {
log.Println("录像文件夹配置失败", err)
logger.Error("录像文件夹配置失败", zap.Error(err))
return
} else {
log.Println("录像文件夹配置有效")
logger.Info("录像文件夹配置有效")
}
} else {
log.Println("录像文件夹配置为OSS")
}
// 每两秒定时执行
for {
// 执行任务
tasks, err := api.SyncTask()
if err == nil {
for _, task := range tasks {
log.Printf("开始处理任务, TaskID:【%s】,DeviceNo: %s,开始时间: %s,结束时间: %s\n", task.TaskID, task.DeviceNo, task.StartTime, task.EndTime)
// 处理任务
for _, device := range config.Config.Devices {
if device.DeviceNo == task.DeviceNo {
// 处理任务
go startTask(device, task)
break // 提前返回,避免不必要的循环
}
}
}
} else {
log.Println("同步任务失败:", err)
}
// 等待两秒
<-time.After(2 * time.Second)
logger.Info("录像文件夹配置为OSS")
}
// Start VIID Server
startViidServer()
// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle Signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Start Task Loop
go runTaskLoop(ctx)
// Wait for signal
<-sigChan
logger.Info("Received shutdown signal")
cancel()
logger.Info("Shutdown complete")
}

104
model/custom_time.go Normal file

@@ -0,0 +1,104 @@
package model
import (
"database/sql/driver"
"encoding/json"
"fmt"
"time"
)
// CustomTime 自定义时间类型,用于JSON序列化为YYYY-MM-DD HH:mm:ss格式
type CustomTime struct {
time.Time
}
// MarshalJSON 自定义JSON序列化
func (ct CustomTime) MarshalJSON() ([]byte, error) {
if ct.Time.IsZero() {
return json.Marshal("")
}
return json.Marshal(ct.Time.Format("2006-01-02 15:04:05"))
}
// UnmarshalJSON 自定义JSON反序列化
// 支持格式:
// 1. 数字时间戳:10位秒级(1732435200)或13位毫秒级(1732435200000)
// 2. 字符串格式:秒级(2024-11-24 12:00:00)或毫秒级(2024-11-24 12:00:00.000)
// 3. 空字符串:转换为零值时间
func (ct *CustomTime) UnmarshalJSON(data []byte) error {
// 1. 尝试解析为数字时间戳
var timestamp int64
if err := json.Unmarshal(data, &timestamp); err == nil {
if timestamp == 0 {
ct.Time = time.Time{}
return nil
}
// 根据位数判断精度:13位为毫秒级,10位为秒级
if timestamp > 9999999999 {
ct.Time = time.UnixMilli(timestamp)
} else {
ct.Time = time.Unix(timestamp, 0)
}
return nil
}
// 2. 解析字符串格式
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
if str == "" {
ct.Time = time.Time{}
return nil
}
// 3. 使用本地时区解析时间,避免时区问题
// 尝试多种格式(按精度从高到低)
formats := []string{
"2006-01-02 15:04:05.000", // 毫秒格式
"2006-01-02 15:04:05", // 秒格式(现有)
}
var lastErr error
for _, format := range formats {
t, err := time.ParseInLocation(format, str, time.Local)
if err == nil {
ct.Time = t
return nil
}
lastErr = err
}
return lastErr
}
// Value 实现driver.Valuer接口,用于写入数据库
func (ct CustomTime) Value() (driver.Value, error) {
if ct.Time.IsZero() {
return nil, nil
}
return ct.Time, nil
}
// Scan 实现sql.Scanner接口,用于从数据库读取
func (ct *CustomTime) Scan(value interface{}) error {
if value == nil {
ct.Time = time.Time{}
return nil
}
if t, ok := value.(time.Time); ok {
ct.Time = t
return nil
}
return fmt.Errorf("无法扫描 %T 到 CustomTime", value)
}
// NewCustomTime 创建一个新的 CustomTime 实例
func NewCustomTime(t time.Time) CustomTime {
return CustomTime{Time: t}
}
// NowCustomTime 返回当前时间的 CustomTime 实例
func NowCustomTime() CustomTime {
return CustomTime{Time: time.Now()}
}

@@ -2,12 +2,13 @@ package util
import (
"ZhenTuLocalPassiveAdapter/dto"
"ZhenTuLocalPassiveAdapter/logger"
"bytes"
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"log"
"go.uber.org/zap"
"math/rand"
"os"
"os/exec"
@@ -36,7 +37,7 @@ func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
}
// 先尝试方法1
if !result {
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
logger.Warn("FFMPEG简易方法失败,尝试复杂方法转码")
// 不行再尝试方法二
result = runFfmpegForMultipleFile2(subCtx, task)
}
@@ -80,7 +81,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
if err != nil {
log.Printf("转码出错: %v", err)
logger.Error("转码出错", zap.Error(err))
mu.Lock()
notOk = true
mu.Unlock()
@@ -115,7 +116,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
// 步骤三:删除临时文件
for _, file := range taskClone.Files {
if err := os.Remove(file.Url); err != nil {
log.Printf("删除临时文件失败: %v", err)
logger.Error("删除临时文件失败", zap.Error(err))
}
}
if result {
@@ -185,7 +186,7 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
stat, err := os.Stat(task.OutputFile)
if err != nil {
span.SetStatus(codes.Error, "文件不存在")
log.Printf("文件不存在:%s", task.OutputFile)
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
return false
}
span.SetAttributes(attribute.String("file.name", task.OutputFile))
@@ -219,7 +220,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
defer span.End()
if fileList == nil || len(fileList) == 0 {
span.SetStatus(codes.Error, "无法根据要求找到对应录制片段")
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
logger.Error("无法根据要求找到对应录制片段",
zap.String("taskID", task.TaskID),
zap.String("beginTime", beginDt.Format("2006-01-02 15:04:05")),
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
}
// 按照 Create 的值升序排序
@@ -238,7 +242,11 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
// 片段断开
span.SetStatus(codes.Error, "FFMPEG片段断开")
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
logger.Error("分析FFMPEG任务失败,文件片段中间断开超过2秒",
zap.String("taskID", task.TaskID),
zap.String("lastFile", lastFile.Name),
zap.String("currentFile", file.Name),
zap.Float64("gapSeconds", file.StartTime.Sub(lastFile.EndTime).Seconds()))
return nil, fmt.Errorf("片段断开")
}
lastFile = &file
@@ -248,7 +256,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
// 通过文件列表构造的任务仍然是缺失的
if fileList[len(fileList)-1].EndTime.Before(endDt) {
span.SetStatus(codes.Error, "FFMPEG片段断开")
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
logger.Error("分析FFMPEG任务失败,无法完整覆盖时间点",
zap.String("taskID", task.TaskID),
zap.String("lastFile", fileList[len(fileList)-1].Name),
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
return nil, fmt.Errorf("片段断开")
}
@@ -326,7 +337,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "创建临时文件失败")
log.Printf("创建临时文件失败:%s", tmpFile)
logger.Error("创建临时文件失败", zap.String("tmpFile", tmpFile))
return false, err
}
defer os.Remove(tmpFile)
@@ -337,7 +348,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "写入临时文件失败")
log.Printf("写入临时文件失败:%s", tmpFile)
logger.Error("写入临时文件失败", zap.String("tmpFile", tmpFile))
return false, err
}
}
@@ -401,7 +412,7 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
defer func() {
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
}()
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " "))
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
var stderr bytes.Buffer
@@ -411,7 +422,9 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
defer cmd.Process.Kill()
@@ -425,17 +438,21 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
case <-time.After(1 * time.Minute):
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " "))
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, fmt.Errorf("ffmpeg command timed out")
case err := <-done:
if err != nil {
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
logger.Error("FFMPEG执行命令失败",
zap.String("error", stderr.String()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return false, err
}
endTime := time.Now()
log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " "))
logger.Info("FFMPEG执行命令结束",
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
zap.String("command", strings.Join(ffmpegCmd, " ")))
return true, nil
}
}
@@ -461,12 +478,11 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
span.SetStatus(codes.Error, "failed to get video duration")
return 0, fmt.Errorf("failed to get video duration: %w", err)
}
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String()))
span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
durationStr := strings.TrimSpace(out.String())
duration, err := strconv.ParseFloat(durationStr, 64)
if err != nil {
span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "failed to parse video duration")
return 0, fmt.Errorf("failed to parse video duration: %w", err)
}
span.SetAttributes(attribute.Float64("video.duration", duration))

96
util/image.go Normal file

@@ -0,0 +1,96 @@
package util
import (
"bytes"
"fmt"
"image"
"image/draw"
"image/jpeg"
_ "image/png" // Register PNG decoder just in case
)
// GenerateThumbnail creates a thumbnail by cropping the original image.
// The thumbnail size is 1/2 of the original dimensions.
// The crop is centered on the provided coordinates (centerX, centerY), constrained to image bounds.
func GenerateThumbnail(srcData []byte, centerX, centerY int) ([]byte, error) {
// 1. Decode image
img, _, err := image.Decode(bytes.NewReader(srcData))
if err != nil {
return nil, fmt.Errorf("decode image failed: %v", err)
}
bounds := img.Bounds()
origW := bounds.Dx()
origH := bounds.Dy()
// 2. Calculate Target Dimensions (1/2 of original)
targetW := origW / 2
targetH := origH / 2
if targetW == 0 || targetH == 0 {
return nil, fmt.Errorf("image too small to generate half-size thumbnail")
}
// 3. Calculate Crop Origin (Top-Left)
// cropX = centerX - targetW / 2
cropX := centerX - targetW/2
cropY := centerY - targetH/2
// 4. Constrain to Bounds
if cropX < 0 {
cropX = 0
}
if cropY < 0 {
cropY = 0
}
if cropX+targetW > origW {
cropX = origW - targetW
}
if cropY+targetH > origH {
cropY = origH - targetH
}
// Double check after adjustment
if cropX < 0 {
cropX = 0
}
if cropY < 0 {
cropY = 0
}
// 5. Perform Crop
cropRect := image.Rect(cropX, cropY, cropX+targetW, cropY+targetH)
var dst image.Image
// Try to use SubImage if supported for performance
type SubImager interface {
SubImage(r image.Rectangle) image.Image
}
if sub, ok := img.(SubImager); ok {
dst = sub.SubImage(cropRect)
} else {
// Fallback: create new image and draw
rgba := image.NewRGBA(image.Rect(0, 0, targetW, targetH))
draw.Draw(rgba, rgba.Bounds(), img, cropRect.Min, draw.Src)
dst = rgba
}
// 6. Encode to JPEG
var buf bytes.Buffer
err = jpeg.Encode(&buf, dst, &jpeg.Options{Quality: 85})
if err != nil {
return nil, fmt.Errorf("encode thumbnail failed: %v", err)
}
return buf.Bytes(), nil
}
// Helper to process using LeftTop and RightBottom coordinates
func GenerateThumbnailFromCoords(srcData []byte, ltX, ltY, rbX, rbY int) ([]byte, error) {
// Calculate center based on the provided coordinates
centerX := (ltX + rbX) / 2
centerY := (ltY + rbY) / 2
return GenerateThumbnail(srcData, centerX, centerY)
}