导航菜单

限流机制

场景

用户量继续增长,我发现了一些异常情况。

四个月后:

  • 注册用户:1500 个
  • 日调用量:30 万次

问题发现

某天,服务器响应突然变慢。

我查了一下监控,发现:

# 分析 Top 用户
with get_db_connection() as conn:
    cursor = conn.cursor()
    cursor.execute(
        '''SELECT user_id, email, COUNT(*) as calls
           FROM api_logs
           WHERE created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
           GROUP BY user_id
           ORDER BY calls DESC
           LIMIT 10'''
    )

    for row in cursor.fetchall():
        print(f"{row['user_id']}: {row['calls']} 次/小时")

结果:

Top 10 用户(最近 1 小时):
User 123: 15000 次/小时  ← 异常!
User 456: 800 次/小时
User 789: 600 次/小时
User 101: 500 次/小时
...

User 123 在 1 小时内调用了 15000 次!

平均每秒:15000 / 3600 ≈ 4.2 次

这个用户在疯狂调用 API,影响了其他用户。

问题分析

我查看了这个用户的使用情况:

# 查看 User 123 的调用模式
cursor.execute(
    '''SELECT
       DATE_FORMAT(created_at, '%Y-%m-%d %H:00:00') as hour,
       COUNT(*) as calls
       FROM api_logs
       WHERE user_id = 123
       AND created_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
       GROUP BY hour
       ORDER BY hour DESC'''
)

结果:

User 123 的 24 小时调用趋势:
14:00: 1200 次
13:00: 1100 次
12:00: 1150 次
...
00:00: 1050 次

特征:

  • 24 小时不间断调用
  • 每小时约 1000-1500 次
  • 很有规律的调用模式

结论:这是一个自动化程序!

影响

这个用户的疯狂调用对系统的影响:

1. 性能影响

正常情况:
- QPS(每秒请求数):5
- 平均响应时间:50ms
- 服务器 CPU 使用率:30%

User 123 加入后:
- QPS:10(翻倍)
- 平均响应时间:150ms(变慢 3 倍)
- 服务器 CPU 使用率:70%

2. 资源占用

User 123 占用资源:
- 外部 API 调用量:5%(15000/300000)
- 服务器 CPU 时间:30%
- 数据库连接:常驻 1-2 个

更糟糕的是:
- 挤占了正常用户的资源
- 影响了整体用户体验

3. 不公平

正常用户:
- 每天 100 次调用
- 按套餐付费

User 123:
- 每天 36000 次调用
- 同样的套餐

这不公平!

解决方案

我决定实现限流机制

限流目标

  1. 保护系统稳定性
  2. 保证公平性
  3. 区分不同套餐

限流策略

套餐每日限额每秒限额突发限额
免费版1000 次1 次/秒10 次
基础版10000 次10 次/秒100 次
专业版100000 次100 次/秒1000 次

限流算法

算法 1:固定窗口(最简单)

from datetime import datetime, timedelta

def is_rate_limited_fixed_window(user_id, limit_per_second):
    """固定窗口限流"""

    current_time = datetime.now()
    window_start = current_time.replace(microsecond=0)

    cache_key = f'rate_limit:{user_id}:{window_start.strftime("%Y%m%d%H%M%S")}'

    # 获取当前计数
    current_count = redis_client.get(cache_key) or 0

    if int(current_count) >= limit_per_second:
        return True  # 被限流

    # 增加计数
    redis_client.incr(cache_key)
    redis_client.expire(cache_key, 1)  # 1 秒后过期

    return False  # 未被限流

问题:边界问题

时间轴:
0.9 秒:1 个请求
1.0 秒:1 个请求(新窗口)
1.1 秒:1 个请求

如果限制是 1 次/秒:
- 0.9 秒:允许
- 1.0 秒:允许(新窗口)← 两个请求只相隔 0.1 秒!

算法 2:滑动窗口(更平滑)

def is_rate_limited_sliding_window(user_id, limit_per_second, window_size=1):
    """滑动窗口限流"""

    current_time = time.time()

    # 获取最近的时间戳列表
    cache_key = f'rate_limit:{user_id}'

    # 清理过期的请求记录
    redis_client.zremrangebyscore(
        cache_key,
        0,
        current_time - window_size
    )

    # 获取当前窗口内的请求数
    current_count = redis_client.zcard(cache_key)

    if current_count >= limit_per_second:
        return True  # 被限流

    # 记录这次请求
    redis_client.zadd(cache_key, {str(current_time): current_time})
    redis_client.expire(cache_key, window_size + 1)

    return False  # 未被限流

优势:更平滑的限流

时间轴(1 次/秒的限流):
0.9 秒:记录请求 → 窗口内有 1 个请求
1.0 秒:清理 0 秒前的记录 → 窗口内有 1 个请求
1.0 秒:尝试记录 → 被限流!
1.1 秒:清理 0.1 秒前的记录 → 窗口内有 1 个请求
1.1 秒:尝试记录 → 被限流!

算法 3:令牌桶(允许突发)

def is_rate_limited_token_bucket(user_id, rate, capacity):
    """令牌桶限流

    Args:
        user_id: 用户 ID
        rate: 每秒生成令牌数
        capacity: 桶容量(最大令牌数)
    """

    current_time = time.time()
    cache_key = f'token_bucket:{user_id}'

    # 获取当前状态
    pipe = redis_client.pipeline()
    pipe.hget(cache_key, 'tokens')
    pipe.hget(cache_key, 'last_time')
    tokens, last_time = pipe.execute()

    if tokens is None:
        # 首次使用,桶满了
        tokens = capacity
        last_time = current_time
    else:
        tokens = float(tokens)
        last_time = float(last_time)

    # 计算新增令牌数
    time_passed = current_time - last_time
    new_tokens = time_passed * rate

    # 更新令牌数(不超过容量)
    tokens = min(tokens + new_tokens, capacity)

    # 检查是否有足够令牌
    if tokens < 1:
        # 更新状态
        redis_client.hset(cache_key, 'tokens', tokens)
        redis_client.hset(cache_key, 'last_time', current_time)
        redis_client.expire(cache_key, 3600)

        return True  # 被限流

    # 消耗一个令牌
    tokens -= 1

    # 更新状态
    redis_client.hset(cache_key, 'tokens', tokens)
    redis_client.hset(cache_key, 'last_time', current_time)
    redis_client.expire(cache_key, 3600)

    return False  # 未被限流

优势:允许突发流量

场景:容量 10,速率 1/秒

初始状态:桶里有 10 个令牌

0 秒:连续请求 10 次
- 桶里有 10 个令牌 → 全部允许
- 桶里剩余 0 个令牌

1 秒:请求 1 次
- 生成 1 个令牌 → 桶里有 1 个令牌
- 消耗 1 个令牌 → 桶里剩余 0 个令牌

2 秒:请求 1 次
- 生成 1 个令牌 → 桶里有 1 个令牌
- 消耗 1 个令牌 → 桶里剩余 0 个令牌

结果:可以承受短暂的突发流量!

实现限流中间件

from functools import wraps

def rate_limit_check(user):
    """检查用户是否被限流"""

    # 根据套餐选择限流参数
    if user['plan'] == 'free':
        rate = 1  # 每秒 1 个令牌
        capacity = 10  # 桶容量 10
    elif user['plan'] == 'basic':
        rate = 10
        capacity = 100
    elif user['plan'] == 'pro':
        rate = 100
        capacity = 1000
    else:
        rate = 1
        capacity = 10

    # 检查令牌桶
    if is_rate_limited_token_bucket(user['id'], rate, capacity):
        return True

    return False

def require_api_key_with_rate_limit(f):
    """带限流的 API Key 验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 验证 API Key
        api_key = request.headers.get('X-API-Key')

        if not api_key:
            return jsonify({'error': 'API Key required'}), 401

        user = db.execute(
            'SELECT * FROM users WHERE api_key = ?',
            (api_key,)
        )

        if not user:
            return jsonify({'error': 'Invalid API Key'}), 401

        # 检查限流
        if rate_limit_check(user):
            return jsonify({
                'error': 'Rate limit exceeded',
                'message': f'Rate limit: {user["plan"]} plan',
                'retry_after': 1  # 秒
            }), 429

        request.user = user
        return f(*args, **kwargs)

    return decorated_function

@app.route('/api/weather')
@require_api_key_with_rate_limit
def get_weather():
    # ... 业务逻辑
    pass

效果验证

上线后,我观察了一周:

User 123 的调用变化

限流前:
- 每小时:15000 次
- 每秒:4.2 次

限流后(免费版 1 次/秒):
- 每小时:3600 次
- 每秒:1 次
- 被限流次数:11400 次/小时

整体性能提升

限流前:
- QPS:10
- 响应时间:150ms
- CPU 使用率:70%

限流后:
- QPS:5
- 响应时间:50ms
- CPU 使用率:30%

用户反馈

正常用户:
- "API 速度快了很多" ✅

User 123(被限流的用户):
- 收到通知,告知需要升级套餐
- 部分用户升级到付费版 ✅
- 部分用户减少调用频率 ✅

监控和告警

def check_rate_limit_alerts():
    """检查限流告警"""

    # 统计被限流最多的用户
    blocked_users = redis_client.zrevrange(
        'rate_limit_blocked',
        0, 9,
        withscores=True
    )

    for user_id, score in blocked_users:
        # 如果某个用户 1 小时内被限流超过 100 次
        if score > 100:
            # 发送通知
            send_alert(
                f'User {user_id} is frequently rate limited: {int(score)} times/hour'
            )

            # 检查是否是异常行为
            user = db.execute(
                'SELECT * FROM users WHERE id = ?',
                (int(user_id),)
            )

            # 如果是免费用户但调用频繁,建议升级
            if user['plan'] == 'free' and score > 500:
                send_upgrade_suggestion(user['email'])

本节小结

✅ 完成的工作:

  • 实现了令牌桶限流算法
  • 根据套餐设置不同的限流参数
  • 限制了用户的调用速率
  • 保护了系统稳定性

✅ 效果:

  • 系统性能恢复正常
  • 用户体验提升
  • 部分用户升级到付费版

⚠️ 新的问题:

  • 单机服务器还是扛不住
  • 需要多台服务器分流

🎯 下一步: 用户量继续增长,单机开始吃力,我需要负载均衡。

(下一节:负载均衡)

搜索