From 5dfe6d6356277b8d758315567ccbb1881d7000cf Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 21 Apr 2025 15:08:02 +0800 Subject: [PATCH] =?UTF-8?q?s3=20=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E6=B7=BB=E5=8A=A0=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E8=87=AA=E6=B8=85=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fs/s3_adapter.go | 61 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/fs/s3_adapter.go b/fs/s3_adapter.go index 9788ba1..67a4e83 100644 --- a/fs/s3_adapter.go +++ b/fs/s3_adapter.go @@ -19,10 +19,11 @@ import ( "go.opentelemetry.io/otel/codes" ) +var s3Cache sync.Map + type S3Adapter struct { StorageConfig config.StorageConfig s3Client *s3.Client - cache sync.Map } func (s *S3Adapter) getClient() (*s3.Client, error) { @@ -53,39 +54,43 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time. span.SetAttributes(attribute.String("path", dirPath)) span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02"))) + if s.StorageConfig.S3.Bucket == "" { + span.SetAttributes(attribute.String("error", "未配置S3存储桶")) + span.SetStatus(codes.Error, "未配置S3存储桶") + return nil, fmt.Errorf("未配置S3存储桶") + } + cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02")) - if cachedInterface, ok := s.cache.Load(cacheKey); ok { + if cachedInterface, ok := s3Cache.Load(cacheKey); ok { cachedItem := cachedInterface.(cacheItem) + log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now())) if time.Now().Before(cachedItem.expires) { + log.Println("获取已缓存列表", cacheKey) span.SetAttributes(attribute.Bool("cache.hit", true)) return cachedItem.data, nil } } mutexKey := fmt.Sprintf("lock_%s", cacheKey) - mutex, _ := s.cache.LoadOrStore(mutexKey, &sync.Mutex{}) + mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{}) lock := mutex.(*sync.Mutex) defer func() { // 解锁后删除锁(避免内存泄漏) - s.cache.Delete(mutexKey) + s3Cache.Delete(mutexKey) lock.Unlock() }() lock.Lock() - if cachedInterface, ok := s.cache.Load(cacheKey); ok { + if cachedInterface, ok := s3Cache.Load(cacheKey); ok { cachedItem := cachedInterface.(cacheItem) + log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now())) if time.Now().Before(cachedItem.expires) { - span.SetAttributes(attribute.Bool("cache.hit", true)) + log.Println("过锁后获取已缓存列表", cacheKey) + span.SetAttributes(attribute.Bool("s3Cache.hit", true)) return cachedItem.data, nil } } - if s.StorageConfig.S3.Bucket == "" { - span.SetAttributes(attribute.String("error", "未配置S3存储桶")) - span.SetStatus(codes.Error, "未配置S3存储桶") - return nil, fmt.Errorf("未配置S3存储桶") - } - listObjectsInput := &s3.ListObjectsV2Input{ Bucket: aws.String(s.StorageConfig.S3.Bucket), Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)), @@ -168,7 +173,8 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time. data: fileList, expires: time.Now().Add(10 * time.Second), } - s.cache.Store(cacheKey, cacheItem) + s3Cache.Store(cacheKey, cacheItem) + log.Println("缓存文件列表", cacheKey) return fileList, nil } @@ -177,3 +183,32 @@ type cacheItem struct { data []dto.File expires time.Time } + +// 添加定时清理缓存的初始化函数 +func init() { + go func() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + cleanupCache() + } + } + }() +} + +// 添加缓存清理函数 +func cleanupCache() { + var keysToDelete []interface{} + s3Cache.Range(func(key, value interface{}) bool { + item := value.(cacheItem) + if time.Now().After(item.expires) { + keysToDelete = append(keysToDelete, key) + } + return true + }) + for _, key := range keysToDelete { + s3Cache.Delete(key) + } +}