缓存雪崩
那个可怕的夜晚
周五凌晨 2 点,睡得正香的我被电话惊醒。
电话那头是运维同事焦急的声音:
"快起来!系统崩了!"
"外部 API 调用失败率 100%,触发限流!"
"Redis 好像挂了,连接不上!"我打开电脑,看到的是一片红色警报:
🔴 严重:Redis 集群无响应
🔴 严重:外部 API 调用失败率 85%
🔴 严重:API 错误率 85%
🔴 严重:响应时间超过 30 秒这就是缓存雪崩——缓存系统大规模失效,所有请求直接涌向外部 API。
什么是缓存雪崩?
缓存雪崩流程
⏰ 场景一:集中过期
14:00 批量写入缓存
key_001 TTL: 3600s
key_002 TTL: 3600s
key_003 TTL: 3600s
...
key_999 TTL: 3600s
⚠️ 所有 key 都在同一时间过期!
15:00 集体过期!
key_001 ❌ 过期
key_002 ❌ 过期
key_003 ❌ 过期
...
key_999 ❌ 过期
💥 10000 个 key 同时失效!
15:01 外部 API 崩溃
🌊 请求洪水
🌐 外部 API 过载
CPU: 100%
🔴 系统雪崩!
💀 场景二:Redis 宕机
✅ 正常状态
👤 用户
→🗄️ Redis
→🌐 外部 API
缓存命中率 95%
响应时间 50ms
💥 Redis 宕机
👤 用户
→❌ Redis
⇨🌐 外部 API 🌊 全部请求涌向这里!
缓存命中率 0%
响应时间 5000ms+
API 限流状态 已触发
⚖️ 三种缓存问题对比
🕳️ 缓存穿透
目标: 不存在的数据
范围: 大量不同 key
危害: 中等
🔨 缓存击穿
目标: 热点 key
范围: 单个 key
危害: 较高
🏔️ 缓存雪崩
目标: 大面积失效
范围: 全部 key
危害: 最严重
缓存雪崩是指:
- 大量缓存 key 在同一时间过期
- 或者缓存服务(如 Redis)宕机
- 导致大量请求同时访问外部 API
- 外部 API 瞬间压力激增,可能触发限流或崩溃
关键特征:
- 大面积失效(不是单个 key)
- 通常是系统级问题
- 后果最严重(可能导致整个系统崩溃)
雪崩的两种触发场景
场景一:集中过期
问题设置:
- 所有缓存 key 都设置 1 小时过期
- 都在整点写入缓存
时间线:
14:00 - 写入 10000 个缓存 key(过期时间 15:00)
14:30 - 系统正常,缓存命中率 95%
15:00 - 10000 个 key 同时过期 ❌
15:01 - 所有请求涌向外部 API
15:02 - 外部 API 触发限流问题根源:
- 使用固定的过期时间(如 3600 秒)
- 没有考虑过期时间的叠加效应
场景二:Redis 宕机
时间线:
T0: Redis 主节点故障
T1: 从节点未能自动切换
T2: 应用无法连接缓存
T3: 所有请求直接访问外部 API
T4: 外部 API 触发限流
T5: 系统全面崩溃问题根源:
- Redis 没有高可用架构
- 缺少故障转移机制
- 没有降级方案
雪崩 vs 击穿 vs 穿透
这三个概念经常被混淆,我们来系统对比:
| 对比项 | 缓存穿透 | 缓存击穿 | 缓存雪崩 |
|---|---|---|---|
| 影响范围 | 单个/少量 key | 单个热点 key | 大面积/全部 key |
| 数据存在性 | 数据不存在 | 数据存在 | 数据存在 |
| 触发原因 | 恶意攻击 | 热点 key 过期 | 集中过期/服务宕机 |
| 并发量级 | 中等 | 高并发 | 超高并发 |
| 危害程度 | ⭐⭐ 中等 | ⭐⭐⭐ 较高 | ⭐⭐⭐⭐⭐ 最严重 |
形象比喻:
- 穿透 = 用假钥匙捅门(门后没人,但一直捅)
- 击穿 = 很多人同时撞一扇门(门后有人,但扛不住)
- 雪崩 = 整栋楼的墙塌了(全部暴露)
解决方案一:随机过期时间
核心思路:
- 在固定过期时间基础上增加随机值
- 让 key 分散过期,避免同时失效
import random
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def set_cache_with_random_ttl(key, value, base_ttl=3600, random_range=300):
"""
设置缓存,带随机过期时间
参数:
- key: 缓存键
- value: 缓存值
- base_ttl: 基础过期时间(秒)
- random_range: 随机波动范围(秒)
实际过期时间 = base_ttl + random(-random_range, +random_range)
"""
# 计算随机过期时间
random_offset = random.randint(-random_range, random_range)
actual_ttl = base_ttl + random_offset
# 确保过期时间为正数
actual_ttl = max(60, actual_ttl) # 至少 60 秒
redis_client.setex(key, actual_ttl, value)
return actual_ttl
# 使用示例
def get_weather(city):
cache_key = f"weather:{city}"
# 查缓存
cached = redis_client.get(cache_key)
if cached:
return cached
# 查外部 API
data = external_api.get_weather(city)
# 写缓存,随机过期时间(1 小时 ± 5 分钟)
actual_ttl = set_cache_with_random_ttl(
cache_key,
data,
base_ttl=3600, # 基础 1 小时
random_range=300 # 随机±5 分钟
)
print(f"缓存 {cache_key} 将在 {actual_ttl} 秒后过期")
return data效果对比:
固定过期时间:
14:00 写入 10000 个 key,都设置 3600 秒过期
15:00 10000 个 key 同时过期 ❌
随机过期时间:
14:00 写入 10000 个 key,基础 3600 秒,随机±300 秒
实际过期时间分布:
- 59 分过期:约 800 个 key
- 60 分过期:约 3000 个 key
- 61 分过期:约 3000 个 key
- 62 分过期:约 800 个 key
- ...分散在 55-65 分钟之间 ✅
结果:没有明显的过期高峰解决方案二:高可用架构(必须)
Redis 高可用架构
👑 方案一:Redis Sentinel(哨兵模式)
应用层
🖥️ 应用服务器
哨兵层
👁️ Sentinel 1 监控中
👁️ Sentinel 2 监控中
👁️ Sentinel 3 监控中
Redis 数据层
👑 Master
192.168.1.100:6379 写操作
📋 Slave 1
192.168.1.101:6379 读操作 / 热备
📋 Slave 2
192.168.1.102:6379 读操作 / 热备
🔄 自动故障转移流程
1 Sentinel 检测到 Master 宕机
→2 Sentinel 投票选举
→3 选出一个 Slave 晋升为 Master
→4 通知应用层新 Master 地址
🔗 方案二:Redis Cluster(集群模式)
总槽位数 16384
分片数 3
每主节点 ≈5461 槽
🔷 分片 1
主节点
Slot 0-5460 Node 1:6379
从节点
热备 Node 4:6379
🔶 分片 2
主节点
Slot 5461-10922 Node 2:6379
从节点
热备 Node 5:6379
🔹 分片 3
主节点
Slot 10923-16383 Node 3:6379
从节点
热备 Node 6:6379
⚖️ 方案对比
特性
Sentinel 模式
Cluster 模式
架构复杂度
⭐⭐ 简单
⭐⭐⭐⭐ 复杂
水平扩展
❌ 不支持
✅ 支持
读写分离
✅ 支持
✅ 支持
故障转移
✅ 自动
✅ 自动
推荐场景
小规模、简单部署
大规模、高并发
核心思路:
- 部署 Redis 集群
- 配置主从复制
- 启用自动故障转移
Redis Sentinel(哨兵模式)
# redis-sentinel.conf 配置示例
# 主节点监控
sentinel monitor mymaster 192.168.1.100 6379 2
# 故障判定
sentinel down-after-milliseconds mymaster 5000 # 5 秒无响应判定为宕机
sentinel failover-timeout mymaster 60000 # 故障转移超时 60 秒
sentinel parallel-syncs mymaster 1 # 并行同步的从节点数
# 通知脚本(可选)
sentinel notification-script mymaster /var/redis/notify.sh架构拓扑:
┌─────────────────┐
│ Sentinel 1 │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Sentinel 2 │ │ Sentinel 3 │ │ 应用层 │
└───────────────┘ └───────────────┘ └───────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Master │────────▶│ Slave 1 │ │ Slave 2 │
│ (主节点) │ 复制 │ (从节点) │ │ (从节点) │
└─────────┘ └─────────┘ └─────────┘
│
▼ 故障转移
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Slave 1│────────▶│ Master │ │ Slave 2 │
│ 晋升为主 │ 复制 │ (新主) │ │ │
└─────────┘ └─────────┘ └─────────┘Redis Cluster(集群模式)
# Redis Cluster 配置
# 启用集群模式
cluster-enabled yes
# 集群配置文件
cluster-config-file nodes.conf
# 节点超时时间(毫秒)
cluster-node-timeout 15000
# 从节点数量
cluster-replica-validity-factor 10集群拓扑:
应用层
│
├───┬───┬───┐
│ │ │ │
▼ ▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Node 1 │ │Node 2 │ │Node 3 │ 主节点(分片)
│Slot │ │Slot │ │Slot │
│0-5460 │ │5461- │ │10923- │
│ │ │10922 │ │16383 │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Slave 1│ │Slave 2│ │Slave 3│ 从节点
└───────┘ └───────┘ └───────┘解决方案三:服务降级(兜底方案)
核心思路:
- 当缓存不可用时,自动降级
- 限制外部 API 访问,保护核心功能
- 返回缓存数据或默认值
import time
from datetime import datetime, timedelta
from functools import wraps
class CircuitBreaker:
"""熔断器实现"""
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold # 失败阈值
self.recovery_timeout = recovery_timeout # 恢复超时(秒)
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
# 熔断器打开,拒绝请求
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
# 尝试半开状态
self.state = 'HALF_OPEN'
else:
raise Exception('服务降级:熔断器已打开')
try:
result = func(*args, **kwargs)
self.failure_count = 0
self.state = 'CLOSED'
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
raise e
# 全局熔断器实例
cache_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
api_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
# 本地缓存(Redis 宕机时的最后防线)
local_cache = {}
local_cache_ttl = {}
def get_weather_fallback(city):
"""
带降级的天气查询
降级策略:Redis → 本地缓存 → 外部 API → 默认值
"""
cache_key = f"weather:{city}"
# 尝试 1:Redis 缓存
try:
cached = cache_breaker.call(redis_client.get, cache_key)
if cached:
return cached
except Exception:
print("Redis 不可用,尝试降级方案")
# 尝试 2:本地缓存
if cache_key in local_cache:
# 检查本地缓存是否过期
if local_cache_ttl.get(cache_key, 0) > time.time():
print("使用本地缓存")
return local_cache[cache_key]
# 尝试 3:外部 API(带熔断保护)
try:
data = api_breaker.call(external_api.get_weather, city)
if data:
# 写入本地缓存
local_cache[cache_key] = data
local_cache_ttl[cache_key] = time.time() + 300 # 本地缓存 5 分钟
return data
except Exception:
print("外部 API 不可用,返回默认值")
# 尝试 4:返回默认值(最后防线)
default_data = {
'city': city,
'temperature': 20,
'condition': '未知',
'humidity': 50,
'fallback': True # 标记这是降级返回的数据
}
return default_data降级层次:
┌─────────────────────────────────────┐
│ Level 1: Redis 缓存 │ ← 正常情况
├─────────────────────────────────────┤
│ Level 2: 本地缓存(进程内) │ ← Redis 宕机
├─────────────────────────────────────┤
│ Level 3: 外部 API │ ← 本地缓存没有
├─────────────────────────────────────┤
│ Level 4: 默认值/静态数据 │ ← 外部 API 不可用
└─────────────────────────────────────┘完整的防护方案
import redis
import random
import time
from functools import wraps
from datetime import datetime, timedelta
# Redis 客户端(带连接池)
redis_pool = redis.ConnectionPool(
host='redis-sentinel',
port=26379,
sentinel_manager='mymaster',
max_connections=100
)
redis_client = redis.Redis(connection_pool=redis_pool)
# 本地缓存
local_cache = {}
local_cache_ttl = {}
# 熔断器
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED'
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise Exception('服务降级')
try:
result = func(*args, **kwargs)
self.failure_count = 0
self.state = 'CLOSED'
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
raise e
cache_breaker = CircuitBreaker()
def cache_with_avalanche_protection(base_ttl=3600, random_range=300):
"""
带缓存雪崩防护的装饰器
- base_ttl: 基础过期时间
- random_range: 随机波动范围
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{func.__name__}:{args}:{kwargs}"
# 1. 尝试 Redis 缓存
try:
cached = cache_breaker.call(redis_client.get, cache_key)
if cached:
return cached
except Exception:
pass # Redis 不可用,继续降级
# 2. 尝试本地缓存
if cache_key in local_cache:
if local_cache_ttl.get(cache_key, 0) > time.time():
return local_cache[cache_key]
# 3. 调用外部 API
try:
result = func(*args, **kwargs)
# 4. 写入 Redis(带随机过期时间)
try:
actual_ttl = base_ttl + random.randint(-random_range, random_range)
actual_ttl = max(60, actual_ttl)
cache_breaker.call(
redis_client.setex,
cache_key,
actual_ttl,
result
)
except Exception:
pass # Redis 写入失败,不影响返回
# 5. 写入本地缓存
local_cache[cache_key] = result
local_cache_ttl[cache_key] = time.time() + min(300, actual_ttl)
return result
except Exception:
# 6. 外部 API 也失败,返回本地缓存(如果有的话)
if cache_key in local_cache:
return local_cache[cache_key]
raise
return wrapper
return decorator
# 使用示例
@cache_with_avalanche_protection(base_ttl=3600, random_range=300)
def get_weather(city):
return external_api.get_weather(city)效果验证
实施完整防护方案后的系统指标:
防护效果对比
| 指标 | 无防护 | 随机过期 | + 高可用 | + 降级 |
|---|---|---|---|---|
| 集中过期风险 | 高 | 低 | 低 | 低 |
| Redis 宕机影响 | 100% | 100% | 0% | 0% |
| 外部 API 保护 | 无 | 部分 | 强 | 最强 |
| 可用性 | 95% | 95% | 99.9% | 99.99% |
故障演练结果
故障场景测试:
1. 模拟 Redis 主节点宕机
- 故障检测时间:5 秒
- 自动切换时间:8 秒
- 影响请求:0.1%
- 结果:通过 ✅
2. 模拟 10000 个 key 同时过期
- 外部 API 峰值:正常值 120%
- 响应时间:增加 50ms
- 结果:通过 ✅
3. 模拟外部 API 不可用
- 降级响应时间:< 100ms
- 返回本地缓存数据
- 结果:通过 ✅深度思考
随机范围设置
场景:10000 个 key,基础过期时间 1 小时
随机范围±1 分钟:
❌ 分散效果不够,仍然有集中过期风险
随机范围±5 分钟(推荐):
✅ 分散到 10 分钟窗口,每个 key 独立
✅ 数据新鲜度可控(最多晚 5 分钟)
随机范围±30 分钟:
❌ 分散效果好,但数据可能过时太久
❌ 用户可能看到 30 分钟前的数据
推荐:基础时间的 5%-10%本地缓存的管理
# 问题:本地缓存无限增长会内存溢出
# 解决方案 1:设置最大容量
from collections import OrderedDict
class LRUCache:
def __init__(self, max_size=10000):
self.cache = OrderedDict()
self.max_size = max_size
def get(self, key):
if key in self.cache:
self.cache.move_to_end(key)
return self.cache[key]
return None
def set(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
else:
if len(self.cache) >= self.max_size:
# 删除最旧的
self.cache.popitem(last=False)
self.cache[key] = value
# 解决方案 2:使用第三方库
from cachetools import TTLCache
# 最多 10000 个条目,TTL 5 分钟
local_cache = TTLCache(maxsize=10000, ttl=300)小结
缓存雪崩的本质:大面积缓存失效或缓存服务宕机,导致外部 API 被压垮
防护核心思想:
- 随机过期时间 —— 避免集中过期
- 高可用架构 —— 避免单点故障
- 服务降级 —— 最后防线
最佳实践:
- 所有缓存 key 使用随机过期时间
- 部署 Redis Sentinel 或 Cluster
- 实现多层降级策略
- 定期故障演练
当前技术架构
每个请求都直接调用外部 API,响应慢
客户端
用户请求 200 个开发者
应用服务
应用服务器 处理请求
缓存层 / 外部服务
外部天气 API 响应时间 2 秒
引入 Redis 缓存,大幅降低响应时间
客户端
用户请求 高并发访问
应用服务
应用服务器 处理请求
缓存层 / 外部服务
Redis 缓存 1 小时过期
外部天气 API 响应时间 2 秒
增加空值缓存,防止缓存穿透
客户端
用户请求 包含恶意请求
应用服务
应用服务器 参数校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用布隆过滤器提前过滤无效请求
客户端
用户请求 包含恶意请求
应用服务
应用服务器 布隆过滤器校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用互斥锁防止缓存击穿
客户端
用户请求 高并发访问
应用服务
应用服务器 互斥锁控制
缓存层 / 外部服务
Redis 缓存 热点 key 防护
外部天气 API 有调用限制
熔断器 + 降级策略应对缓存雪崩
客户端
用户请求 高并发访问
应用服务
应用服务器 熔断器 + 降级
缓存层 / 外部服务
Redis Sentinel 主从高可用
外部天气 API 有调用限制
多地域部署的高可用 API 平台
客户端
用户请求 10 万用户
应用服务
应用服务器集群 26 台,3 地域
缓存层 / 外部服务
Redis 集群 6 主 6 从
MySQL 主从 1 主 5 从
消息队列 RabbitMQ 3 台
课后练习
练习 1
缓存雪崩的两种触发场景是什么?
参考答案 (2 个标签)
缓存雪崩 基础概念
答案:
场景一:集中过期
- 大量缓存 key 在同一时间过期
- 原因:使用固定过期时间,且批量写入
- 例:10000 个 key 都在 14:00 写入,都设置 1 小时过期,15:00 同时失效
场景二:缓存服务宕机
- Redis 服务器故障、网络中断、机房断电等
- 导致所有缓存无法访问
- 所有请求直接访问外部 API
区别:
- 集中过期可以预防(随机过期时间)
- 服务宕机需要高可用架构和降级方案
练习 2
为什么缓存过期时间要加上随机值?随机范围如何设置?
参考答案 (2 个标签)
缓存雪崩 随机过期
答案:
加随机值的原因:
- 避免大量 key 同时过期
- 让过期时间分散,形成平缓的过期曲线
- 防止外部 API 在某一时刻承受过大压力
随机范围设置原则:
推荐:基础过期时间的 5%-10%
示例:
- 基础过期 1 小时(3600 秒)
- 随机范围±5 分钟(±300 秒)
- 实际过期时间:55-65 分钟考虑因素:
- 分散效果:范围越大,分散越好
- 数据新鲜度:范围越大,数据可能越旧
- 业务容忍度:根据业务可接受的旧数据程度调整
练习 3
请设计一个完整的缓存雪崩防护方案,包括代码实现。
参考答案 (3 个标签)
缓存雪崩 实战编程 高可用
参考答案:
import redis
import random
import time
from functools import wraps
# Redis Sentinel 高可用配置
sentinel = redis.Sentinel([
('192.168.1.100', 26379),
('192.168.1.101', 26379),
('192.168.1.102', 26379),
], socket_timeout=0.1)
# 主节点连接
redis_master = sentinel.master_for('mymaster', socket_timeout=0.1)
# 从节点连接(读操作)
redis_slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
# 本地缓存(最后防线)
local_cache = {}
local_cache_ttl = {}
def get_weather_with_protection(city):
"""
带雪崩防护的天气查询
防护层次:Redis 从节点 → Redis 主节点 → 本地缓存 → 外部 API
"""
cache_key = f"weather:{city}"
# Layer 1: 尝试 Redis 从节点(读操作优先从从节点)
try:
cached = redis_slave.get(cache_key)
if cached:
return cached
except Exception:
pass
# Layer 2: 尝试 Redis 主节点
try:
cached = redis_master.get(cache_key)
if cached:
return cached
except Exception:
pass
# Layer 3: 本地缓存
if cache_key in local_cache:
if local_cache_ttl.get(cache_key, 0) > time.time():
return local_cache[cache_key]
# Layer 4: 外部 API
data = external_api.get_weather(city)
# 写入缓存(随机过期时间)
base_ttl = 3600
random_ttl = base_ttl + random.randint(-300, 300)
try:
redis_master.setex(cache_key, random_ttl, data)
except Exception:
pass # Redis 写入失败不影响返回
# 写入本地缓存
local_cache[cache_key] = data
local_cache_ttl[cache_key] = time.time() + 300
return data防护层次:
- Redis 从节点读(减轻主节点压力)
- Redis 主节点(从节点不可用时)
- 本地缓存(Redis 集群不可用时)
- 外部 API(最后的数据来源)
关键点:
- 使用 Sentinel 实现自动故障转移
- 读写分离(从节点处理读请求)
- 随机过期时间避免集中失效
- 本地缓存作为最后防线
