限流机制
场景
用户量继续增长,我发现了一些异常情况。
四个月后:
- 注册用户:1500 个
- 日调用量:30 万次
问题发现
某天,服务器响应突然变慢。
我查了一下监控,发现:
# 分析 Top 用户
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
'''SELECT user_id, email, COUNT(*) as calls
FROM api_logs
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY user_id
ORDER BY calls DESC
LIMIT 10'''
)
for row in cursor.fetchall():
print(f"{row['user_id']}: {row['calls']} 次/小时")结果:
Top 10 用户(最近 1 小时):
User 123: 15000 次/小时 ← 异常!
User 456: 800 次/小时
User 789: 600 次/小时
User 101: 500 次/小时
...User 123 在 1 小时内调用了 15000 次!
平均每秒:15000 / 3600 ≈ 4.2 次
这个用户在疯狂调用 API,影响了其他用户。
问题分析
我查看了这个用户的使用情况:
# 查看 User 123 的调用模式
cursor.execute(
'''SELECT
DATE_FORMAT(created_at, '%Y-%m-%d %H:00:00') as hour,
COUNT(*) as calls
FROM api_logs
WHERE user_id = 123
AND created_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
GROUP BY hour
ORDER BY hour DESC'''
)结果:
User 123 的 24 小时调用趋势:
14:00: 1200 次
13:00: 1100 次
12:00: 1150 次
...
00:00: 1050 次特征:
- 24 小时不间断调用
- 每小时约 1000-1500 次
- 很有规律的调用模式
结论:这是一个自动化程序!
影响
这个用户的疯狂调用对系统的影响:
1. 性能影响
正常情况:
- QPS(每秒请求数):5
- 平均响应时间:50ms
- 服务器 CPU 使用率:30%
User 123 加入后:
- QPS:10(翻倍)
- 平均响应时间:150ms(变慢 3 倍)
- 服务器 CPU 使用率:70%2. 资源占用
User 123 占用资源:
- 外部 API 调用量:5%(15000/300000)
- 服务器 CPU 时间:30%
- 数据库连接:常驻 1-2 个
更糟糕的是:
- 挤占了正常用户的资源
- 影响了整体用户体验3. 不公平
正常用户:
- 每天 100 次调用
- 按套餐付费
User 123:
- 每天 36000 次调用
- 同样的套餐这不公平!
解决方案
我决定实现限流机制。
限流目标
- 保护系统稳定性
- 保证公平性
- 区分不同套餐
限流策略
| 套餐 | 每日限额 | 每秒限额 | 突发限额 |
|---|---|---|---|
| 免费版 | 1000 次 | 1 次/秒 | 10 次 |
| 基础版 | 10000 次 | 10 次/秒 | 100 次 |
| 专业版 | 100000 次 | 100 次/秒 | 1000 次 |
限流算法
算法 1:固定窗口(最简单)
from datetime import datetime, timedelta
def is_rate_limited_fixed_window(user_id, limit_per_second):
"""固定窗口限流"""
current_time = datetime.now()
window_start = current_time.replace(microsecond=0)
cache_key = f'rate_limit:{user_id}:{window_start.strftime("%Y%m%d%H%M%S")}'
# 获取当前计数
current_count = redis_client.get(cache_key) or 0
if int(current_count) >= limit_per_second:
return True # 被限流
# 增加计数
redis_client.incr(cache_key)
redis_client.expire(cache_key, 1) # 1 秒后过期
return False # 未被限流问题:边界问题
时间轴:
0.9 秒:1 个请求
1.0 秒:1 个请求(新窗口)
1.1 秒:1 个请求
如果限制是 1 次/秒:
- 0.9 秒:允许
- 1.0 秒:允许(新窗口)← 两个请求只相隔 0.1 秒!算法 2:滑动窗口(更平滑)
def is_rate_limited_sliding_window(user_id, limit_per_second, window_size=1):
"""滑动窗口限流"""
current_time = time.time()
# 获取最近的时间戳列表
cache_key = f'rate_limit:{user_id}'
# 清理过期的请求记录
redis_client.zremrangebyscore(
cache_key,
0,
current_time - window_size
)
# 获取当前窗口内的请求数
current_count = redis_client.zcard(cache_key)
if current_count >= limit_per_second:
return True # 被限流
# 记录这次请求
redis_client.zadd(cache_key, {str(current_time): current_time})
redis_client.expire(cache_key, window_size + 1)
return False # 未被限流优势:更平滑的限流
时间轴(1 次/秒的限流):
0.9 秒:记录请求 → 窗口内有 1 个请求
1.0 秒:清理 0 秒前的记录 → 窗口内有 1 个请求
1.0 秒:尝试记录 → 被限流!
1.1 秒:清理 0.1 秒前的记录 → 窗口内有 1 个请求
1.1 秒:尝试记录 → 被限流!算法 3:令牌桶(允许突发)
def is_rate_limited_token_bucket(user_id, rate, capacity):
"""令牌桶限流
Args:
user_id: 用户 ID
rate: 每秒生成令牌数
capacity: 桶容量(最大令牌数)
"""
current_time = time.time()
cache_key = f'token_bucket:{user_id}'
# 获取当前状态
pipe = redis_client.pipeline()
pipe.hget(cache_key, 'tokens')
pipe.hget(cache_key, 'last_time')
tokens, last_time = pipe.execute()
if tokens is None:
# 首次使用,桶满了
tokens = capacity
last_time = current_time
else:
tokens = float(tokens)
last_time = float(last_time)
# 计算新增令牌数
time_passed = current_time - last_time
new_tokens = time_passed * rate
# 更新令牌数(不超过容量)
tokens = min(tokens + new_tokens, capacity)
# 检查是否有足够令牌
if tokens < 1:
# 更新状态
redis_client.hset(cache_key, 'tokens', tokens)
redis_client.hset(cache_key, 'last_time', current_time)
redis_client.expire(cache_key, 3600)
return True # 被限流
# 消耗一个令牌
tokens -= 1
# 更新状态
redis_client.hset(cache_key, 'tokens', tokens)
redis_client.hset(cache_key, 'last_time', current_time)
redis_client.expire(cache_key, 3600)
return False # 未被限流优势:允许突发流量
场景:容量 10,速率 1/秒
初始状态:桶里有 10 个令牌
0 秒:连续请求 10 次
- 桶里有 10 个令牌 → 全部允许
- 桶里剩余 0 个令牌
1 秒:请求 1 次
- 生成 1 个令牌 → 桶里有 1 个令牌
- 消耗 1 个令牌 → 桶里剩余 0 个令牌
2 秒:请求 1 次
- 生成 1 个令牌 → 桶里有 1 个令牌
- 消耗 1 个令牌 → 桶里剩余 0 个令牌
结果:可以承受短暂的突发流量!实现限流中间件
from functools import wraps
def rate_limit_check(user):
"""检查用户是否被限流"""
# 根据套餐选择限流参数
if user['plan'] == 'free':
rate = 1 # 每秒 1 个令牌
capacity = 10 # 桶容量 10
elif user['plan'] == 'basic':
rate = 10
capacity = 100
elif user['plan'] == 'pro':
rate = 100
capacity = 1000
else:
rate = 1
capacity = 10
# 检查令牌桶
if is_rate_limited_token_bucket(user['id'], rate, capacity):
return True
return False
def require_api_key_with_rate_limit(f):
"""带限流的 API Key 验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 验证 API Key
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API Key required'}), 401
user = db.execute(
'SELECT * FROM users WHERE api_key = ?',
(api_key,)
)
if not user:
return jsonify({'error': 'Invalid API Key'}), 401
# 检查限流
if rate_limit_check(user):
return jsonify({
'error': 'Rate limit exceeded',
'message': f'Rate limit: {user["plan"]} plan',
'retry_after': 1 # 秒
}), 429
request.user = user
return f(*args, **kwargs)
return decorated_function
@app.route('/api/weather')
@require_api_key_with_rate_limit
def get_weather():
# ... 业务逻辑
pass效果验证
上线后,我观察了一周:
User 123 的调用变化
限流前:
- 每小时:15000 次
- 每秒:4.2 次
限流后(免费版 1 次/秒):
- 每小时:3600 次
- 每秒:1 次
- 被限流次数:11400 次/小时整体性能提升
限流前:
- QPS:10
- 响应时间:150ms
- CPU 使用率:70%
限流后:
- QPS:5
- 响应时间:50ms
- CPU 使用率:30%用户反馈
正常用户:
- "API 速度快了很多" ✅
User 123(被限流的用户):
- 收到通知,告知需要升级套餐
- 部分用户升级到付费版 ✅
- 部分用户减少调用频率 ✅监控和告警
def check_rate_limit_alerts():
"""检查限流告警"""
# 统计被限流最多的用户
blocked_users = redis_client.zrevrange(
'rate_limit_blocked',
0, 9,
withscores=True
)
for user_id, score in blocked_users:
# 如果某个用户 1 小时内被限流超过 100 次
if score > 100:
# 发送通知
send_alert(
f'User {user_id} is frequently rate limited: {int(score)} times/hour'
)
# 检查是否是异常行为
user = db.execute(
'SELECT * FROM users WHERE id = ?',
(int(user_id),)
)
# 如果是免费用户但调用频繁,建议升级
if user['plan'] == 'free' and score > 500:
send_upgrade_suggestion(user['email'])本节小结
✅ 完成的工作:
- 实现了令牌桶限流算法
- 根据套餐设置不同的限流参数
- 限制了用户的调用速率
- 保护了系统稳定性
✅ 效果:
- 系统性能恢复正常
- 用户体验提升
- 部分用户升级到付费版
⚠️ 新的问题:
- 单机服务器还是扛不住
- 需要多台服务器分流
🎯 下一步: 用户量继续增长,单机开始吃力,我需要负载均衡。
(下一节:负载均衡)
