Compare commits

..

7 Commits

Author SHA1 Message Date
5dfe6d6356 s3 优化缓存逻辑,添加缓存自清理逻辑 2025-04-21 15:08:02 +08:00
509b829c5b s3 修复缓存键避免被逻辑修改 2025-04-21 14:44:23 +08:00
e6f93a4d37 s3 避免缓存击穿 2025-04-21 14:37:51 +08:00
2971c5f52d s3 添加缓存避免延迟爆炸 2025-04-21 14:02:56 +08:00
3d7c88de5f s3 一次性10000个 2025-04-13 18:43:45 +08:00
f9256895b7 去除错误 2025-04-13 18:32:20 +08:00
104930c413 优化stopTime为空时逻辑爆炸的问题 2025-04-13 16:07:29 +08:00
3 changed files with 93 additions and 6 deletions

View File

@ -26,6 +26,9 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
span.SetStatus(codes.Error, "未配置存储路径") span.SetStatus(codes.Error, "未配置存储路径")
return nil, fmt.Errorf("未配置存储路径") return nil, fmt.Errorf("未配置存储路径")
} }
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
// 读取文件夹下目录 // 读取文件夹下目录
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath)) files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
if err != nil { if err != nil {
@ -51,7 +54,10 @@ func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt ti
if err != nil { if err != nil {
continue continue
} }
if startTime.Equal(stopTime) || stopTime.IsZero() { if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
// 如果文件名没有时间戳,则认为该文件是未录制完成的 // 如果文件名没有时间戳,则认为该文件是未录制完成的
// 尝试读取一下视频信息 // 尝试读取一下视频信息
duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name())) duration, err := util.GetVideoDuration(subCtx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))

View File

@ -10,6 +10,7 @@ import (
"log" "log"
"path" "path"
"sort" "sort"
"sync"
"time" "time"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
@ -18,6 +19,8 @@ import (
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
) )
var s3Cache sync.Map
type S3Adapter struct { type S3Adapter struct {
StorageConfig config.StorageConfig StorageConfig config.StorageConfig
s3Client *s3.Client s3Client *s3.Client
@ -49,15 +52,49 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
_, span := tracer.Start(ctx, "GetFileList_s3") _, span := tracer.Start(ctx, "GetFileList_s3")
defer span.End() defer span.End()
span.SetAttributes(attribute.String("path", dirPath))
span.SetAttributes(attribute.String("relativeDate", relDt.Format("2006-01-02")))
if s.StorageConfig.S3.Bucket == "" { if s.StorageConfig.S3.Bucket == "" {
span.SetAttributes(attribute.String("error", "未配置S3存储桶")) span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
span.SetStatus(codes.Error, "未配置S3存储桶") span.SetStatus(codes.Error, "未配置S3存储桶")
return nil, fmt.Errorf("未配置S3存储桶") return nil, fmt.Errorf("未配置S3存储桶")
} }
cacheKey := fmt.Sprintf("%s_%s", dirPath, relDt.Format("2006-01-02"))
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
if time.Now().Before(cachedItem.expires) {
log.Println("获取已缓存列表", cacheKey)
span.SetAttributes(attribute.Bool("cache.hit", true))
return cachedItem.data, nil
}
}
mutexKey := fmt.Sprintf("lock_%s", cacheKey)
mutex, _ := s3Cache.LoadOrStore(mutexKey, &sync.Mutex{})
lock := mutex.(*sync.Mutex)
defer func() {
// 解锁后删除锁(避免内存泄漏)
s3Cache.Delete(mutexKey)
lock.Unlock()
}()
lock.Lock()
if cachedInterface, ok := s3Cache.Load(cacheKey); ok {
cachedItem := cachedInterface.(cacheItem)
log.Println("缓存过期时间", cachedItem.expires.Sub(time.Now()))
if time.Now().Before(cachedItem.expires) {
log.Println("过锁后获取已缓存列表", cacheKey)
span.SetAttributes(attribute.Bool("s3Cache.hit", true))
return cachedItem.data, nil
}
}
listObjectsInput := &s3.ListObjectsV2Input{ listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(s.StorageConfig.S3.Bucket), Bucket: aws.String(s.StorageConfig.S3.Bucket),
Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)), Prefix: aws.String(path.Join(s.StorageConfig.S3.Prefix, dirPath)),
MaxKeys: aws.Int32(1000),
} }
client, err := s.getClient() client, err := s.getClient()
@ -91,7 +128,10 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
if err != nil { if err != nil {
continue continue
} }
if startTime.Equal(stopTime) || stopTime.IsZero() { if stopTime.IsZero() {
stopTime = startTime
}
if startTime.Equal(stopTime) {
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
} }
presignClient := s3.NewPresignClient(client) presignClient := s3.NewPresignClient(client)
@ -128,5 +168,47 @@ func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.
return fileList[i].StartTime.Before(fileList[j].StartTime) return fileList[i].StartTime.Before(fileList[j].StartTime)
}) })
span.SetStatus(codes.Ok, "文件读取成功") span.SetStatus(codes.Ok, "文件读取成功")
cacheItem := cacheItem{
data: fileList,
expires: time.Now().Add(10 * time.Second),
}
s3Cache.Store(cacheKey, cacheItem)
log.Println("缓存文件列表", cacheKey)
return fileList, nil return fileList, nil
} }
type cacheItem struct {
data []dto.File
expires time.Time
}
// 添加定时清理缓存的初始化函数
func init() {
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
cleanupCache()
}
}
}()
}
// 添加缓存清理函数
func cleanupCache() {
var keysToDelete []interface{}
s3Cache.Range(func(key, value interface{}) bool {
item := value.(cacheItem)
if time.Now().After(item.expires) {
keysToDelete = append(keysToDelete, key)
}
return true
})
for _, key := range keysToDelete {
s3Cache.Delete(key)
}
}

View File

@ -461,12 +461,11 @@ func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
span.SetStatus(codes.Error, "failed to get video duration") span.SetStatus(codes.Error, "failed to get video duration")
return 0, fmt.Errorf("failed to get video duration: %w", err) return 0, fmt.Errorf("failed to get video duration: %w", err)
} }
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String())) span.SetAttributes(attribute.String("ffprobe.stdout", out.String()))
durationStr := strings.TrimSpace(out.String()) durationStr := strings.TrimSpace(out.String())
duration, err := strconv.ParseFloat(durationStr, 64) duration, err := strconv.ParseFloat(durationStr, 64)
if err != nil { if err != nil {
span.SetAttributes(attribute.String("error", err.Error())) span.SetAttributes(attribute.String("error", err.Error()))
span.SetStatus(codes.Error, "failed to parse video duration")
return 0, fmt.Errorf("failed to parse video duration: %w", err) return 0, fmt.Errorf("failed to parse video duration: %w", err)
} }
span.SetAttributes(attribute.Float64("video.duration", duration)) span.SetAttributes(attribute.Float64("video.duration", duration))