You've already forked FrameTour-BE
refactor(integration): 将降级缓存从Redis迁移到Caffeine内存缓存
- 移除RedisTemplate依赖,改用Caffeine作为缓存实现 - 添加缓存互斥锁机制,避免并发请求打崩下游服务 - 统一缓存策略:有缓存直接返回,无缓存调用远程并缓存结果 - 调整缓存TTL配置,从天单位改为分钟单位 - 更新缓存统计信息结构,TTL字段从天改为分钟 - 优化批量清除缓存逻辑,使用流式过滤处理 - 简化缓存操作API,移除无返回值的执行方法
This commit is contained in:
@@ -1,371 +1,153 @@
|
||||
package com.ycwl.basic.integration.common.service;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.ycwl.basic.integration.common.config.IntegrationProperties;
|
||||
import com.ycwl.basic.utils.JacksonUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 集成服务通用失败降级处理
|
||||
* 提供统一的降级策略,支持所有微服务集成
|
||||
* 使用 Caffeine 内存缓存,缓存命中时直接返回避免打崩下游服务
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class IntegrationFallbackService {
|
||||
|
||||
private final RedisTemplate<String, String> redisTemplate;
|
||||
|
||||
private final IntegrationProperties integrationProperties;
|
||||
|
||||
// 默认降级缓存配置
|
||||
private final Cache<String, String> fallbackCache;
|
||||
|
||||
private static final String DEFAULT_FALLBACK_PREFIX = "integration:fallback:";
|
||||
private static final long DEFAULT_FALLBACK_TTL_DAYS = 1; // 7天
|
||||
private static final long FALLBACK_CACHE_PREFERRED_MAX_AGE_SECONDS = 60; // 1分钟
|
||||
|
||||
private static final long CACHE_TTL_MINUTES = 1;
|
||||
private static final long MAX_CACHE_SIZE = 50000;
|
||||
|
||||
public IntegrationFallbackService(IntegrationProperties integrationProperties) {
|
||||
this.integrationProperties = integrationProperties;
|
||||
this.fallbackCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(CACHE_TTL_MINUTES, TimeUnit.MINUTES)
|
||||
.maximumSize(MAX_CACHE_SIZE)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行操作,失败时降级到缓存结果
|
||||
*
|
||||
* @param serviceName 服务名称 (如: zt-device, zt-scenic)
|
||||
* @param cacheKey 缓存键
|
||||
* @param operation 主要操作
|
||||
* @param resultClass 结果类型
|
||||
* @param <T> 结果类型
|
||||
* @return 操作结果或缓存的结果
|
||||
* 执行操作,优先返回缓存结果
|
||||
* 策略:有缓存直接返回,无缓存调用远程并缓存结果
|
||||
* 同一 cacheKey 有互斥锁,避免并发请求打崩下游服务
|
||||
*/
|
||||
public <T> T executeWithFallback(String serviceName, String cacheKey, Supplier<T> operation, Class<T> resultClass) {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
String cachedValue = null;
|
||||
boolean isPreferredCache = false;
|
||||
try {
|
||||
cachedValue = redisTemplate.opsForValue().get(fullKey);
|
||||
isPreferredCache = cachedValue != null && isFallbackCachePreferred(serviceName, fullKey);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 读取降级缓存失败,将继续尝试远端获取, cacheKey: {}", serviceName, cacheKey, e);
|
||||
}
|
||||
|
||||
if (isPreferredCache) {
|
||||
T preferredCacheResult = parseFallbackCacheValue(serviceName, cacheKey, cachedValue, resultClass);
|
||||
if (preferredCacheResult != null) {
|
||||
log.debug("[{}] 命中优先缓存(<=1分钟),直接返回, cacheKey: {}", serviceName, cacheKey);
|
||||
return preferredCacheResult;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// Caffeine.get() 内置互斥锁:同一 key 只有一个线程执行 loader,其他线程等待
|
||||
String cachedValue = fallbackCache.get(fullKey, k -> {
|
||||
log.debug("[{}] 缓存未命中,调用远程, cacheKey: {}", serviceName, cacheKey);
|
||||
T result = operation.get();
|
||||
if (result != null) {
|
||||
// 操作成功,保存结果用于将来的降级
|
||||
storeFallbackCache(serviceName, cacheKey, result);
|
||||
}
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 操作失败,尝试降级到缓存结果, cacheKey: {}", serviceName, cacheKey, e);
|
||||
T fallbackResult = parseFallbackCacheValue(serviceName, cacheKey, cachedValue, resultClass);
|
||||
if (fallbackResult == null) {
|
||||
fallbackResult = getFallbackFromCache(serviceName, cacheKey, resultClass);
|
||||
}
|
||||
if (fallbackResult == null) {
|
||||
log.error("[{}] 操作失败且无缓存数据, cacheKey: {}", serviceName, cacheKey);
|
||||
throw e;
|
||||
}
|
||||
log.info("[{}] 成功从降级缓存获取结果, cacheKey: {}", serviceName, cacheKey);
|
||||
return fallbackResult;
|
||||
return result != null ? JacksonUtil.toJSONString(result) : null;
|
||||
});
|
||||
|
||||
if (cachedValue == null) {
|
||||
return null;
|
||||
}
|
||||
return parseValue(serviceName, cacheKey, cachedValue, resultClass);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行操作,失败时降级到缓存结果(支持TypeReference泛型)
|
||||
*
|
||||
* @param serviceName 服务名称 (如: zt-device, zt-scenic)
|
||||
* @param cacheKey 缓存键
|
||||
* @param operation 主要操作
|
||||
* @param typeReference 类型引用,用于保留泛型信息
|
||||
* @param <T> 结果类型
|
||||
* @return 操作结果或缓存的结果
|
||||
* 执行操作,优先返回缓存结果(支持TypeReference泛型)
|
||||
* 同一 cacheKey 有互斥锁,避免并发请求打崩下游服务
|
||||
*/
|
||||
public <T> T executeWithFallback(String serviceName, String cacheKey, Supplier<T> operation, TypeReference<T> typeReference) {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
String cachedValue = null;
|
||||
boolean isPreferredCache = false;
|
||||
try {
|
||||
cachedValue = redisTemplate.opsForValue().get(fullKey);
|
||||
isPreferredCache = cachedValue != null && isFallbackCachePreferred(serviceName, fullKey);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 读取降级缓存失败,将继续尝试远端获取, cacheKey: {}", serviceName, cacheKey, e);
|
||||
}
|
||||
|
||||
if (isPreferredCache) {
|
||||
T preferredCacheResult = parseFallbackCacheValue(serviceName, cacheKey, cachedValue, typeReference);
|
||||
if (preferredCacheResult != null) {
|
||||
log.debug("[{}] 命中优先缓存(<=1分钟),直接返回, cacheKey: {}", serviceName, cacheKey);
|
||||
return preferredCacheResult;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// Caffeine.get() 内置互斥锁:同一 key 只有一个线程执行 loader,其他线程等待
|
||||
String cachedValue = fallbackCache.get(fullKey, k -> {
|
||||
log.debug("[{}] 缓存未命中,调用远程, cacheKey: {}", serviceName, cacheKey);
|
||||
T result = operation.get();
|
||||
if (result != null) {
|
||||
// 操作成功,保存结果用于将来的降级
|
||||
storeFallbackCache(serviceName, cacheKey, result);
|
||||
}
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 操作失败,尝试降级到缓存结果, cacheKey: {}", serviceName, cacheKey, e);
|
||||
T fallbackResult = parseFallbackCacheValue(serviceName, cacheKey, cachedValue, typeReference);
|
||||
if (fallbackResult == null) {
|
||||
fallbackResult = getFallbackFromCache(serviceName, cacheKey, typeReference);
|
||||
}
|
||||
if (fallbackResult == null) {
|
||||
log.error("[{}] 操作失败且无缓存数据, cacheKey: {}", serviceName, cacheKey);
|
||||
throw e;
|
||||
}
|
||||
log.info("[{}] 成功从降级缓存获取结果, cacheKey: {}", serviceName, cacheKey);
|
||||
return fallbackResult;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行操作,失败时降级到缓存结果,无返回值版本
|
||||
*
|
||||
* @param serviceName 服务名称
|
||||
* @param cacheKey 缓存键
|
||||
* @param operation 主要操作
|
||||
*/
|
||||
public void executeWithFallback(String serviceName, String cacheKey, Runnable operation) {
|
||||
try {
|
||||
operation.run();
|
||||
// 操作成功,记录成功状态
|
||||
storeFallbackCache(serviceName, cacheKey + ":success", "true");
|
||||
log.debug("[{}] 操作成功,已记录成功状态, cacheKey: {}", serviceName, cacheKey);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 操作失败,检查是否有历史成功记录, cacheKey: {}", serviceName, cacheKey, e);
|
||||
String successRecord = getFallbackFromCache(serviceName, cacheKey + ":success", String.class);
|
||||
if (successRecord == null) {
|
||||
log.error("[{}] 操作失败且无历史成功记录, cacheKey: {}", serviceName, cacheKey);
|
||||
throw e;
|
||||
}
|
||||
log.info("[{}] 操作失败但有历史成功记录,忽略此次失败, cacheKey: {}", serviceName, cacheKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储降级缓存
|
||||
*/
|
||||
private void storeFallbackCache(String serviceName, String cacheKey, Object value) {
|
||||
try {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
String jsonValue = JacksonUtil.toJSONString(value);
|
||||
long ttlDays = getFallbackTtl(serviceName);
|
||||
redisTemplate.opsForValue().set(fullKey, jsonValue, ttlDays, TimeUnit.DAYS);
|
||||
log.debug("[{}] 存储降级缓存成功, key: {}", serviceName, fullKey);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 存储降级缓存失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从降级缓存获取结果
|
||||
*/
|
||||
private <T> T getFallbackFromCache(String serviceName, String cacheKey, Class<T> resultClass) {
|
||||
try {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
String cachedValue = redisTemplate.opsForValue().get(fullKey);
|
||||
if (cachedValue != null) {
|
||||
log.debug("[{}] 从降级缓存获取结果, key: {}", serviceName, fullKey);
|
||||
if (resultClass == String.class) {
|
||||
return resultClass.cast(cachedValue);
|
||||
}
|
||||
return JacksonUtil.parseObject(cachedValue, resultClass);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 从降级缓存获取结果失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return result != null ? JacksonUtil.toJSONString(result) : null;
|
||||
});
|
||||
|
||||
/**
|
||||
* 从降级缓存获取结果(支持TypeReference泛型)
|
||||
*/
|
||||
private <T> T getFallbackFromCache(String serviceName, String cacheKey, TypeReference<T> typeReference) {
|
||||
try {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
String cachedValue = redisTemplate.opsForValue().get(fullKey);
|
||||
if (cachedValue != null) {
|
||||
log.debug("[{}] 从降级缓存获取结果, key: {}", serviceName, fullKey);
|
||||
return JacksonUtil.parseObject(cachedValue, typeReference);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 从降级缓存获取结果失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断当前降级缓存是否应该优先使用。
|
||||
* 规则:缓存写入后的 1 分钟内优先使用,超过 1 分钟则优先尝试远端;远端失败再降级到缓存。
|
||||
*/
|
||||
private boolean isFallbackCachePreferred(String serviceName, String fullKey) {
|
||||
try {
|
||||
Long remainingSeconds = redisTemplate.getExpire(fullKey, TimeUnit.SECONDS);
|
||||
if (remainingSeconds == null || remainingSeconds < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long ttlDays = getFallbackTtl(serviceName);
|
||||
long expectedTtlSeconds = TimeUnit.DAYS.toSeconds(ttlDays);
|
||||
if (remainingSeconds > expectedTtlSeconds) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long ageSeconds = expectedTtlSeconds - remainingSeconds;
|
||||
return ageSeconds <= FALLBACK_CACHE_PREFERRED_MAX_AGE_SECONDS;
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 判断降级缓存有效期失败,视为不优先, key: {}", serviceName, fullKey, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private <T> T parseFallbackCacheValue(String serviceName, String cacheKey, String cachedValue, Class<T> resultClass) {
|
||||
if (cachedValue == null) {
|
||||
return null;
|
||||
}
|
||||
return parseValue(serviceName, cacheKey, cachedValue, typeReference);
|
||||
}
|
||||
|
||||
private <T> T parseValue(String serviceName, String cacheKey, String value, Class<T> resultClass) {
|
||||
try {
|
||||
if (resultClass == String.class) {
|
||||
return resultClass.cast(cachedValue);
|
||||
}
|
||||
return JacksonUtil.parseObject(cachedValue, resultClass);
|
||||
return JacksonUtil.parseObject(value, resultClass);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 解析降级缓存失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
log.warn("[{}] 解析缓存失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析降级缓存值(支持TypeReference泛型)
|
||||
*/
|
||||
private <T> T parseFallbackCacheValue(String serviceName, String cacheKey, String cachedValue, TypeReference<T> typeReference) {
|
||||
if (cachedValue == null) {
|
||||
return null;
|
||||
}
|
||||
private <T> T parseValue(String serviceName, String cacheKey, String value, TypeReference<T> typeReference) {
|
||||
try {
|
||||
return JacksonUtil.parseObject(cachedValue, typeReference);
|
||||
return JacksonUtil.parseObject(value, typeReference);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] 解析降级缓存失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
log.warn("[{}] 解析缓存失败, cacheKey: {}", serviceName, cacheKey, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清除降级缓存
|
||||
*
|
||||
* @param serviceName 服务名称
|
||||
* @param cacheKey 缓存键
|
||||
*/
|
||||
|
||||
public void clearFallbackCache(String serviceName, String cacheKey) {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
redisTemplate.delete(fullKey);
|
||||
log.debug("[{}] 清除降级缓存, key: {}", serviceName, fullKey);
|
||||
fallbackCache.invalidate(fullKey);
|
||||
log.debug("[{}] 清除缓存, key: {}", serviceName, fullKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量清除服务的所有降级缓存
|
||||
*
|
||||
* @param serviceName 服务名称
|
||||
*/
|
||||
|
||||
public void clearAllFallbackCache(String serviceName) {
|
||||
String pattern = buildFullCacheKey(serviceName, "*");
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
redisTemplate.delete(keys);
|
||||
log.info("[{}] 批量清除降级缓存,共删除 {} 个缓存项", serviceName, keys.size());
|
||||
String prefix = buildFullCacheKey(serviceName, "");
|
||||
List<String> keysToRemove = fallbackCache.asMap().keySet().stream()
|
||||
.filter(key -> key.startsWith(prefix))
|
||||
.collect(Collectors.toList());
|
||||
if (!keysToRemove.isEmpty()) {
|
||||
fallbackCache.invalidateAll(keysToRemove);
|
||||
log.info("[{}] 批量清除缓存,共删除 {} 项", serviceName, keysToRemove.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否有降级缓存
|
||||
*
|
||||
* @param serviceName 服务名称
|
||||
* @param cacheKey 缓存键
|
||||
* @return 是否存在降级缓存
|
||||
*/
|
||||
|
||||
public boolean hasFallbackCache(String serviceName, String cacheKey) {
|
||||
String fullKey = buildFullCacheKey(serviceName, cacheKey);
|
||||
return Boolean.TRUE.equals(redisTemplate.hasKey(fullKey));
|
||||
return fallbackCache.getIfPresent(fullKey) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务的降级缓存统计信息
|
||||
*
|
||||
* @param serviceName 服务名称
|
||||
* @return 缓存统计信息
|
||||
*/
|
||||
|
||||
public FallbackCacheStats getFallbackCacheStats(String serviceName) {
|
||||
String pattern = buildFullCacheKey(serviceName, "*");
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
int totalCount = keys != null ? keys.size() : 0;
|
||||
|
||||
String prefix = buildFullCacheKey(serviceName, "");
|
||||
long totalCount = fallbackCache.asMap().keySet().stream()
|
||||
.filter(key -> key.startsWith(prefix))
|
||||
.count();
|
||||
|
||||
return FallbackCacheStats.builder()
|
||||
.serviceName(serviceName)
|
||||
.totalCacheCount(totalCount)
|
||||
.cacheKeyPattern(pattern)
|
||||
.fallbackTtlDays(getFallbackTtl(serviceName))
|
||||
.totalCacheCount((int) totalCount)
|
||||
.cacheKeyPattern(prefix + "*")
|
||||
.cacheTtlMinutes(CACHE_TTL_MINUTES)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建完整的缓存键
|
||||
*/
|
||||
|
||||
private String buildFullCacheKey(String serviceName, String cacheKey) {
|
||||
String prefix = getFallbackPrefix(serviceName);
|
||||
return prefix + serviceName + ":" + cacheKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务的降级缓存前缀
|
||||
*/
|
||||
|
||||
private String getFallbackPrefix(String serviceName) {
|
||||
if (!integrationProperties.getFallback().isEnabled()) {
|
||||
return DEFAULT_FALLBACK_PREFIX;
|
||||
}
|
||||
|
||||
// 获取服务特定的缓存前缀
|
||||
IntegrationProperties.ServiceFallbackConfig serviceConfig = getServiceFallbackConfig(serviceName);
|
||||
if (serviceConfig != null && serviceConfig.getCachePrefix() != null) {
|
||||
return serviceConfig.getCachePrefix();
|
||||
}
|
||||
|
||||
// 使用全局配置的前缀
|
||||
return integrationProperties.getFallback().getCachePrefix();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务的降级缓存TTL
|
||||
*/
|
||||
private long getFallbackTtl(String serviceName) {
|
||||
if (!integrationProperties.getFallback().isEnabled()) {
|
||||
return DEFAULT_FALLBACK_TTL_DAYS;
|
||||
}
|
||||
|
||||
// 获取服务特定的TTL
|
||||
IntegrationProperties.ServiceFallbackConfig serviceConfig = getServiceFallbackConfig(serviceName);
|
||||
if (serviceConfig != null && serviceConfig.getTtlDays() > 0) {
|
||||
return serviceConfig.getTtlDays();
|
||||
}
|
||||
|
||||
// 使用全局配置的TTL
|
||||
return integrationProperties.getFallback().getDefaultTtlDays();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务特定的降级配置
|
||||
*/
|
||||
|
||||
private IntegrationProperties.ServiceFallbackConfig getServiceFallbackConfig(String serviceName) {
|
||||
switch (serviceName.toLowerCase()) {
|
||||
case "zt-scenic":
|
||||
@@ -376,28 +158,21 @@ public class IntegrationFallbackService {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查服务是否启用降级功能
|
||||
*/
|
||||
|
||||
public boolean isFallbackEnabled(String serviceName) {
|
||||
if (!integrationProperties.getFallback().isEnabled()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
IntegrationProperties.ServiceFallbackConfig serviceConfig = getServiceFallbackConfig(serviceName);
|
||||
return serviceConfig == null || serviceConfig.isEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
* 降级缓存统计信息
|
||||
*/
|
||||
|
||||
@lombok.Builder
|
||||
@lombok.Data
|
||||
public static class FallbackCacheStats {
|
||||
private String serviceName;
|
||||
private int totalCacheCount;
|
||||
private String cacheKeyPattern;
|
||||
private long fallbackTtlDays;
|
||||
private long cacheTtlMinutes;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user