diff --git a/.gitignore b/.gitignore index 7198016..c706074 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ dist/ -.idea/ \ No newline at end of file +.idea/ +.exe diff --git a/config.yaml b/config.yaml index de441bf..c41fb2f 100644 --- a/config.yaml +++ b/config.yaml @@ -12,6 +12,9 @@ record: akId: Idi2MBaWH2F0LFIWGdDY akSec: Idi2MBaWH2F0LFIWGdDY duration: 30 + cache: + fileListTTLSeconds: 30 + fileListMaxEntries: 256 devices: - deviceNo: "44020000001322500001" name: "ppda-010268-zymyj" diff --git a/config/dto.go b/config/dto.go index 7ed966e..90bc436 100644 --- a/config/dto.go +++ b/config/dto.go @@ -4,9 +4,15 @@ type ApiConfig struct { BaseUrl string `mapstructure:"baseUrl"` } +type CacheConfig struct { + FileListTTLSeconds int `mapstructure:"fileListTTLSeconds"` + FileListMaxEntries int `mapstructure:"fileListMaxEntries"` +} + type RecordConfig struct { Storage StorageConfig `mapstructure:"storage"` Duration int `mapstructure:"duration"` + Cache CacheConfig `mapstructure:"cache"` } type StorageConfig struct { diff --git a/fs/ali_adapter.go b/fs/ali_adapter.go index 255c418..666cf3a 100644 --- a/fs/ali_adapter.go +++ b/fs/ali_adapter.go @@ -9,7 +9,6 @@ import ( "fmt" "path" "sort" - "sync" "time" "github.com/aliyun/aliyun-oss-go-sdk/oss" @@ -18,8 +17,6 @@ import ( "go.uber.org/zap" ) -var aliOssCache sync.Map - type AliOSSAdapter struct { StorageConfig config.StorageConfig ossClient *oss.Client @@ -53,151 +50,96 @@ func (a *AliOSSAdapter) GetFileList(ctx context.Context, dirPath string, relDt t } cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) - if cachedInterface, ok := aliOssCache.Load(cacheKey); ok { - cachedItem := cachedInterface.(cacheItem) - logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) - if time.Now().Before(cachedItem.expires) { - logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) - span.SetAttributes(attribute.Bool("cache.hit", true)) - return cachedItem.data, nil - } + fileListCache := getAliOssFileListCache() + if cachedFiles, ok := fileListCache.Get(cacheKey); ok { + logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) + span.SetAttributes(attribute.Bool("cache.hit", true)) + span.SetAttributes(attribute.Int("file.count", len(cachedFiles))) + span.SetStatus(codes.Ok, "文件读取成功") + return cachedFiles, nil } - mutexKey := fmt.Sprintf("lock_%s", cacheKey) - mutex, _ := aliOssCache.LoadOrStore(mutexKey, &sync.Mutex{}) - lock := mutex.(*sync.Mutex) - defer func() { - // 解锁后删除锁(避免内存泄漏) - aliOssCache.Delete(mutexKey) - lock.Unlock() - }() - lock.Lock() - - if cachedInterface, ok := aliOssCache.Load(cacheKey); ok { - cachedItem := cachedInterface.(cacheItem) - logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) - if time.Now().Before(cachedItem.expires) { - logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey)) - span.SetAttributes(attribute.Bool("aliOssCache.hit", true)) - return cachedItem.data, nil + fileList, hit, shared, err := fileListCache.GetOrLoad(cacheKey, func() ([]dto.File, error) { + client, err := a.getClient() + if err != nil { + return nil, err } - } - client, err := a.getClient() + bucket, err := client.Bucket(a.StorageConfig.AliOSS.Bucket) + if err != nil { + return nil, fmt.Errorf("获取存储桶失败: %w", err) + } + + var resultFiles []dto.File + prefix := path.Join(a.StorageConfig.AliOSS.Prefix, dirPath) + marker := "" + + for { + lsRes, err := bucket.ListObjects( + oss.Prefix(prefix), + oss.Marker(marker), + oss.MaxKeys(1000), + ) + if err != nil { + return nil, fmt.Errorf("文件列表读取失败: %w", err) + } + + for _, object := range lsRes.Objects { + key := object.Key + if !util.IsVideoFile(path.Base(key)) { + continue + } + startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt) + if err != nil { + continue + } + if stopTime.IsZero() { + stopTime = startTime + } + if startTime.Equal(stopTime) { + stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) + } + + // 生成预签名URL(有效期10分钟) + signedURL, err := bucket.SignURL(key, oss.HTTPGet, 600) + if err != nil { + logger.Error("生成预签名URL失败", zap.Error(err)) + continue + } + + resultFiles = append(resultFiles, dto.File{ + BasePath: a.StorageConfig.AliOSS.Bucket, + Name: path.Base(key), + Path: path.Dir(key), + Url: signedURL, + StartTime: startTime, + EndTime: stopTime, + }) + } + + if !lsRes.IsTruncated { + break + } + marker = lsRes.NextMarker + } + + sort.Slice(resultFiles, func(i, j int) bool { + return resultFiles[i].StartTime.Before(resultFiles[j].StartTime) + }) + return resultFiles, nil + }) if err != nil { span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "创建阿里云OSS客户端失败") + span.SetStatus(codes.Error, "文件列表读取失败") return nil, err } - bucket, err := client.Bucket(a.StorageConfig.AliOSS.Bucket) - if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "获取存储桶失败") - return nil, fmt.Errorf("获取存储桶失败: %w", err) - } - - var fileList []dto.File - prefix := path.Join(a.StorageConfig.AliOSS.Prefix, dirPath) - marker := "" - - for { - lsRes, err := bucket.ListObjects( - oss.Prefix(prefix), - oss.Marker(marker), - oss.MaxKeys(1000), - ) - if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "文件列表读取失败") - return nil, fmt.Errorf("文件列表读取失败: %w", err) - } - - for _, object := range lsRes.Objects { - key := object.Key - if !util.IsVideoFile(path.Base(key)) { - continue - } - startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt) - if err != nil { - continue - } - if stopTime.IsZero() { - stopTime = startTime - } - if startTime.Equal(stopTime) { - stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) - } - - // 生成预签名URL(有效期10分钟) - signedURL, err := bucket.SignURL(key, oss.HTTPGet, 600) - if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "生成预签名URL失败") - logger.Error("生成预签名URL失败", zap.Error(err)) - continue - } - - fileList = append(fileList, dto.File{ - BasePath: a.StorageConfig.AliOSS.Bucket, - Name: path.Base(key), - Path: path.Dir(key), - Url: signedURL, - StartTime: startTime, - EndTime: stopTime, - }) - } - - if !lsRes.IsTruncated { - break - } - marker = lsRes.NextMarker - } - + span.SetAttributes(attribute.Bool("cache.shared", shared)) span.SetAttributes(attribute.Int("file.count", len(fileList))) - sort.Slice(fileList, func(i, j int) bool { - return fileList[i].StartTime.Before(fileList[j].StartTime) - }) span.SetStatus(codes.Ok, "文件读取成功") - cacheItem := cacheItem{ - data: fileList, - expires: time.Now().Add(30 * time.Second), + if !hit && !shared { + logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey)) } - aliOssCache.Store(cacheKey, cacheItem) - logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey)) - return fileList, nil } - -// 添加定时清理缓存的初始化函数 -func init() { - go func() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - cleanupAliOssCache() - } - } - }() -} - -// 添加缓存清理函数 -func cleanupAliOssCache() { - var keysToDelete []interface{} - aliOssCache.Range(func(key, value interface{}) bool { - item, ok := value.(cacheItem) - if !ok { - return true - } - if time.Now().After(item.expires) { - keysToDelete = append(keysToDelete, key) - } - return true - }) - for _, key := range keysToDelete { - aliOssCache.Delete(key) - } -} diff --git a/fs/file_list_cache.go b/fs/file_list_cache.go new file mode 100644 index 0000000..99ef1e9 --- /dev/null +++ b/fs/file_list_cache.go @@ -0,0 +1,182 @@ +package fs + +import ( + "ZhenTuLocalPassiveAdapter/config" + "ZhenTuLocalPassiveAdapter/dto" + "container/list" + "fmt" + "sync" + "time" + + "github.com/patrickmn/go-cache" + "golang.org/x/sync/singleflight" +) + +const ( + defaultFileListCacheTTLSeconds = 30 + defaultFileListCacheMaxEntries = 256 + fileListCacheCleanupInterval = 1 * time.Minute +) + +var ( + s3FileListCacheOnce sync.Once + s3FileListCacheInstance *fileListCache + + aliOssFileListCacheOnce sync.Once + aliOssFileListCacheInstance *fileListCache +) + +func getS3FileListCache() *fileListCache { + s3FileListCacheOnce.Do(func() { + s3FileListCacheInstance = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) + }) + return s3FileListCacheInstance +} + +func getAliOssFileListCache() *fileListCache { + aliOssFileListCacheOnce.Do(func() { + aliOssFileListCacheInstance = newFileListCache(getFileListCacheTTL(), getFileListCacheMaxEntries()) + }) + return aliOssFileListCacheInstance +} + +func getFileListCacheTTL() time.Duration { + ttlSeconds := config.Config.Record.Cache.FileListTTLSeconds + if ttlSeconds <= 0 { + ttlSeconds = defaultFileListCacheTTLSeconds + } + return time.Duration(ttlSeconds) * time.Second +} + +func getFileListCacheMaxEntries() int { + maxEntries := config.Config.Record.Cache.FileListMaxEntries + if maxEntries <= 0 { + maxEntries = defaultFileListCacheMaxEntries + } + return maxEntries +} + +type fileListCache struct { + cache *cache.Cache + maxEntries int + + mu sync.Mutex + lru *list.List + elements map[string]*list.Element + + group singleflight.Group +} + +func newFileListCache(ttl time.Duration, maxEntries int) *fileListCache { + if ttl <= 0 { + ttl = defaultFileListCacheTTLSeconds * time.Second + } + if maxEntries <= 0 { + maxEntries = defaultFileListCacheMaxEntries + } + + c := cache.New(ttl, fileListCacheCleanupInterval) + result := &fileListCache{ + cache: c, + maxEntries: maxEntries, + lru: list.New(), + elements: make(map[string]*list.Element, maxEntries), + } + c.OnEvicted(func(key string, value any) { + result.removeFromLru(key) + }) + return result +} + +func (c *fileListCache) GetOrLoad(key string, loader func() ([]dto.File, error)) ([]dto.File, bool, bool, error) { + if files, ok := c.Get(key); ok { + return files, true, false, nil + } + + value, err, shared := c.group.Do(key, func() (any, error) { + if files, ok := c.Get(key); ok { + return files, nil + } + files, err := loader() + if err != nil { + return nil, err + } + c.Set(key, files) + return files, nil + }) + if err != nil { + return nil, false, shared, err + } + files, ok := value.([]dto.File) + if !ok { + return nil, false, shared, fmt.Errorf("缓存返回类型错误: key=%s", key) + } + return files, false, shared, nil +} + +func (c *fileListCache) Get(key string) ([]dto.File, bool) { + value, ok := c.cache.Get(key) + if !ok { + c.removeFromLru(key) + return nil, false + } + files, ok := value.([]dto.File) + if !ok { + c.cache.Delete(key) + c.removeFromLru(key) + return nil, false + } + c.touch(key) + return files, true +} + +func (c *fileListCache) Set(key string, files []dto.File) { + c.cache.Set(key, files, cache.DefaultExpiration) + c.touch(key) + c.evictIfNeeded() +} + +func (c *fileListCache) touch(key string) { + c.mu.Lock() + defer c.mu.Unlock() + + if el, ok := c.elements[key]; ok { + c.lru.MoveToFront(el) + return + } + c.elements[key] = c.lru.PushFront(key) +} + +func (c *fileListCache) removeFromLru(key string) { + c.mu.Lock() + defer c.mu.Unlock() + + el, ok := c.elements[key] + if !ok { + return + } + c.lru.Remove(el) + delete(c.elements, key) +} + +func (c *fileListCache) evictIfNeeded() { + for { + c.mu.Lock() + if len(c.elements) <= c.maxEntries { + c.mu.Unlock() + return + } + el := c.lru.Back() + if el == nil { + c.mu.Unlock() + return + } + key := el.Value.(string) + c.lru.Remove(el) + delete(c.elements, key) + c.mu.Unlock() + + // 注意:不要在持有 c.mu 时调用 c.cache.Delete,否则会和 OnEvicted 回调产生锁顺序死锁风险 + c.cache.Delete(key) + } +} diff --git a/fs/s3_adapter.go b/fs/s3_adapter.go index a40af2c..dcec818 100644 --- a/fs/s3_adapter.go +++ b/fs/s3_adapter.go @@ -11,7 +11,6 @@ import ( "go.uber.org/zap" "path" "sort" - "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -20,8 +19,6 @@ import ( "go.opentelemetry.io/otel/codes" ) -var s3Cache sync.Map - type S3Adapter struct { StorageConfig config.StorageConfig s3Client *s3.Client @@ -62,158 +59,99 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time. } cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) - if cachedInterface, ok := s3Cache.Load(cacheKey); ok { - cachedItem := cachedInterface.(cacheItem) - logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) - if time.Now().Before(cachedItem.expires) { - logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) - span.SetAttributes(attribute.Bool("cache.hit", true)) - return cachedItem.data, nil - } + fileListCache := getS3FileListCache() + if cachedFiles, ok := fileListCache.Get(cacheKey); ok { + logger.Debug("获取已缓存列表", zap.String("cacheKey", cacheKey)) + span.SetAttributes(attribute.Bool("cache.hit", true)) + span.SetAttributes(attribute.Int("file.count", len(cachedFiles))) + span.SetStatus(codes.Ok, "文件读取成功") + return cachedFiles, nil } - mutexKey := fmt.Sprintf("lock_%s", cacheKey) - mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{}) - lock := mutex.(*sync.Mutex) - defer func() { - // 解锁后删除锁(避免内存泄漏) - s3Cache.Delete(mutexKey) - lock.Unlock() - }() - lock.Lock() - - if cachedInterface, ok := s3Cache.Load(cacheKey); ok { - cachedItem := cachedInterface.(cacheItem) - logger.Debug("缓存过期时间", zap.Duration("expiresIn", cachedItem.expires.Sub(time.Now()))) - if time.Now().Before(cachedItem.expires) { - logger.Debug("过锁后获取已缓存列表", zap.String("cacheKey", cacheKey)) - span.SetAttributes(attribute.Bool("s3Cache.hit", true)) - return cachedItem.data, nil - } - } - - listObjectsInput := &s3.ListObjectsV2Input{ - Bucket: aws.String(s.StorageConfig.S3.Bucket), - Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)), - MaxKeys: aws.Int32(1000), - } - - client, err := s.getClient() - if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "创建S3客户端失败") - return nil, err - } - - var fileList []dto.File - var continuationToken *string - - for { - if continuationToken != nil { - listObjectsInput.ContinuationToken = continuationToken + fileList, hit, shared, err := fileListCache.GetOrLoad(cacheKey, func() ([]dto.File, error) { + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(s.StorageConfig.S3.Bucket), + Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)), + MaxKeys: aws.Int32(1000), } - result, err := client.ListObjectsV2(context.TODO(), listObjectsInput) + client, err := s.getClient() if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "文件列表读取失败") return nil, err } - for _, object := range result.Contents { - key := *object.Key - if !util.IsVideoFile(path.Base(key)) { - continue + var resultFiles []dto.File + var continuationToken *string + + for { + if continuationToken != nil { + listObjectsInput.ContinuationToken = continuationToken } - startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt) + + result, err := client.ListObjectsV2(context.TODO(), listObjectsInput) if err != nil { - continue + return nil, err } - if stopTime.IsZero() { - stopTime = startTime + + for _, object := range result.Contents { + key := *object.Key + if !util.IsVideoFile(path.Base(key)) { + continue + } + startTime, stopTime, err := util.ParseStartStopTime(path.Base(key), relDt) + if err != nil { + continue + } + if stopTime.IsZero() { + stopTime = startTime + } + if startTime.Equal(stopTime) { + stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) + } + presignClient := s3.NewPresignClient(client) + request, err := presignClient.PresignGetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: aws.String(s.StorageConfig.S3.Bucket), + Key: aws.String(key), + }, func(presignOptions *s3.PresignOptions) { + presignOptions.Expires = 10 * time.Minute + }) + if err != nil { + logger.Error("生成预签名URL失败", zap.Error(err)) + continue + } + resultFiles = append(resultFiles, dto.File{ + BasePath: s.StorageConfig.S3.Bucket, + Name: path.Base(key), + Path: path.Dir(key), + Url: request.URL, + StartTime: startTime, + EndTime: stopTime, + }) } - if startTime.Equal(stopTime) { - stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) + + if !*result.IsTruncated { + break } - presignClient := s3.NewPresignClient(client) - request, err := presignClient.PresignGetObject(context.TODO(), &s3.GetObjectInput{ - Bucket: aws.String(s.StorageConfig.S3.Bucket), - Key: aws.String(key), - }, func(presignOptions *s3.PresignOptions) { - presignOptions.Expires = 10 * time.Minute - }) - if err != nil { - span.SetAttributes(attribute.String("error", err.Error())) - span.SetStatus(codes.Error, "生成预签名URL失败") - logger.Error("生成预签名URL失败", zap.Error(err)) - continue - } - fileList = append(fileList, dto.File{ - BasePath: s.StorageConfig.S3.Bucket, - Name: path.Base(key), - Path: path.Dir(key), - Url: request.URL, - StartTime: startTime, - EndTime: stopTime, - }) + continuationToken = result.NextContinuationToken } - if !*result.IsTruncated { - break - } - continuationToken = result.NextContinuationToken + sort.Slice(resultFiles, func(i, j int) bool { + return resultFiles[i].StartTime.Before(resultFiles[j].StartTime) + }) + return resultFiles, nil + }) + if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件列表读取失败") + return nil, err } + span.SetAttributes(attribute.Bool("cache.shared", shared)) span.SetAttributes(attribute.Int("file.count", len(fileList))) - sort.Slice(fileList, func(i, j int) bool { - return fileList[i].StartTime.Before(fileList[j].StartTime) - }) span.SetStatus(codes.Ok, "文件读取成功") - cacheItem := cacheItem{ - data: fileList, - expires: time.Now().Add(30 * time.Second), + if !hit && !shared { + logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey)) } - s3Cache.Store(cacheKey, cacheItem) - logger.Debug("缓存文件列表", zap.String("cacheKey", cacheKey)) - return fileList, nil } - -type cacheItem struct { - data []dto.File - expires time.Time -} - -// 添加定时清理缓存的初始化函数 -func init() { - go func() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - cleanupCache() - } - } - }() -} - -// 添加缓存清理函数 -func cleanupCache() { - var keysToDelete []interface{} - s3Cache.Range(func(key, value interface{}) bool { - // 类型检查:跳过非 cacheItem 类型的值(例如 lock_xxx 对应的 *sync.Mutex) - item, ok := value.(cacheItem) - if !ok { - return true - } - if time.Now().After(item.expires) { - keysToDelete = append(keysToDelete, key) - } - return true - }) - for _, key := range keysToDelete { - s3Cache.Delete(key) - } -} diff --git a/go.mod b/go.mod index 4c0e676..2abc6d6 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.62 github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 github.com/gin-gonic/gin v1.11.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/spf13/viper v1.20.0 go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 diff --git a/go.sum b/go.sum index 0e34c12..ac2d260 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=