You've already forked VptPassiveAdapter
Compare commits
15 Commits
cf3c518d13
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| b23794587f | |||
| a678829f59 | |||
| f10b68e487 | |||
| 67968abcf3 | |||
| 11f508342d | |||
| 4b1eb11986 | |||
| 84ccaa56de | |||
| 838430ee2f | |||
| 5dfe6d6356 | |||
| 509b829c5b | |||
| e6f93a4d37 | |||
| 2971c5f52d | |||
| 3d7c88de5f | |||
| f9256895b7 | |||
| 104930c413 |
40
api/http_client.go
Normal file
40
api/http_client.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// 通用HTTP客户端,带短超时时间,适用于API调用
|
||||
apiClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
},
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
// 文件上传专用HTTP客户端,超时时间较长
|
||||
uploadClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 50,
|
||||
MaxIdleConnsPerHost: 5,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
},
|
||||
Timeout: 60 * time.Second, // 文件上传需要更长的超时时间
|
||||
}
|
||||
)
|
||||
|
||||
// GetAPIClient 获取API调用客户端
|
||||
func GetAPIClient() *http.Client {
|
||||
return apiClient
|
||||
}
|
||||
|
||||
// GetUploadClient 获取文件上传客户端
|
||||
func GetUploadClient() *http.Client {
|
||||
return uploadClient
|
||||
}
|
||||
@@ -2,13 +2,14 @@ package api
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.uber.org/zap"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
)
|
||||
@@ -20,7 +21,7 @@ func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("开始上传文件, URL:【%s】\n", url)
|
||||
logger.Info("开始上传文件", zap.String("url", url))
|
||||
if err := OssUpload(subCtx, url, file.URL); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -58,8 +59,7 @@ func OssUpload(ctx context.Context, url, filePath string) error {
|
||||
}
|
||||
req.Header.Set("Content-Type", "video/mp4")
|
||||
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(fileBytes)))
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := GetUploadClient().Do(req)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "发送请求失败")
|
||||
|
||||
@@ -3,13 +3,13 @@ package api
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func SyncTask() ([]dto.Task, error) {
|
||||
@@ -20,27 +20,24 @@ func SyncTask() ([]dto.Task, error) {
|
||||
}
|
||||
jsonData, err := json.Marshal(requestBody)
|
||||
if err != nil {
|
||||
log.Println("Error marshaling JSON:", err)
|
||||
logger.Error("序列化JSON失败", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
log.Println("Error creating request:", err)
|
||||
logger.Error("创建请求失败", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
client := &http.Client{
|
||||
Timeout: 5 * time.Second, // 设置超时时间为5秒
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
log.Println("Error sending request:", err)
|
||||
logger.Error("发送请求失败", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Println("Error reading response body:", err)
|
||||
logger.Error("读取响应体失败", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -48,12 +45,13 @@ func SyncTask() ([]dto.Task, error) {
|
||||
var response dto.TaskListResponse
|
||||
err = json.Unmarshal(body, &response)
|
||||
if err != nil {
|
||||
log.Println("->:", string(body))
|
||||
log.Println("Error unmarshaling response body:", err)
|
||||
logger.Error("解析响应体失败",
|
||||
zap.String("responseBody", string(body)),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if response.Code != 200 {
|
||||
log.Println("Error response code:", response.Code)
|
||||
logger.Error("响应错误码", zap.Int("code", response.Code))
|
||||
return nil, fmt.Errorf(response.Msg)
|
||||
}
|
||||
return response.Data, nil
|
||||
|
||||
@@ -3,13 +3,14 @@ package api
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.uber.org/zap"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
@@ -23,15 +24,14 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "创建请求失败")
|
||||
log.Println("Error creating request:", err)
|
||||
logger.Error("创建请求失败", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "发送请求失败")
|
||||
log.Println("Error sending request:", err)
|
||||
logger.Error("发送请求失败", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||
@@ -41,7 +41,7 @@ func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "读取响应体失败")
|
||||
log.Println("Error reading response body:", err)
|
||||
logger.Error("读取响应体失败", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
return string(body), nil
|
||||
@@ -58,16 +58,15 @@ func ReportTaskFailure(ctx context.Context, taskId string) bool {
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "创建请求失败")
|
||||
log.Println("Error creating request:", err)
|
||||
logger.Error("创建请求失败", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "发送请求失败")
|
||||
log.Println("Error sending request:", err)
|
||||
logger.Error("发送请求失败", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -94,7 +93,7 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "序列化JSON失败")
|
||||
log.Println("Error marshaling JSON:", err)
|
||||
logger.Error("序列化JSON失败", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -102,17 +101,16 @@ func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "创建请求失败")
|
||||
log.Println("Error creating request:", err)
|
||||
logger.Error("创建请求失败", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "发送请求失败")
|
||||
log.Println("Error sending request:", err)
|
||||
logger.Error("发送请求失败", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
215
api/viid_client.go
Normal file
215
api/viid_client.go
Normal file
@@ -0,0 +1,215 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// VIID Request/Response Structures
|
||||
|
||||
type DeviceIdObject struct {
|
||||
DeviceID string `json:"DeviceID"`
|
||||
}
|
||||
|
||||
type RegisterRequest struct {
|
||||
RegisterObject DeviceIdObject `json:"RegisterObject"`
|
||||
}
|
||||
|
||||
type KeepaliveRequest struct {
|
||||
KeepaliveObject DeviceIdObject `json:"KeepaliveObject"`
|
||||
}
|
||||
|
||||
type UnRegisterRequest struct {
|
||||
UnRegisterObject DeviceIdObject `json:"UnRegisterObject"`
|
||||
}
|
||||
|
||||
type VIIDBaseResponse struct {
|
||||
ResponseStatusObject ResponseStatusObject `json:"ResponseStatusObject"`
|
||||
}
|
||||
|
||||
type ResponseStatusObject struct {
|
||||
ID string `json:"Id"`
|
||||
RequestURL string `json:"RequestURL"`
|
||||
StatusCode string `json:"StatusCode"`
|
||||
StatusString string `json:"StatusString"`
|
||||
LocalTime string `json:"LocalTime"`
|
||||
}
|
||||
|
||||
type SystemTimeResponse struct {
|
||||
ResponseStatusObject struct {
|
||||
LocalTime string `json:"LocalTime"` // yyyyMMddHHmmss
|
||||
} `json:"ResponseStatusObject"`
|
||||
}
|
||||
|
||||
// VIID Client Methods
|
||||
|
||||
func ProxyRequest(ctx context.Context, method, path string, body io.Reader, header http.Header) (*http.Response, error) {
|
||||
url := fmt.Sprintf("%s%s", config.Config.Viid.ServerUrl, path)
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Copy headers
|
||||
for k, v := range header {
|
||||
req.Header[k] = v
|
||||
}
|
||||
|
||||
return GetAPIClient().Do(req)
|
||||
}
|
||||
|
||||
// Upload Proxy Structures
|
||||
|
||||
type UploadConfig struct {
|
||||
TaskID int64 `json:"taskId"`
|
||||
FaceUploadURL string `json:"faceUploadUrl"`
|
||||
ThumbnailUploadURL string `json:"thumbnailUploadUrl"`
|
||||
SourceUploadURL string `json:"sourceUploadUrl"`
|
||||
ExpiresAt time.Time `json:"expiresAt"`
|
||||
}
|
||||
|
||||
type ProxyResponse struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
Success bool `json:"success"`
|
||||
Data *UploadConfig `json:"data"` // Data can be UploadConfig or string depending on endpoint
|
||||
}
|
||||
|
||||
type FacePositionInfo struct {
|
||||
LeftTopX int `json:"leftTopX"`
|
||||
LeftTopY int `json:"leftTopY"`
|
||||
RightBtmX int `json:"rightBtmX"`
|
||||
RightBtmY int `json:"rightBtmY"`
|
||||
ImgWidth int `json:"imgWidth"`
|
||||
ImgHeight int `json:"imgHeight"`
|
||||
ShotTime string `json:"shotTime"` // yyyyMMddHHmmss
|
||||
}
|
||||
|
||||
type SubmitResultRequest struct {
|
||||
TaskID int64 `json:"taskId"`
|
||||
FacePosition FacePositionInfo `json:"facePosition"`
|
||||
}
|
||||
|
||||
type SubmitFailureRequest struct {
|
||||
TaskID int64 `json:"taskId"`
|
||||
ErrorCode string `json:"errorCode"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
}
|
||||
|
||||
// Upload Proxy Methods
|
||||
|
||||
func GetUploadConfig(ctx context.Context, scenicId int64, deviceNo string) (*UploadConfig, error) {
|
||||
url := fmt.Sprintf("%s/proxy/VIID/upload-config?scenicId=%d&deviceNo=%s", config.Config.Viid.ServerUrl, scenicId, deviceNo)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var result ProxyResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !result.Success {
|
||||
return nil, fmt.Errorf("get upload config failed: %s", result.Message)
|
||||
}
|
||||
|
||||
return result.Data, nil
|
||||
}
|
||||
|
||||
func SubmitResult(ctx context.Context, taskID int64, facePos FacePositionInfo) error {
|
||||
url := fmt.Sprintf("%s/proxy/VIID/submit-result", config.Config.Viid.ServerUrl)
|
||||
reqData := SubmitResultRequest{
|
||||
TaskID: taskID,
|
||||
FacePosition: facePos,
|
||||
}
|
||||
|
||||
jsonData, _ := json.Marshal(reqData)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var result struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
Success bool `json:"success"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !result.Success {
|
||||
return fmt.Errorf("submit result failed: %s", result.Message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SubmitFailure(ctx context.Context, taskID int64, errorCode, errorMessage string) error {
|
||||
url := fmt.Sprintf("%s/proxy/VIID/submit-failure", config.Config.Viid.ServerUrl)
|
||||
reqData := SubmitFailureRequest{
|
||||
TaskID: taskID,
|
||||
ErrorCode: errorCode,
|
||||
ErrorMessage: errorMessage,
|
||||
}
|
||||
|
||||
jsonData, _ := json.Marshal(reqData)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := GetAPIClient().Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func UploadFileToOSS(ctx context.Context, uploadUrl string, data []byte, contentType string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "PUT", uploadUrl, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
|
||||
req.Header.Set("Content-Type", contentType)
|
||||
|
||||
resp, err := GetUploadClient().Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
logger.Error("oss upload failed", zap.String("body", string(body)))
|
||||
return fmt.Errorf("oss upload failed: %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
248
api/viid_handler.go
Normal file
248
api/viid_handler.go
Normal file
@@ -0,0 +1,248 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"ZhenTuLocalPassiveAdapter/util"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Request Structs for Handler
|
||||
|
||||
type FaceUploadRequest struct {
|
||||
FaceListObject FaceListObject `json:"FaceListObject"`
|
||||
}
|
||||
|
||||
type FaceListObject struct {
|
||||
FaceObject []FaceObject `json:"FaceObject"`
|
||||
}
|
||||
|
||||
type FaceObject struct {
|
||||
FaceID string `json:"FaceID"`
|
||||
DeviceID string `json:"DeviceID"`
|
||||
ShotTime string `json:"ShotTime"`
|
||||
FaceAppearTime string `json:"FaceAppearTime"`
|
||||
SubImageList SubImageList `json:"SubImageList"`
|
||||
// Position info
|
||||
LeftTopX *int `json:"LeftTopX,omitempty"`
|
||||
LeftTopY *int `json:"LeftTopY,omitempty"`
|
||||
RightBtmX *int `json:"RightBtmX,omitempty"`
|
||||
RightBtmY *int `json:"RightBtmY,omitempty"`
|
||||
}
|
||||
|
||||
type SubImageList struct {
|
||||
SubImageInfoObject []SubImageInfoObject `json:"SubImageInfoObject"`
|
||||
}
|
||||
|
||||
type SubImageInfoObject struct {
|
||||
ImageID string `json:"ImageID"`
|
||||
Type string `json:"Type"` // "11"=Face, "14"=Source
|
||||
FileFormat string `json:"FileFormat"`
|
||||
Data string `json:"Data"` // Base64
|
||||
Width *int `json:"Width,omitempty"`
|
||||
Height *int `json:"Height,omitempty"`
|
||||
}
|
||||
|
||||
// Register Routes
|
||||
func RegisterVIIDRoutes(r *gin.Engine) {
|
||||
viid := r.Group("/VIID")
|
||||
{
|
||||
sys := viid.Group("/System")
|
||||
{
|
||||
sys.POST("/Register", HandleRegister)
|
||||
sys.POST("/Keepalive", HandleKeepalive)
|
||||
sys.POST("/UnRegister", HandleUnRegister)
|
||||
sys.GET("/Time", HandleSystemTime)
|
||||
}
|
||||
viid.POST("/Faces", HandleUploadFaces)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTP Handlers
|
||||
|
||||
func HandleRegister(c *gin.Context) {
|
||||
proxyHelper(c, "/VIID/System/Register")
|
||||
}
|
||||
|
||||
func HandleKeepalive(c *gin.Context) {
|
||||
proxyHelper(c, "/VIID/System/Keepalive")
|
||||
}
|
||||
|
||||
func HandleUnRegister(c *gin.Context) {
|
||||
proxyHelper(c, "/VIID/System/UnRegister")
|
||||
}
|
||||
|
||||
func proxyHelper(c *gin.Context, path string) {
|
||||
resp, err := ProxyRequest(c.Request.Context(), c.Request.Method, path, c.Request.Body, c.Request.Header)
|
||||
if err != nil {
|
||||
logger.Error("Proxy request failed", zap.String("path", path), zap.Error(err))
|
||||
c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Copy Headers
|
||||
for k, v := range resp.Header {
|
||||
c.Writer.Header()[k] = v
|
||||
}
|
||||
c.Status(resp.StatusCode)
|
||||
io.Copy(c.Writer, resp.Body)
|
||||
}
|
||||
|
||||
func HandleSystemTime(c *gin.Context) {
|
||||
now := time.Now()
|
||||
timeStr := now.Format("20060102150405") // VIID format
|
||||
|
||||
// Determine TimeZone string (e.g., "Asia/Shanghai")
|
||||
// Simple fixed string or system local
|
||||
timeZone := "Local"
|
||||
loc, err := time.LoadLocation("Local")
|
||||
if err == nil {
|
||||
timeZone = loc.String()
|
||||
}
|
||||
|
||||
resp := map[string]interface{}{
|
||||
"SystemTimeObject": map[string]string{
|
||||
"VIIDServerID": "00000000000000000001", // Placeholder
|
||||
"TimeMode": "2", // 2: Atomic Clock / NTP
|
||||
"LocalTime": timeStr,
|
||||
"TimeZone": timeZone,
|
||||
},
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func HandleUploadFaces(c *gin.Context) {
|
||||
var req FaceUploadRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
logger.Error("Invalid JSON in UploadFaces", zap.Error(err))
|
||||
sendErrorResponse(c, "/VIID/Faces", "", "Invalid JSON")
|
||||
return
|
||||
}
|
||||
|
||||
var lastFaceID string
|
||||
|
||||
for _, face := range req.FaceListObject.FaceObject {
|
||||
lastFaceID = face.FaceID
|
||||
deviceID := face.DeviceID
|
||||
|
||||
// 1. Parse Images
|
||||
var faceImg, srcImg []byte
|
||||
var imgWidth, imgHeight int
|
||||
|
||||
for _, subImg := range face.SubImageList.SubImageInfoObject {
|
||||
data, err := base64.StdEncoding.DecodeString(subImg.Data)
|
||||
if err != nil {
|
||||
logger.Error("Base64 decode failed", zap.String("faceId", face.FaceID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if subImg.Type == "11" { // Face
|
||||
faceImg = data
|
||||
} else if subImg.Type == "14" { // Source
|
||||
srcImg = data
|
||||
if subImg.Width != nil {
|
||||
imgWidth = *subImg.Width
|
||||
}
|
||||
if subImg.Height != nil {
|
||||
imgHeight = *subImg.Height
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Construct Position Info
|
||||
pos := FacePositionInfo{
|
||||
ImgWidth: imgWidth,
|
||||
ImgHeight: imgHeight,
|
||||
ShotTime: parseShotTime(face.ShotTime, face.FaceAppearTime),
|
||||
}
|
||||
if face.LeftTopX != nil {
|
||||
pos.LeftTopX = *face.LeftTopX
|
||||
}
|
||||
if face.LeftTopY != nil {
|
||||
pos.LeftTopY = *face.LeftTopY
|
||||
}
|
||||
if face.RightBtmX != nil {
|
||||
pos.RightBtmX = *face.RightBtmX
|
||||
}
|
||||
if face.RightBtmY != nil {
|
||||
pos.RightBtmY = *face.RightBtmY
|
||||
}
|
||||
|
||||
// 3. Generate Thumbnail (if Source Image and Coordinates exist)
|
||||
var thumbImg []byte
|
||||
if len(srcImg) > 0 && pos.LeftTopX > 0 && pos.LeftTopY > 0 { // Basic validation
|
||||
// Use Source Image to generate thumbnail
|
||||
// The thumbnail is 1/2 original size, centered on face
|
||||
var err error
|
||||
thumbImg, err = util.GenerateThumbnailFromCoords(srcImg, pos.LeftTopX, pos.LeftTopY, pos.RightBtmX, pos.RightBtmY)
|
||||
if err != nil {
|
||||
logger.Error("Failed to generate thumbnail", zap.String("faceId", face.FaceID), zap.Error(err))
|
||||
// Continue without thumbnail? or fail?
|
||||
// Usually continue, but log error.
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Execute Upload Flow
|
||||
scenicId := config.Config.Viid.ScenicId
|
||||
|
||||
go func(fID, dID string, fData, tData, sData []byte, p FacePositionInfo) {
|
||||
// Create a detached context with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
if err := UploadFaceData(ctx, scenicId, dID, fData, tData, sData, p); err != nil {
|
||||
logger.Error("Failed to process face upload", zap.String("faceId", fID), zap.Error(err))
|
||||
}
|
||||
}(face.FaceID, deviceID, faceImg, thumbImg, srcImg, pos)
|
||||
}
|
||||
|
||||
// Respond Success
|
||||
sendSuccessResponse(c, "/VIID/Faces", lastFaceID, "OK")
|
||||
}
|
||||
|
||||
// Helpers
|
||||
|
||||
func sendErrorResponse(c *gin.Context, url, id, msg string) {
|
||||
resp := VIIDBaseResponse{
|
||||
ResponseStatusObject: ResponseStatusObject{
|
||||
ID: id,
|
||||
RequestURL: url,
|
||||
StatusCode: "1",
|
||||
StatusString: msg,
|
||||
LocalTime: time.Now().Format("20060102150405"),
|
||||
},
|
||||
}
|
||||
c.JSON(http.StatusBadRequest, resp)
|
||||
}
|
||||
|
||||
func sendSuccessResponse(c *gin.Context, url, id, msg string) {
|
||||
resp := VIIDBaseResponse{
|
||||
ResponseStatusObject: ResponseStatusObject{
|
||||
ID: id,
|
||||
RequestURL: url,
|
||||
StatusCode: "0",
|
||||
StatusString: msg,
|
||||
LocalTime: time.Now().Format("20060102150405"),
|
||||
},
|
||||
}
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func parseShotTime(t1, t2 string) string {
|
||||
if t1 != "" {
|
||||
return t1
|
||||
}
|
||||
if t2 != "" {
|
||||
return t2
|
||||
}
|
||||
return time.Now().Format("20060102150405")
|
||||
}
|
||||
70
api/viid_upload.go
Normal file
70
api/viid_upload.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// UploadFaceData implements the coordinated upload flow: Get Config -> Upload -> Notify
|
||||
func UploadFaceData(ctx context.Context, scenicId int64, deviceNo string, faceImg, thumbImg, srcImg []byte, facePos FacePositionInfo) error {
|
||||
// 1. Get Upload Config
|
||||
uploadConfig, err := GetUploadConfig(ctx, scenicId, deviceNo)
|
||||
if err != nil {
|
||||
logger.Error("获取上传配置失败", zap.String("deviceNo", deviceNo), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("获取到VIID任务ID", zap.Int64("taskId", uploadConfig.TaskID))
|
||||
|
||||
// 2. Parallel Upload to OSS
|
||||
g, subCtx := errgroup.WithContext(ctx)
|
||||
|
||||
// Upload Face Image
|
||||
g.Go(func() error {
|
||||
if len(faceImg) > 0 {
|
||||
if err := UploadFileToOSS(subCtx, uploadConfig.FaceUploadURL, faceImg, "image/jpeg"); err != nil {
|
||||
return fmt.Errorf("upload face image failed: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Upload Thumbnail Image
|
||||
g.Go(func() error {
|
||||
if len(thumbImg) > 0 {
|
||||
if err := UploadFileToOSS(subCtx, uploadConfig.ThumbnailUploadURL, thumbImg, "image/jpeg"); err != nil {
|
||||
return fmt.Errorf("upload thumbnail image failed: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Upload Source Image
|
||||
g.Go(func() error {
|
||||
if len(srcImg) > 0 {
|
||||
if err := UploadFileToOSS(subCtx, uploadConfig.SourceUploadURL, srcImg, "image/jpeg"); err != nil {
|
||||
return fmt.Errorf("upload source image failed: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
// Report failure
|
||||
SubmitFailure(ctx, uploadConfig.TaskID, "UPLOAD_FAILED", err.Error())
|
||||
logger.Error("文件上传失败", zap.Int64("taskId", uploadConfig.TaskID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. Submit Result
|
||||
if err := SubmitResult(ctx, uploadConfig.TaskID, facePos); err != nil {
|
||||
logger.Error("提交结果失败", zap.Int64("taskId", uploadConfig.TaskID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("VIID任务完成", zap.Int64("taskId", uploadConfig.TaskID))
|
||||
return nil
|
||||
}
|
||||
23
config.yaml
23
config.yaml
@@ -1,24 +1,27 @@
|
||||
api:
|
||||
# baseUrl: "https://zhentuai.com/vpt/v1/scenic/3946669713328836608"
|
||||
baseUrl: "http://127.0.0.1:8030/vpt/v1/scenic/3946669713328836608"
|
||||
baseUrl: "https://zhentuai.com/vpt/v1/scenic/3975985126059413504"
|
||||
record:
|
||||
storage:
|
||||
path: "/root/opt/"
|
||||
type: "s3"
|
||||
s3:
|
||||
region: us-east-1
|
||||
endpoint: http://192.168.55.101:9000
|
||||
endpoint: http://127.0.0.1:9000
|
||||
bucket: opt
|
||||
prefix:
|
||||
akId: 5vzfDiMztKO6VLvygoeX
|
||||
akSec: Ot77u2kdVTm8zfQgExFrsm7xlGecxsiR6jk1idXM
|
||||
duration: 60
|
||||
akId: Idi2MBaWH2F0LFIWGdDY
|
||||
akSec: Idi2MBaWH2F0LFIWGdDY
|
||||
duration: 30
|
||||
devices:
|
||||
- deviceNo: "34020000001322200001"
|
||||
name: "192.168.55.201"
|
||||
path: "/root/opt/34020000001322200001/"
|
||||
- deviceNo: "44020000001322500001"
|
||||
name: "ppda-010268-zymyj"
|
||||
fileName:
|
||||
timeSplit: "_"
|
||||
dateSeparator: ""
|
||||
fileExt: "ts"
|
||||
unFinExt: "ts"
|
||||
unFinExt: "ts"
|
||||
viid:
|
||||
enabled: true
|
||||
serverUrl: "http://127.0.0.1:18083"
|
||||
scenicId: 3975985126059413504
|
||||
port: 8080
|
||||
|
||||
@@ -10,9 +10,10 @@ type RecordConfig struct {
|
||||
}
|
||||
|
||||
type StorageConfig struct {
|
||||
Type string `mapstructure:"type"`
|
||||
Path string `mapstructure:"path"`
|
||||
S3 S3Config `mapstructure:"s3"`
|
||||
Type string `mapstructure:"type"`
|
||||
Path string `mapstructure:"path"`
|
||||
S3 S3Config `mapstructure:"s3"`
|
||||
AliOSS AliOSSConfig `mapstructure:"aliOss"`
|
||||
}
|
||||
|
||||
type S3Config struct {
|
||||
@@ -24,6 +25,14 @@ type S3Config struct {
|
||||
AkSec string `mapstructure:"akSec"`
|
||||
}
|
||||
|
||||
type AliOSSConfig struct {
|
||||
Endpoint string `mapstructure:"endpoint"`
|
||||
Bucket string `mapstructure:"bucket"`
|
||||
Prefix string `mapstructure:"prefix"`
|
||||
AccessKeyId string `mapstructure:"accessKeyId"`
|
||||
AccessKeySecret string `mapstructure:"accessKeySecret"`
|
||||
}
|
||||
|
||||
type DeviceMapping struct {
|
||||
DeviceNo string `mapstructure:"deviceNo" json:"deviceNo"`
|
||||
Name string `mapstructure:"name" json:"name"`
|
||||
@@ -36,9 +45,17 @@ type FileNameConfig struct {
|
||||
UnfinishedFileExt string `mapstructure:"unFinExt"`
|
||||
}
|
||||
|
||||
type ViidConfig struct {
|
||||
Enabled bool `mapstructure:"enabled"`
|
||||
ServerUrl string `mapstructure:"serverUrl"`
|
||||
ScenicId int64 `mapstructure:"scenicId"`
|
||||
Port int `mapstructure:"port"`
|
||||
}
|
||||
|
||||
type MainConfig struct {
|
||||
Api ApiConfig `mapstructure:"api"`
|
||||
Record RecordConfig `mapstructure:"record"`
|
||||
Devices []DeviceMapping `mapstructure:"devices"`
|
||||
FileName FileNameConfig `mapstructure:"fileName"`
|
||||
Viid ViidConfig `mapstructure:"viid"`
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"github.com/spf13/viper"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var Config MainConfig
|
||||
@@ -14,13 +15,13 @@ func LoadConfig() error {
|
||||
|
||||
// 读取配置文件
|
||||
if err := viper.ReadInConfig(); err != nil {
|
||||
log.Fatalf("Error reading config file, %s", err)
|
||||
logger.Fatal("读取配置文件失败", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// 反序列化配置到结构体
|
||||
if err := viper.Unmarshal(&Config); err != nil {
|
||||
log.Fatalf("Unable to decode into struct, %v", err)
|
||||
logger.Fatal("解析配置失败", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
11
core/task.go
11
core/task.go
@@ -4,13 +4,14 @@ import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/fs"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"ZhenTuLocalPassiveAdapter/util"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
)
|
||||
|
||||
@@ -32,7 +33,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "获取文件列表失败")
|
||||
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||
logger.Error("获取文件列表失败",
|
||||
zap.String("deviceNo", device.DeviceNo),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
files := util.FilterAndSortFiles(subCtx, fileList, task.StartTime, task.EndTime)
|
||||
@@ -45,7 +48,9 @@ func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件片段检查失败")
|
||||
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||
logger.Error("文件片段检查失败",
|
||||
zap.String("deviceNo", device.DeviceNo),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
ok := util.RunFfmpegTask(subCtx, constructTask)
|
||||
|
||||
@@ -16,6 +16,10 @@ func GetAdapter() Adapter {
|
||||
return &S3Adapter{
|
||||
StorageConfig: config.Config.Record.Storage,
|
||||
}
|
||||
} else if config.Config.Record.Storage.Type == "alioss" {
|
||||
return &AliOSSAdapter{
|
||||
StorageConfig: config.Config.Record.Storage,
|
||||
}
|
||||
} else {
|
||||
return &LocalAdapter{
|
||||
config.Config.Record.Storage,
|
||||
|
||||
203
fs/ali_adapter.go
Normal file
203
fs/ali_adapter.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"ZhenTuLocalPassiveAdapter/util"
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aliyun/aliyun-oss-go-sdk/oss"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var aliOssCache sync.Map
|
||||
|
||||
type AliOSSAdapter struct {
|
||||
StorageConfig config.StorageConfig
|
||||
ossClient *oss.Client
|
||||
}
|
||||
|
||||
func (a *AliOSSAdapter) getClient() (*oss.Client, error) {
|
||||
if a.ossClient == nil {
|
||||
client, err := oss.New(
|
||||
a.StorageConfig.AliOSS.Endpoint,
|
||||
a.StorageConfig.AliOSS.AccessKeyId,
|
||||
a.StorageConfig.AliOSS.AccessKeySecret,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建阿里云OSS客户端失败: %w", err)
|
||||
}
|
||||
a.ossClient = client
|
||||
}
|
||||
return a.ossClient, nil
|
||||
}
|
||||
|
||||
func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
|
||||
_, span := tracer.Start(ctx, "GetFileList_alioss")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(attribute.String("path", dirPath))
|
||||
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
|
||||
if a.StorageConfig.AliOSS.Bucket == "" {
|
||||
span.SetAttributes(attribute.String("error", "未配置阿里云OSS存储桶"))
|
||||
span.SetStatus(codes.Error, "未配置阿里云OSS存储桶")
|
||||
return nil, fmt.Errorf("未配置阿里云OSS存储桶")
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
|
||||
if cachedInterface, ok := aliOssCache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey))
|
||||
span.SetAttributes(attribute.Bool("cache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
|
||||
mutex, _ := aliOssCache.LoadOrStore(mutexKey, &sync.Mutex{})
|
||||
lock := mutex.(*sync.Mutex)
|
||||
defer func() {
|
||||
// 解锁后删除锁(避免内存泄漏)
|
||||
aliOssCache.Delete(mutexKey)
|
||||
lock.Unlock()
|
||||
}()
|
||||
lock.Lock()
|
||||
|
||||
if cachedInterface, ok := aliOssCache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey))
|
||||
span.SetAttributes(attribute.Bool("aliOssCache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
client, err := a.getClient()
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "创建阿里云OSS客户端失败")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucket, err := client.Bucket(a.StorageConfig.AliOSS.Bucket)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "获取存储桶失败")
|
||||
return nil, fmt.Errorf("获取存储桶失败: %w", err)
|
||||
}
|
||||
|
||||
var fileList []dto.File
|
||||
prefix := path.Join(a.StorageConfig.AliOSS.Prefix, dirPath)
|
||||
marker := ""
|
||||
|
||||
for {
|
||||
lsRes, err := bucket.ListObjects(
|
||||
oss.Prefix(prefix),
|
||||
oss.Marker(marker),
|
||||
oss.MaxKeys(1000),
|
||||
)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "文件列表读取失败")
|
||||
return nil, fmt.Errorf("文件列表读取失败: %w", err)
|
||||
}
|
||||
|
||||
for _, object := range lsRes.Objects {
|
||||
key := object.Key
|
||||
if !util.IsVideoFile(path.Base(key)) {
|
||||
continue
|
||||
}
|
||||
startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if stopTime.IsZero() {
|
||||
stopTime = startTime
|
||||
}
|
||||
if startTime.Equal(stopTime) {
|
||||
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
||||
}
|
||||
|
||||
// 生成预签名URL(有效期10分钟)
|
||||
signedURL, err := bucket.SignURL(key, oss.HTTPGet, 600)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "生成预签名URL失败")
|
||||
logger.Error("生成预签名URL失败", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
fileList = append(fileList, dto.File{
|
||||
BasePath: a.StorageConfig.AliOSS.Bucket,
|
||||
Name: path.Base(key),
|
||||
Path: path.Dir(key),
|
||||
Url: signedURL,
|
||||
StartTime: startTime,
|
||||
EndTime: stopTime,
|
||||
})
|
||||
}
|
||||
|
||||
if !lsRes.IsTruncated {
|
||||
break
|
||||
}
|
||||
marker = lsRes.NextMarker
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.Int("file.count", len(fileList)))
|
||||
sort.Slice(fileList, func(i, j int) bool {
|
||||
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
||||
})
|
||||
span.SetStatus(codes.Ok, "文件读取成功")
|
||||
|
||||
cacheItem := cacheItem{
|
||||
data: fileList,
|
||||
expires: time.Now().Add(30 * time.Second),
|
||||
}
|
||||
aliOssCache.Store(cacheKey, cacheItem)
|
||||
logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey))
|
||||
|
||||
return fileList, nil
|
||||
}
|
||||
|
||||
// 添加定时清理缓存的初始化函数
|
||||
func init() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cleanupAliOssCache()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 添加缓存清理函数
|
||||
func cleanupAliOssCache() {
|
||||
var keysToDelete []interface{}
|
||||
aliOssCache.Range(func(key, value interface{}) bool {
|
||||
item, ok := value.(cacheItem)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
if time.Now().After(item.expires) {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, key := range keysToDelete {
|
||||
aliOssCache.Delete(key)
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,9 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
|
||||
span.SetStatus(codes.Error, "未配置存储路径")
|
||||
return nil, fmt.Errorf("未配置存储路径")
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.String("path", dirPath))
|
||||
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
|
||||
// 读取文件夹下目录
|
||||
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
|
||||
if err != nil {
|
||||
@@ -51,7 +54,10 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
||||
if stopTime.IsZero() {
|
||||
stopTime = startTime
|
||||
}
|
||||
if startTime.Equal(stopTime) {
|
||||
// 如果文件名没有时间戳,则认为该文件是未录制完成的
|
||||
// 尝试读取一下视频信息
|
||||
duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
|
||||
|
||||
@@ -3,13 +3,15 @@ package fs
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"ZhenTuLocalPassiveAdapter/util"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
@@ -18,6 +20,8 @@ import (
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
)
|
||||
|
||||
var s3Cache sync.Map
|
||||
|
||||
type S3Adapter struct {
|
||||
StorageConfig config.StorageConfig
|
||||
s3Client *s3.Client
|
||||
@@ -49,15 +53,49 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
_, span := tracer.Start(ctx, "GetFileList_s3")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(attribute.String("path", dirPath))
|
||||
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
|
||||
if s.StorageConfig.S3.Bucket == "" {
|
||||
span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
|
||||
span.SetStatus(codes.Error, "未配置S3存储桶")
|
||||
return nil, fmt.Errorf("未配置S3存储桶")
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
|
||||
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey))
|
||||
span.SetAttributes(attribute.Bool("cache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
|
||||
mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{})
|
||||
lock := mutex.(*sync.Mutex)
|
||||
defer func() {
|
||||
// 解锁后删除锁(避免内存泄漏)
|
||||
s3Cache.Delete(mutexKey)
|
||||
lock.Unlock()
|
||||
}()
|
||||
lock.Lock()
|
||||
|
||||
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
|
||||
cachedItem := cachedInterface.(cacheItem)
|
||||
logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now())))
|
||||
if time.Now().Before(cachedItem.expires) {
|
||||
logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey))
|
||||
span.SetAttributes(attribute.Bool("s3Cache.hit", true))
|
||||
return cachedItem.data, nil
|
||||
}
|
||||
}
|
||||
|
||||
listObjectsInput := &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(s.StorageConfig.S3.Bucket),
|
||||
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
|
||||
Bucket: aws.String(s.StorageConfig.S3.Bucket),
|
||||
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
|
||||
MaxKeys: aws.Int32(1000),
|
||||
}
|
||||
|
||||
client, err := s.getClient()
|
||||
@@ -91,7 +129,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
||||
if stopTime.IsZero() {
|
||||
stopTime = startTime
|
||||
}
|
||||
if startTime.Equal(stopTime) {
|
||||
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
||||
}
|
||||
presignClient := s3.NewPresignClient(client)
|
||||
@@ -104,7 +145,7 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "生成预签名URL失败")
|
||||
log.Println("Error presigning GetObject request:", err)
|
||||
logger.Error("生成预签名URL失败", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fileList = append(fileList, dto.File{
|
||||
@@ -128,5 +169,47 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
|
||||
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
||||
})
|
||||
span.SetStatus(codes.Ok, "文件读取成功")
|
||||
|
||||
cacheItem := cacheItem{
|
||||
data: fileList,
|
||||
expires: time.Now().Add(30 * time.Second),
|
||||
}
|
||||
s3Cache.Store(cacheKey, cacheItem)
|
||||
logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey))
|
||||
|
||||
return fileList, nil
|
||||
}
|
||||
|
||||
type cacheItem struct {
|
||||
data []dto.File
|
||||
expires time.Time
|
||||
}
|
||||
|
||||
// 添加定时清理缓存的初始化函数
|
||||
func init() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cleanupCache()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 添加缓存清理函数
|
||||
func cleanupCache() {
|
||||
var keysToDelete []interface{}
|
||||
s3Cache.Range(func(key, value interface{}) bool {
|
||||
item := value.(cacheItem)
|
||||
if time.Now().After(item.expires) {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, key := range keysToDelete {
|
||||
s3Cache.Delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
53
go.mod
53
go.mod
@@ -1,19 +1,23 @@
|
||||
module ZhenTuLocalPassiveAdapter
|
||||
|
||||
go 1.23.0
|
||||
go 1.24.0
|
||||
|
||||
toolchain go1.23.6
|
||||
toolchain go1.24.11
|
||||
|
||||
require (
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.62
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2
|
||||
github.com/gin-gonic/gin v1.11.0
|
||||
github.com/spf13/viper v1.20.0
|
||||
go.opentelemetry.io/otel v1.35.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0
|
||||
go.opentelemetry.io/otel/sdk v1.35.0
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/sync v0.16.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -26,33 +30,58 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
|
||||
github.com/aws/smithy-go v1.22.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.0 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/gin-contrib/sse v1.1.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.27.0 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/goccy/go-yaml v1.18.0 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
||||
github.com/quic-go/qpack v0.5.1 // indirect
|
||||
github.com/quic-go/quic-go v0.54.0 // indirect
|
||||
github.com/sagikazarmark/locafero v0.8.0 // indirect
|
||||
github.com/sourcegraph/conc v0.3.0 // indirect
|
||||
github.com/spf13/afero v1.14.0 // indirect
|
||||
github.com/spf13/cast v1.7.1 // indirect
|
||||
github.com/spf13/pflag v1.0.6 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.3.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
|
||||
go.uber.org/mock v0.5.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/net v0.35.0 // indirect
|
||||
golang.org/x/sys v0.31.0 // indirect
|
||||
golang.org/x/text v0.23.0 // indirect
|
||||
golang.org/x/arch v0.20.0 // indirect
|
||||
golang.org/x/crypto v0.40.0 // indirect
|
||||
golang.org/x/mod v0.25.0 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.27.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
golang.org/x/tools v0.34.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
|
||||
google.golang.org/grpc v1.71.0 // indirect
|
||||
google.golang.org/protobuf v1.36.5 // indirect
|
||||
google.golang.org/protobuf v1.36.9 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
105
go.sum
105
go.sum
@@ -1,3 +1,5 @@
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
|
||||
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
|
||||
@@ -22,35 +24,79 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 h1:jIiopHEV22b4yQP2q36Y0OmwLbsxN
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc=
|
||||
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
|
||||
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
|
||||
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
|
||||
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
|
||||
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
|
||||
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
|
||||
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
|
||||
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
|
||||
github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w=
|
||||
github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM=
|
||||
github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk=
|
||||
github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
|
||||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
|
||||
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
|
||||
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
|
||||
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
|
||||
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
|
||||
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
|
||||
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
|
||||
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
|
||||
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
|
||||
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
|
||||
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
|
||||
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/sagikazarmark/locafero v0.8.0 h1:mXaMVw7IqxNBxfv3LdWt9MDmcWDQ1fagDH918lOdVaQ=
|
||||
@@ -65,10 +111,21 @@ github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
|
||||
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/viper v1.20.0 h1:zrxIyR3RQIOsarIrgL8+sAvALXul9jeEPa06Y0Ph6vY=
|
||||
github.com/spf13/viper v1.20.0/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA=
|
||||
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
@@ -77,10 +134,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0f
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 h1:PB3Zrjs1sG1GBX51SXyTSoOTqcDglmsk7nT6tkKPb/k=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0/go.mod h1:U2R3XyVPzn0WX7wOIypPuptulsMcPDPs/oiSVOMVnHY=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 h1:T0Ec2E+3YZf5bgTNQVet8iTDW7oIk03tXHq+wkwIDnE=
|
||||
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0/go.mod h1:30v2gqH+vYGJsesLWFov8u47EpYTcIQcBjKpI6pJThg=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||
@@ -93,24 +146,44 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU
|
||||
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
|
||||
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
|
||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
|
||||
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
|
||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
||||
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
|
||||
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
|
||||
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
|
||||
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
|
||||
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
|
||||
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
|
||||
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
|
||||
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
|
||||
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
|
||||
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
|
||||
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
|
||||
google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
|
||||
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
|
||||
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
|
||||
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
74
logger/logger.go
Normal file
74
logger/logger.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
var Logger *zap.Logger
|
||||
|
||||
func Init() error {
|
||||
config := zap.NewProductionConfig()
|
||||
config.OutputPaths = []string{"stdout"}
|
||||
config.ErrorOutputPaths = []string{"stderr"}
|
||||
|
||||
// 配置日志轮换
|
||||
lumberJackLogger := &lumberjack.Logger{
|
||||
Filename: "logs/app.log",
|
||||
MaxSize: 10, // MB
|
||||
MaxBackups: 3,
|
||||
MaxAge: 30, // days
|
||||
Compress: true,
|
||||
}
|
||||
|
||||
// 创建写入器
|
||||
w := zapcore.AddSync(lumberJackLogger)
|
||||
|
||||
// 设置日志级别
|
||||
level := zap.NewAtomicLevelAt(zap.InfoLevel)
|
||||
|
||||
// 创建编码器配置
|
||||
encoderConfig := zap.NewProductionEncoderConfig()
|
||||
encoderConfig.TimeKey = "time"
|
||||
encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05")
|
||||
encoderConfig.EncodeCaller = zapcore.ShortCallerEncoder
|
||||
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
|
||||
encoderConfig.MessageKey = "msg"
|
||||
encoderConfig.CallerKey = "caller"
|
||||
|
||||
// 创建控制台编码器(文本格式)
|
||||
encoder := zapcore.NewConsoleEncoder(encoderConfig)
|
||||
|
||||
// 创建核心
|
||||
core := zapcore.NewCore(encoder, w, level)
|
||||
|
||||
// 创建日志记录器
|
||||
Logger = zap.New(core, zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Info(message string, fields ...zap.Field) {
|
||||
Logger.Info(message, fields...)
|
||||
}
|
||||
|
||||
func Error(message string, fields ...zap.Field) {
|
||||
Logger.Error(message, fields...)
|
||||
}
|
||||
|
||||
func Warn(message string, fields ...zap.Field) {
|
||||
Logger.Warn(message, fields...)
|
||||
}
|
||||
|
||||
func Debug(message string, fields ...zap.Field) {
|
||||
Logger.Debug(message, fields...)
|
||||
}
|
||||
|
||||
func Fatal(message string, fields ...zap.Field) {
|
||||
Logger.Fatal(message, fields...)
|
||||
}
|
||||
|
||||
func Sync() error {
|
||||
return Logger.Sync()
|
||||
}
|
||||
160
main.go
160
main.go
@@ -5,14 +5,20 @@ import (
|
||||
"ZhenTuLocalPassiveAdapter/config"
|
||||
"ZhenTuLocalPassiveAdapter/core"
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"ZhenTuLocalPassiveAdapter/telemetry"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var tracer = otel.Tracer("vpt")
|
||||
@@ -28,84 +34,146 @@ func startTask(device config.DeviceMapping, task dto.Task) {
|
||||
fo, err := core.HandleTask(ctx, device, task)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "处理任务失败")
|
||||
log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||
logger.Error("处理任务失败",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo),
|
||||
zap.Error(err))
|
||||
api.ReportTaskFailure(ctx, task.TaskID)
|
||||
return
|
||||
}
|
||||
span.SetAttributes(attribute.String("fileUrl", fo.URL))
|
||||
log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
||||
logger.Info("处理任务成功",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo))
|
||||
err = api.UploadTaskFile(ctx, task, *fo)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "上传文件失败")
|
||||
log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||
logger.Error("上传文件失败",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo),
|
||||
zap.Error(err))
|
||||
api.ReportTaskFailure(ctx, task.TaskID)
|
||||
return
|
||||
}
|
||||
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
||||
if !result {
|
||||
span.SetStatus(codes.Error, "上报任务成功失败")
|
||||
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||
logger.Error("上报任务成功失败",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
span.SetStatus(codes.Ok, "上传文件成功")
|
||||
log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
||||
logger.Info("上传文件成功",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo))
|
||||
}
|
||||
|
||||
func runTaskLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// 执行任务
|
||||
tasks, err := api.SyncTask()
|
||||
if err == nil {
|
||||
for _, task := range tasks {
|
||||
logger.Info("开始处理任务",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("deviceNo", task.DeviceNo),
|
||||
zap.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")),
|
||||
zap.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
|
||||
// 处理任务
|
||||
for _, device := range config.Config.Devices {
|
||||
if device.DeviceNo == task.DeviceNo {
|
||||
// 处理任务
|
||||
go startTask(device, task)
|
||||
break // 提前返回,避免不必要的循环
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.Error("同步任务失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startViidServer() {
|
||||
if !config.Config.Viid.Enabled {
|
||||
return
|
||||
}
|
||||
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
r := gin.Default()
|
||||
|
||||
// Register Routes
|
||||
api.RegisterVIIDRoutes(r)
|
||||
|
||||
addr := fmt.Sprintf(":%d", config.Config.Viid.Port)
|
||||
logger.Info("VIID Server starting", zap.String("addr", addr))
|
||||
go func() {
|
||||
if err := r.Run(addr); err != nil {
|
||||
logger.Error("VIID Server failed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func main() {
|
||||
err := config.LoadConfig()
|
||||
// 初始化日志
|
||||
err := logger.Init()
|
||||
if err != nil {
|
||||
log.Println("加载配置文件失败:", err)
|
||||
panic(err)
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
err = config.LoadConfig()
|
||||
if err != nil {
|
||||
logger.Fatal("加载配置文件失败", zap.Error(err))
|
||||
return
|
||||
}
|
||||
// 日志文件路径
|
||||
logFilePath := "app.log"
|
||||
ctx := context.Background()
|
||||
shutdown, err := telemetry.InitTelemetry(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to initialize telemetry: %v", err)
|
||||
logger.Fatal("Failed to initialize telemetry", zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer shutdown(ctx)
|
||||
// 创建或打开日志文件
|
||||
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
defer logFile.Close()
|
||||
|
||||
// 设置日志输出到文件
|
||||
log.SetOutput(logFile)
|
||||
if config.Config.Record.Storage.Type == "local" {
|
||||
_, err = os.Stat(config.Config.Record.Storage.Path)
|
||||
if err != nil {
|
||||
log.Println("录像文件夹配置失败", err)
|
||||
logger.Error("录像文件夹配置失败", zap.Error(err))
|
||||
return
|
||||
} else {
|
||||
log.Println("录像文件夹配置有效")
|
||||
logger.Info("录像文件夹配置有效")
|
||||
}
|
||||
} else {
|
||||
log.Println("录像文件夹配置为OSS")
|
||||
}
|
||||
// 每两秒定时执行
|
||||
for {
|
||||
// 执行任务
|
||||
tasks, err := api.SyncTask()
|
||||
if err == nil {
|
||||
for _, task := range tasks {
|
||||
log.Printf("开始处理任务, TaskID:【%s】,DeviceNo: %s,开始时间: %s,结束时间: %s\n", task.TaskID, task.DeviceNo, task.StartTime, task.EndTime)
|
||||
// 处理任务
|
||||
for _, device := range config.Config.Devices {
|
||||
if device.DeviceNo == task.DeviceNo {
|
||||
// 处理任务
|
||||
go startTask(device, task)
|
||||
break // 提前返回,避免不必要的循环
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Println("同步任务失败:", err)
|
||||
}
|
||||
// 等待两秒
|
||||
<-time.After(2 * time.Second)
|
||||
logger.Info("录像文件夹配置为OSS")
|
||||
}
|
||||
|
||||
// Start VIID Server
|
||||
startViidServer()
|
||||
|
||||
// Context for graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Handle Signals
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Start Task Loop
|
||||
go runTaskLoop(ctx)
|
||||
|
||||
// Wait for signal
|
||||
<-sigChan
|
||||
logger.Info("Received shutdown signal")
|
||||
cancel()
|
||||
logger.Info("Shutdown complete")
|
||||
}
|
||||
|
||||
104
model/custom_time.go
Normal file
104
model/custom_time.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CustomTime 自定义时间类型,用于JSON序列化为YYYY-MM-DD HH:mm:ss格式
|
||||
type CustomTime struct {
|
||||
time.Time
|
||||
}
|
||||
|
||||
// MarshalJSON 自定义JSON序列化
|
||||
func (ct CustomTime) MarshalJSON() ([]byte, error) {
|
||||
if ct.Time.IsZero() {
|
||||
return json.Marshal("")
|
||||
}
|
||||
return json.Marshal(ct.Time.Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
|
||||
// UnmarshalJSON 自定义JSON反序列化
|
||||
// 支持格式:
|
||||
// 1. 数字时间戳:10位秒级(1732435200)或13位毫秒级(1732435200000)
|
||||
// 2. 字符串格式:秒级(2024-11-24 12:00:00)或毫秒级(2024-11-24 12:00:00.000)
|
||||
// 3. 空字符串:转换为零值时间
|
||||
func (ct *CustomTime) UnmarshalJSON(data []byte) error {
|
||||
// 1. 尝试解析为数字时间戳
|
||||
var timestamp int64
|
||||
if err := json.Unmarshal(data, ×tamp); err == nil {
|
||||
if timestamp == 0 {
|
||||
ct.Time = time.Time{}
|
||||
return nil
|
||||
}
|
||||
// 根据位数判断精度:13位为毫秒级,10位为秒级
|
||||
if timestamp > 9999999999 {
|
||||
ct.Time = time.UnixMilli(timestamp)
|
||||
} else {
|
||||
ct.Time = time.Unix(timestamp, 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 2. 解析字符串格式
|
||||
var str string
|
||||
if err := json.Unmarshal(data, &str); err != nil {
|
||||
return err
|
||||
}
|
||||
if str == "" {
|
||||
ct.Time = time.Time{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 3. 使用本地时区解析时间,避免时区问题
|
||||
// 尝试多种格式(按精度从高到低)
|
||||
formats := []string{
|
||||
"2006-01-02 15:04:05.000", // 毫秒格式
|
||||
"2006-01-02 15:04:05", // 秒格式(现有)
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for _, format := range formats {
|
||||
t, err := time.ParseInLocation(format, str, time.Local)
|
||||
if err == nil {
|
||||
ct.Time = t
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// Value 实现driver.Valuer接口,用于写入数据库
|
||||
func (ct CustomTime) Value() (driver.Value, error) {
|
||||
if ct.Time.IsZero() {
|
||||
return nil, nil
|
||||
}
|
||||
return ct.Time, nil
|
||||
}
|
||||
|
||||
// Scan 实现sql.Scanner接口,用于从数据库读取
|
||||
func (ct *CustomTime) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
ct.Time = time.Time{}
|
||||
return nil
|
||||
}
|
||||
if t, ok := value.(time.Time); ok {
|
||||
ct.Time = t
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("无法扫描 %T 到 CustomTime", value)
|
||||
}
|
||||
|
||||
// NewCustomTime 创建一个新的 CustomTime 实例
|
||||
func NewCustomTime(t time.Time) CustomTime {
|
||||
return CustomTime{Time: t}
|
||||
}
|
||||
|
||||
// NowCustomTime 返回当前时间的 CustomTime 实例
|
||||
func NowCustomTime() CustomTime {
|
||||
return CustomTime{Time: time.Now()}
|
||||
}
|
||||
@@ -2,12 +2,13 @@ package util
|
||||
|
||||
import (
|
||||
"ZhenTuLocalPassiveAdapter/dto"
|
||||
"ZhenTuLocalPassiveAdapter/logger"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
@@ -36,7 +37,7 @@ func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
}
|
||||
// 先尝试方法1
|
||||
if !result {
|
||||
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
||||
logger.Warn("FFMPEG简易方法失败,尝试复杂方法转码")
|
||||
// 不行再尝试方法二
|
||||
result = runFfmpegForMultipleFile2(subCtx, task)
|
||||
}
|
||||
@@ -80,7 +81,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
tmpFile := path.Join(os.TempDir(), file.Name+strconv.Itoa(rand.Int())+".ts")
|
||||
result, err := convertMp4ToTs(subCtx, *file, tmpFile)
|
||||
if err != nil {
|
||||
log.Printf("转码出错: %v", err)
|
||||
logger.Error("转码出错", zap.Error(err))
|
||||
mu.Lock()
|
||||
notOk = true
|
||||
mu.Unlock()
|
||||
@@ -115,7 +116,7 @@ func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
// 步骤三:删除临时文件
|
||||
for _, file := range taskClone.Files {
|
||||
if err := os.Remove(file.Url); err != nil {
|
||||
log.Printf("删除临时文件失败: %v", err)
|
||||
logger.Error("删除临时文件失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if result {
|
||||
@@ -185,7 +186,7 @@ func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||
stat, err := os.Stat(task.OutputFile)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "文件不存在")
|
||||
log.Printf("文件不存在:%s", task.OutputFile)
|
||||
logger.Error("文件不存在", zap.String("outputFile", task.OutputFile))
|
||||
return false
|
||||
}
|
||||
span.SetAttributes(attribute.String("file.name", task.OutputFile))
|
||||
@@ -219,7 +220,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
defer span.End()
|
||||
if fileList == nil || len(fileList) == 0 {
|
||||
span.SetStatus(codes.Error, "无法根据要求找到对应录制片段")
|
||||
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
|
||||
logger.Error("无法根据要求找到对应录制片段",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("beginTime", beginDt.Format("2006-01-02 15:04:05")),
|
||||
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
|
||||
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
|
||||
}
|
||||
// 按照 Create 的值升序排序
|
||||
@@ -238,7 +242,11 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
if file.StartTime.Sub(lastFile.EndTime).Seconds() > 2 {
|
||||
// 片段断开
|
||||
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
||||
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
|
||||
logger.Error("分析FFMPEG任务失败,文件片段中间断开超过2秒",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("lastFile", lastFile.Name),
|
||||
zap.String("currentFile", file.Name),
|
||||
zap.Float64("gapSeconds", file.StartTime.Sub(lastFile.EndTime).Seconds()))
|
||||
return nil, fmt.Errorf("片段断开")
|
||||
}
|
||||
lastFile = &file
|
||||
@@ -248,7 +256,10 @@ func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File,
|
||||
// 通过文件列表构造的任务仍然是缺失的
|
||||
if fileList[len(fileList)-1].EndTime.Before(endDt) {
|
||||
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
||||
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
|
||||
logger.Error("分析FFMPEG任务失败,无法完整覆盖时间点",
|
||||
zap.String("taskID", task.TaskID),
|
||||
zap.String("lastFile", fileList[len(fileList)-1].Name),
|
||||
zap.String("endTime", endDt.Format("2006-01-02 15:04:05")))
|
||||
return nil, fmt.Errorf("片段断开")
|
||||
}
|
||||
|
||||
@@ -326,7 +337,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "创建临时文件失败")
|
||||
log.Printf("创建临时文件失败:%s", tmpFile)
|
||||
logger.Error("创建临时文件失败", zap.String("tmpFile", tmpFile))
|
||||
return false, err
|
||||
}
|
||||
defer os.Remove(tmpFile)
|
||||
@@ -337,7 +348,7 @@ func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, len
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "写入临时文件失败")
|
||||
log.Printf("写入临时文件失败:%s", tmpFile)
|
||||
logger.Error("写入临时文件失败", zap.String("tmpFile", tmpFile))
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
@@ -401,7 +412,7 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
|
||||
defer func() {
|
||||
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
||||
}()
|
||||
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " "))
|
||||
logger.Info("FFMPEG执行命令", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
|
||||
|
||||
var stderr bytes.Buffer
|
||||
@@ -411,7 +422,9 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
||||
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
||||
logger.Error("FFMPEG执行命令失败",
|
||||
zap.String("error", stderr.String()),
|
||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||
return false, err
|
||||
}
|
||||
defer cmd.Process.Kill()
|
||||
@@ -425,17 +438,21 @@ func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error)
|
||||
case <-time.After(1 * time.Minute):
|
||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
|
||||
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " "))
|
||||
logger.Warn("FFMPEG执行命令超时", zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||
return false, fmt.Errorf("ffmpeg command timed out")
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
||||
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
||||
logger.Error("FFMPEG执行命令失败",
|
||||
zap.String("error", stderr.String()),
|
||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||
return false, err
|
||||
}
|
||||
endTime := time.Now()
|
||||
log.Printf("FFMPEG执行命令结束,耗费时间:【%dms】,命令:【%s】", endTime.Sub(startTime).Milliseconds(), strings.Join(ffmpegCmd, " "))
|
||||
logger.Info("FFMPEG执行命令结束",
|
||||
zap.Int64("durationMs", endTime.Sub(startTime).Milliseconds()),
|
||||
zap.String("command", strings.Join(ffmpegCmd, " ")))
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
@@ -461,12 +478,11 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
||||
span.SetStatus(codes.Error, "failed to get video duration")
|
||||
return 0, fmt.Errorf("failed to get video duration: %w", err)
|
||||
}
|
||||
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String()))
|
||||
span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
|
||||
durationStr := strings.TrimSpace(out.String())
|
||||
duration, err := strconv.ParseFloat(durationStr, 64)
|
||||
if err != nil {
|
||||
span.SetAttributes(attribute.String("error", err.Error()))
|
||||
span.SetStatus(codes.Error, "failed to parse video duration")
|
||||
return 0, fmt.Errorf("failed to parse video duration: %w", err)
|
||||
}
|
||||
span.SetAttributes(attribute.Float64("video.duration", duration))
|
||||
|
||||
96
util/image.go
Normal file
96
util/image.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"image"
|
||||
"image/draw"
|
||||
"image/jpeg"
|
||||
_ "image/png" // Register PNG decoder just in case
|
||||
)
|
||||
|
||||
// GenerateThumbnail creates a thumbnail by cropping the original image.
|
||||
// The thumbnail size is 1/2 of the original dimensions.
|
||||
// The crop is centered on the provided coordinates (centerX, centerY), constrained to image bounds.
|
||||
func GenerateThumbnail(srcData []byte, centerX, centerY int) ([]byte, error) {
|
||||
// 1. Decode image
|
||||
img, _, err := image.Decode(bytes.NewReader(srcData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode image failed: %v", err)
|
||||
}
|
||||
|
||||
bounds := img.Bounds()
|
||||
origW := bounds.Dx()
|
||||
origH := bounds.Dy()
|
||||
|
||||
// 2. Calculate Target Dimensions (1/2 of original)
|
||||
targetW := origW / 2
|
||||
targetH := origH / 2
|
||||
|
||||
if targetW == 0 || targetH == 0 {
|
||||
return nil, fmt.Errorf("image too small to generate half-size thumbnail")
|
||||
}
|
||||
|
||||
// 3. Calculate Crop Origin (Top-Left)
|
||||
// cropX = centerX - targetW / 2
|
||||
cropX := centerX - targetW/2
|
||||
cropY := centerY - targetH/2
|
||||
|
||||
// 4. Constrain to Bounds
|
||||
if cropX < 0 {
|
||||
cropX = 0
|
||||
}
|
||||
if cropY < 0 {
|
||||
cropY = 0
|
||||
}
|
||||
if cropX+targetW > origW {
|
||||
cropX = origW - targetW
|
||||
}
|
||||
if cropY+targetH > origH {
|
||||
cropY = origH - targetH
|
||||
}
|
||||
|
||||
// Double check after adjustment
|
||||
if cropX < 0 {
|
||||
cropX = 0
|
||||
}
|
||||
if cropY < 0 {
|
||||
cropY = 0
|
||||
}
|
||||
|
||||
// 5. Perform Crop
|
||||
cropRect := image.Rect(cropX, cropY, cropX+targetW, cropY+targetH)
|
||||
|
||||
var dst image.Image
|
||||
|
||||
// Try to use SubImage if supported for performance
|
||||
type SubImager interface {
|
||||
SubImage(r image.Rectangle) image.Image
|
||||
}
|
||||
|
||||
if sub, ok := img.(SubImager); ok {
|
||||
dst = sub.SubImage(cropRect)
|
||||
} else {
|
||||
// Fallback: create new image and draw
|
||||
rgba := image.NewRGBA(image.Rect(0, 0, targetW, targetH))
|
||||
draw.Draw(rgba, rgba.Bounds(), img, cropRect.Min, draw.Src)
|
||||
dst = rgba
|
||||
}
|
||||
|
||||
// 6. Encode to JPEG
|
||||
var buf bytes.Buffer
|
||||
err = jpeg.Encode(&buf, dst, &jpeg.Options{Quality: 85})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encode thumbnail failed: %v", err)
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
// Helper to process using LeftTop and RightBottom coordinates
|
||||
func GenerateThumbnailFromCoords(srcData []byte, ltX, ltY, rbX, rbY int) ([]byte, error) {
|
||||
// Calculate center based on the provided coordinates
|
||||
centerX := (ltX + rbX) / 2
|
||||
centerY := (ltY + rbY) / 2
|
||||
return GenerateThumbnail(srcData, centerX, centerY)
|
||||
}
|
||||
Reference in New Issue
Block a user