diff --git a/config/dto.go b/config/dto.go index 3918d95..7ed966e 100644 --- a/config/dto.go +++ b/config/dto.go @@ -10,9 +10,10 @@ type RecordConfig struct { } type StorageConfig struct { - Type string `mapstructure:"type"` - Path string `mapstructure:"path"` - S3 S3Config `mapstructure:"s3"` + Type string `mapstructure:"type"` + Path string `mapstructure:"path"` + S3 S3Config `mapstructure:"s3"` + AliOSS AliOSSConfig `mapstructure:"aliOss"` } type S3Config struct { @@ -24,6 +25,14 @@ type S3Config struct { AkSec string `mapstructure:"akSec"` } +type AliOSSConfig struct { + Endpoint string `mapstructure:"endpoint"` + Bucket string `mapstructure:"bucket"` + Prefix string `mapstructure:"prefix"` + AccessKeyId string `mapstructure:"accessKeyId"` + AccessKeySecret string `mapstructure:"accessKeySecret"` +} + type DeviceMapping struct { DeviceNo string `mapstructure:"deviceNo" json:"deviceNo"` Name string `mapstructure:"name" json:"name"` diff --git a/fs/adapter.go b/fs/adapter.go index bde2abb..4095048 100644 --- a/fs/adapter.go +++ b/fs/adapter.go @@ -16,6 +16,10 @@ func GetAdapter() Adapter { return &S3Adapter{ StorageConfig: config.Config.Record.Storage, } + } else if config.Config.Record.Storage.Type == "alioss" { + return &AliOSSAdapter{ + StorageConfig: config.Config.Record.Storage, + } } else { return &LocalAdapter{ config.Config.Record.Storage, diff --git a/fs/ali_adapter.go b/fs/ali_adapter.go new file mode 100644 index 0000000..255c418 --- /dev/null +++ b/fs/ali_adapter.go @@ -0,0 +1,203 @@ +package fs + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/logger" + "ZhenTuLocalPassiveAdapter/util" + "context" + "fmt" + "path" + "sort" + "sync" + "time" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.uber.org/zap" +) + +var aliOssCache sync.Map + +type AliOSSAdapter struct { + StorageConfig config.StorageConfig + ossClient *oss.Client +} + +func (a *AliOSSAdapter) getClient() (*oss.Client, error) { + if a.ossClient == nil { + client, err := oss.New( + a.StorageConfig.AliOSS.Endpoint, + a.StorageConfig.AliOSS.AccessKeyId, + a.StorageConfig.AliOSS.AccessKeySecret, + ) + if err != nil { + return nil, fmt.Errorf("创建阿里云OSS客户端失败: %w", err) + } + a.ossClient = client + } + return a.ossClient, nil +} + +func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) { + _, span := tracer.Start(ctx, "GetFileList_alioss") + defer span.End() + + span.SetAttributes(attribute.String("path", dirPath)) + span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02"))) + if a.StorageConfig.AliOSS.Bucket == "" { + span.SetAttributes(attribute.String("error", "未配置阿里云OSS存储桶")) + span.SetStatus(codes.Error, "未配置阿里云OSS存储桶") + return nil, fmt.Errorf("未配置阿里云OSS存储桶") + } + + cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) + if cachedInterface, ok := aliOssCache.Load(cacheKey); ok { + cachedItem := cachedInterface.(cacheItem) + logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) + if time.Now().Before(cachedItem.expires) { + logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) + span.SetAttributes(attribute.Bool("cache.hit", true)) + return cachedItem.data, nil + } + } + + mutexKey := fmt.Sprintf("lock_%s", cacheKey) + mutex, _ := aliOssCache.LoadOrStore(mutexKey, &sync.Mutex{}) + lock := mutex.(*sync.Mutex) + defer func() { + // 解锁后删除锁(避免内存泄漏) + aliOssCache.Delete(mutexKey) + lock.Unlock() + }() + lock.Lock() + + if cachedInterface, ok := aliOssCache.Load(cacheKey); ok { + cachedItem := cachedInterface.(cacheItem) + logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) + if time.Now().Before(cachedItem.expires) { + logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey)) + span.SetAttributes(attribute.Bool("aliOssCache.hit", true)) + return cachedItem.data, nil + } + } + + client, err := a.getClient() + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建阿里云OSS客户端失败") + return nil, err + } + + bucket, err := client.Bucket(a.StorageConfig.AliOSS.Bucket) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "获取存储桶失败") + return nil, fmt.Errorf("获取存储桶失败: %w", err) + } + + var fileList []dto.File + prefix := path.Join(a.StorageConfig.AliOSS.Prefix, dirPath) + marker := "" + + for { + lsRes, err := bucket.ListObjects( + oss.Prefix(prefix), + oss.Marker(marker), + oss.MaxKeys(1000), + ) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件列表读取失败") + return nil, fmt.Errorf("文件列表读取失败: %w", err) + } + + for _, object := range lsRes.Objects { + key := object.Key + if !util.IsVideoFile(path.Base(key)) { + continue + } + startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt) + if err != nil { + continue + } + if stopTime.IsZero() { + stopTime = startTime + } + if startTime.Equal(stopTime) { + stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) + } + + // 生成预签名URL(有效期10分钟) + signedURL, err := bucket.SignURL(key, oss.HTTPGet, 600) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "生成预签名URL失败") + logger.Error("生成预签名URL失败", zap.Error(err)) + continue + } + + fileList = append(fileList, dto.File{ + BasePath: a.StorageConfig.AliOSS.Bucket, + Name: path.Base(key), + Path: path.Dir(key), + Url: signedURL, + StartTime: startTime, + EndTime: stopTime, + }) + } + + if !lsRes.IsTruncated { + break + } + marker = lsRes.NextMarker + } + + span.SetAttributes(attribute.Int("file.count", len(fileList))) + sort.Slice(fileList, func(i, j int) bool { + return fileList[i].StartTime.Before(fileList[j].StartTime) + }) + span.SetStatus(codes.Ok, "文件读取成功") + + cacheItem := cacheItem{ + data: fileList, + expires: time.Now().Add(30 * time.Second), + } + aliOssCache.Store(cacheKey, cacheItem) + logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey)) + + return fileList, nil +} + +// 添加定时清理缓存的初始化函数 +func init() { + go func() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + cleanupAliOssCache() + } + } + }() +} + +// 添加缓存清理函数 +func cleanupAliOssCache() { + var keysToDelete []interface{} + aliOssCache.Range(func(key, value interface{}) bool { + item, ok := value.(cacheItem) + if !ok { + return true + } + if time.Now().After(item.expires) { + keysToDelete = append(keysToDelete, key) + } + return true + }) + for _, key := range keysToDelete { + aliOssCache.Delete(key) + } +} diff --git a/go.mod b/go.mod index ab6d79c..4c0e676 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,15 @@ module ZhenTuLocalPassiveAdapter -go 1.23.0 +go 1.24.0 -toolchain go1.23.6 +toolchain go1.24.11 require ( + github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aws/aws-sdk-go-v2 v1.36.3 github.com/aws/aws-sdk-go-v2/credentials v1.17.62 github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 + github.com/gin-gonic/gin v1.11.0 github.com/spf13/viper v1.20.0 go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 @@ -35,7 +37,6 @@ require ( github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.1.0 // indirect - github.com/gin-gonic/gin v1.11.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -76,6 +77,7 @@ require ( golang.org/x/net v0.42.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.27.0 // indirect + golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.34.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect diff --git a/go.sum b/go.sum index db408f0..0e34c12 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= +github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= @@ -48,6 +50,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -85,8 +89,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= -github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -116,8 +118,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= @@ -156,23 +158,17 @@ golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0= @@ -181,8 +177,6 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=