
Call Volume Statistics

Scenario

After the authentication system shipped, the business kept growing.

Three months later:

  • Registered users: 1,000
  • Daily active users: 300
  • Daily API calls: 200,000

The New Requirement

The ops team came to me:

We need to know:
1. How many API calls does each user make?
2. Which hours of the day see the most traffic?
3. Which endpoints are the most popular?
4. Who are our heaviest users?

This data matters for the paid plans we want to launch later.

My takeaway: we need a proper statistics system.

Current Problems

I took a look at the existing logging setup:

-- Raw call log table
CREATE TABLE api_logs (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  params TEXT,
  response_time INT,
  status VARCHAR(50),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Problem 1: Queries are too slow

# Count a user's API calls for the current month
def get_user_monthly_usage(user_id):
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # MySQL drivers use %s placeholders, so the literal % signs
        # in DATE_FORMAT have to be doubled
        cursor.execute(
            '''SELECT COUNT(*) AS cnt FROM api_logs
               WHERE user_id = %s
               AND created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')''',
            (user_id,)
        )
        result = cursor.fetchone()
        return result['cnt']

This query scans hundreds of thousands of rows and takes 2-5 seconds.

Problem 2: Not enough statistical dimensions

We can only aggregate by user, but ops also needs:

  • By time (hourly, daily)
  • By endpoint (which APIs are the most popular)
  • By response time (performance analysis)

Problem 3: Too much data

The log table grows by 6 million rows a month:

  • After 3 months: 18 million rows
  • Queries keep getting slower
  • Database load keeps climbing

The Approach

I decided to pre-aggregate:

Core idea

Instead of computing stats from the raw logs on every query:

  1. Update the stat counters in real time
  2. Roll the raw logs up into aggregates on a schedule
  3. Serve queries straight from the stats tables

Advantages

  • Queries are extremely fast (they read precomputed results)
  • Raw logs can be purged on a schedule
  • Multiple statistical dimensions are supported
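The three steps above can be sketched with a toy in-memory model (the names here are hypothetical, purely to illustrate why reads become cheap):

```python
from collections import defaultdict
from datetime import datetime

# Every call bumps a per-user, per-hour counter, so a monthly total is a
# sum over at most ~720 hourly buckets instead of a scan over millions
# of raw log rows.
hourly_buckets = defaultdict(int)  # (user_id, hour) -> call count

def record_call(user_id, ts):
    # Truncate the timestamp to the hour to pick the bucket
    hour = ts.replace(minute=0, second=0, microsecond=0)
    hourly_buckets[(user_id, hour)] += 1

def monthly_total(user_id, year, month):
    return sum(n for (uid, hour), n in hourly_buckets.items()
               if uid == user_id and (hour.year, hour.month) == (year, month))

record_call(1, datetime(2024, 3, 5, 10, 15))
record_call(1, datetime(2024, 3, 5, 10, 45))  # lands in the same hour bucket
record_call(1, datetime(2024, 3, 6, 9, 0))
```

The production version below keeps the same shape, with MySQL tables playing the role of the dicts.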

Designing the Statistics Tables

Hourly stats table

CREATE TABLE api_stats_hourly (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  hour TIMESTAMP NOT NULL,  -- stat bucket (truncated to the hour)
  total_calls INT DEFAULT 0,      -- total calls
  success_calls INT DEFAULT 0,    -- successful calls
  error_calls INT DEFAULT 0,      -- failed calls
  avg_response_time FLOAT,        -- average response time (ms)
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY unique_stats (user_id, endpoint, hour),
  INDEX idx_user_hour (user_id, hour),
  INDEX idx_hour (hour)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Daily stats table

CREATE TABLE api_stats_daily (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  date DATE NOT NULL,  -- stat date
  total_calls INT DEFAULT 0,
  success_calls INT DEFAULT 0,
  error_calls INT DEFAULT 0,
  avg_response_time FLOAT,
  p95_response_time FLOAT,  -- 95th-percentile response time
  p99_response_time FLOAT,  -- 99th-percentile response time
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY unique_stats (user_id, endpoint, date),
  INDEX idx_user_date (user_id, date),
  INDEX idx_date (date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
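A note on the p95/p99 columns: percentiles cannot be derived from hourly averages, so they have to be computed from the raw response times in api_logs (for example during the nightly job, before those rows are purged). A minimal nearest-rank sketch:

```python
import math

def percentile(values, p):
    """Nearest-rank percentile: smallest sample >= p% of the distribution."""
    if not values:
        return None
    ordered = sorted(values)
    rank = max(1, math.ceil(p / 100 * len(ordered)))  # 1-based rank
    return ordered[rank - 1]

# 100 response times of 1..100 ms make the result easy to eyeball
times = list(range(1, 101))
p95 = percentile(times, 95)  # 95
p99 = percentile(times, 99)  # 99
```

In production, `SELECT response_time FROM api_logs WHERE ...` per user/endpoint/day would feed this function (or the database's own percentile support, if available).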

Monthly user summary table

CREATE TABLE user_stats_monthly (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  month DATE NOT NULL,  -- stat month (first day of the month)
  total_calls INT DEFAULT 0,
  total_success INT DEFAULT 0,
  total_error INT DEFAULT 0,
  avg_daily_calls FLOAT,
  peak_hour_calls INT,  -- peak hourly call count
  most_used_endpoint VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY unique_stats (user_id, month),
  INDEX idx_user_month (user_id, month)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Real-Time Stat Updates

Updating the API call path

@app.route('/api/weather')
@require_api_key
def get_weather():
    start_time = time.time()
    city = request.args.get('city', '')

    try:
        # ... business logic
        result = fetch_weather_data(city)
        status = 'success'
    except Exception as e:
        result = {'error': str(e)}
        status = 'error'

    response_time = int((time.time() - start_time) * 1000)

    # Write the raw log row (kept for detailed analysis)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            '''INSERT INTO api_logs
               (user_id, endpoint, params, response_time, status)
               VALUES (%s, %s, %s, %s, %s)''',
            (request.user['id'], '/api/weather',
             f'city={city}', response_time, status)
        )
        conn.commit()

    # Update the stat counters in real time
    update_hourly_stats(
        request.user['id'],
        '/api/weather',
        response_time,
        status
    )

    return jsonify(result)

Updating hourly stats

def update_hourly_stats(user_id, endpoint, response_time, status):
    """Update the hourly stats row in real time."""

    current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Upsert via INSERT ... ON DUPLICATE KEY UPDATE. MySQL evaluates
        # the SET assignments left to right, so total_calls is already
        # incremented when avg_response_time is recomputed.
        cursor.execute(
            '''INSERT INTO api_stats_hourly
               (user_id, endpoint, hour, total_calls, success_calls, error_calls, avg_response_time)
               VALUES (%s, %s, %s, 1, %s, %s, %s)
               ON DUPLICATE KEY UPDATE
               total_calls = total_calls + 1,
               success_calls = success_calls + %s,
               error_calls = error_calls + %s,
               avg_response_time = (avg_response_time * (total_calls - 1) + %s) / total_calls''',
            (
                user_id, endpoint, current_hour,
                1 if status == 'success' else 0,
                0 if status == 'success' else 1,
                response_time,
                1 if status == 'success' else 0,
                0 if status == 'success' else 1,
                response_time
            )
        )
        conn.commit()
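The SQL above maintains the average incrementally rather than re-reading all samples. The recurrence can be checked against the true mean in a few lines:

```python
def update_avg(avg, n, x):
    # Mirrors the SQL: (avg_response_time * (total_calls - 1) + x) / total_calls,
    # where n is the counter value *after* the increment.
    return (avg * (n - 1) + x) / n

samples = [120, 80, 200, 95, 130]
avg, n = 0.0, 0
for x in samples:
    n += 1
    avg = update_avg(avg, n, x)
# avg is now exactly mean(samples) = 125.0
```

One caveat worth knowing: FLOAT accumulates rounding error over many updates, so a `(sum, count)` pair is the more robust representation if exact averages matter.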

Scheduled Aggregation Jobs

Daily aggregation

def aggregate_daily_stats():
    """Roll hourly stats up into the daily table."""

    yesterday = (datetime.now() - timedelta(days=1)).date()

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Fetch yesterday's totals for every user/endpoint pair
        cursor.execute(
            '''SELECT user_id, endpoint,
               SUM(total_calls) as total_calls,
               SUM(success_calls) as success_calls,
               SUM(error_calls) as error_calls,
               AVG(avg_response_time) as avg_response_time
               FROM api_stats_hourly
               WHERE DATE(hour) = %s
               GROUP BY user_id, endpoint''',
            (yesterday,)
        )

        stats = cursor.fetchall()

        # Upsert into the daily stats table
        for stat in stats:
            cursor.execute(
                '''INSERT INTO api_stats_daily
                   (user_id, endpoint, date, total_calls, success_calls,
                    error_calls, avg_response_time)
                   VALUES (%s, %s, %s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                   total_calls = VALUES(total_calls),
                   success_calls = VALUES(success_calls),
                   error_calls = VALUES(error_calls),
                   avg_response_time = VALUES(avg_response_time)''',
                (stat['user_id'], stat['endpoint'], yesterday,
                 stat['total_calls'], stat['success_calls'],
                 stat['error_calls'], stat['avg_response_time'])
            )

        conn.commit()

# Run via cron: every day at 1 a.m.
# 0 1 * * * /usr/bin/python3 /path/to/aggregate_daily.py

Monthly aggregation

def aggregate_monthly_stats():
    """Roll daily stats up into the monthly user summary table."""

    # First day of the current month, then back up to the 1st of last month
    this_month_start = datetime.now().replace(day=1).date()
    last_month_start = (this_month_start - timedelta(days=1)).replace(day=1)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Fetch last month's totals for every user. The peak hourly call
        # count comes from api_stats_hourly, since the daily table only
        # stores per-day totals.
        cursor.execute(
            '''SELECT user_id,
               SUM(total_calls) as total_calls,
               SUM(success_calls) as success_calls,
               SUM(error_calls) as error_calls,
               AVG(total_calls) as avg_daily_calls,
               (SELECT MAX(total_calls) FROM api_stats_hourly
                WHERE user_id = t.user_id
                AND hour >= %s AND hour < %s) as peak_calls,
               (SELECT endpoint FROM api_stats_daily
                WHERE user_id = t.user_id
                AND date >= %s AND date < %s
                GROUP BY endpoint
                ORDER BY SUM(total_calls) DESC
                LIMIT 1) as most_used_endpoint
               FROM api_stats_daily t
               WHERE date >= %s AND date < %s
               GROUP BY user_id''',
            (last_month_start, this_month_start,
             last_month_start, this_month_start,
             last_month_start, this_month_start)
        )

        stats = cursor.fetchall()

        for stat in stats:
            cursor.execute(
                '''INSERT INTO user_stats_monthly
                   (user_id, month, total_calls, total_success, total_error,
                    avg_daily_calls, peak_hour_calls, most_used_endpoint)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                   total_calls = VALUES(total_calls),
                   total_success = VALUES(total_success),
                   total_error = VALUES(total_error),
                   avg_daily_calls = VALUES(avg_daily_calls),
                   peak_hour_calls = VALUES(peak_hour_calls),
                   most_used_endpoint = VALUES(most_used_endpoint)''',
                (stat['user_id'], last_month_start,
                 stat['total_calls'], stat['success_calls'],
                 stat['error_calls'], stat['avg_daily_calls'],
                 stat['peak_calls'], stat['most_used_endpoint'])
            )

        conn.commit()

# Run via cron: the 1st of every month at 2 a.m.
# 0 2 1 * * /usr/bin/python3 /path/to/aggregate_monthly.py
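The month-boundary arithmetic ("back up one day from the 1st, then snap to the 1st again") is easy to get wrong. A hypothetical helper with the same logic, checked across different month lengths and a year boundary:

```python
from datetime import date, timedelta

def month_bounds(today):
    """First day of last month and first day of the current month
    (hypothetical helper mirroring the date math in aggregate_monthly_stats)."""
    this_month_start = today.replace(day=1)
    last_month_start = (this_month_start - timedelta(days=1)).replace(day=1)
    return last_month_start, this_month_start

# March 2024 follows a 29-day February; January crosses the year boundary
assert month_bounds(date(2024, 3, 15)) == (date(2024, 2, 1), date(2024, 3, 1))
assert month_bounds(date(2024, 1, 10)) == (date(2023, 12, 1), date(2024, 1, 1))
```

Subtracting one day from the first of the month always lands inside the previous month, regardless of its length, which is why this pattern beats naive `month - 1` arithmetic.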

Query Endpoints

Current-month usage

@app.route('/api/stats/my-usage')
@require_api_key
def get_my_usage():
    """Return the current user's usage for this month."""

    this_month = datetime.now().replace(day=1).date()

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Read from the monthly summary table (fast: one indexed row)
        cursor.execute(
            'SELECT * FROM user_stats_monthly WHERE user_id = %s AND month = %s',
            (request.user['id'], this_month)
        )
        stats = cursor.fetchone()

        if not stats:
            # No stats yet for this month: report zeros
            return jsonify({
                'month': this_month.isoformat(),
                'total_calls': 0,
                'avg_daily_calls': 0
            })

        return jsonify({
            'month': stats['month'].isoformat(),
            'total_calls': stats['total_calls'],
            'total_success': stats['total_success'],
            'total_error': stats['total_error'],
            'avg_daily_calls': stats['avg_daily_calls'],
            'peak_hour_calls': stats['peak_hour_calls'],
            'most_used_endpoint': stats['most_used_endpoint']
        })

Daily call trend

@app.route('/api/stats/daily-trend')
@require_api_key
def get_daily_trend():
    """Return the user's daily call counts for the last N days."""

    days = request.args.get('days', 30, type=int)
    start_date = (datetime.now() - timedelta(days=days)).date()

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            '''SELECT date, SUM(total_calls) as total_calls
               FROM api_stats_daily
               WHERE user_id = %s AND date >= %s
               GROUP BY date
               ORDER BY date ASC''',
            (request.user['id'], start_date)
        )

        results = cursor.fetchall()

        return jsonify({
            'data': [
                {
                    'date': r['date'].isoformat(),
                    'calls': r['total_calls']
                }
                for r in results
            ]
        })

Performance Comparison

Before (querying the log table directly)

# Count a user's calls for the current month
cursor.execute(
    '''SELECT COUNT(*) FROM api_logs
       WHERE user_id = %s
       AND created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')''',
    (user_id,)
)

# Execution time: 2-5 seconds
# Rows scanned: hundreds of thousands

After (querying the stats table)

# Look up the user's current-month summary row
cursor.execute(
    'SELECT * FROM user_stats_monthly WHERE user_id = %s AND month = %s',
    (user_id, this_month)
)

# Execution time: 10-20 milliseconds
# Rows read: 1

Performance improvement: 100-500x!

Data Cleanup

With the stats tables in place, the raw logs can be purged on a schedule:

def clean_old_logs():
    """Purge raw logs older than 90 days."""

    cutoff_date = datetime.now() - timedelta(days=90)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # Optional: archive first. INTO OUTFILE writes a file on the
        # *database server*, and a file path cannot be a bound parameter,
        # so it is formatted into the SQL; only the cutoff date is bound.
        outfile = '/archive/logs_%s.csv' % cutoff_date.strftime('%Y%m%d')
        cursor.execute(
            f'''SELECT * FROM api_logs
               WHERE created_at < %s
               INTO OUTFILE '{outfile}'
               FIELDS TERMINATED BY ','
               ENCLOSED BY '"'
               LINES TERMINATED BY '\\n' ''',
            (cutoff_date,)
        )

        # Delete the old rows
        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < %s',
            (cutoff_date,)
        )

        conn.commit()

        print(f"Deleted {cursor.rowcount} old log entries")

# Run via cron: every Sunday at 3 a.m.
# 0 3 * * 0 /usr/bin/python3 /path/to/clean_logs.py
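One caveat with the single DELETE above: removing millions of rows in one statement holds row locks for a long time and bloats the undo log. A common variant deletes in batches. The sketch below uses DB-API `?` placeholders (SQLite style; MySQL drivers use `%s`) and is demonstrated on an in-memory SQLite table; the batch size is an assumption to tune:

```python
import sqlite3

def clean_old_logs_batched(conn, cutoff, batch_size=10000):
    """Delete expired api_logs rows in small batches so each DELETE
    holds locks briefly instead of locking millions of rows at once."""
    total = 0
    while True:
        cursor = conn.cursor()
        # Grab one batch of expired primary keys...
        cursor.execute(
            'SELECT id FROM api_logs WHERE created_at < ? ORDER BY id LIMIT ?',
            (cutoff, batch_size))
        ids = [row[0] for row in cursor.fetchall()]
        if not ids:
            break
        # ...and delete exactly those rows, committing per batch
        marks = ','.join('?' * len(ids))
        cursor.execute(f'DELETE FROM api_logs WHERE id IN ({marks})', ids)
        conn.commit()
        total += len(ids)
    return total

# Demo on an in-memory table: 25 expired rows, 5 recent ones
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE api_logs (id INTEGER PRIMARY KEY, created_at TEXT)')
conn.executemany('INSERT INTO api_logs (created_at) VALUES (?)',
                 [('2023-01-01',)] * 25 + [('2025-01-01',)] * 5)
deleted = clean_old_logs_batched(conn, '2024-01-01', batch_size=10)
```

Committing per batch also gives replication and backups a chance to keep up between deletes.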

Summary

✅ What we built:

  • Multi-granularity stats tables (hourly, daily, monthly)
  • Real-time stat updates
  • Scheduled aggregation jobs
  • 100-500x faster queries

✅ Results:

  • Users can see their own usage in real time
  • Ops can analyze traffic trends
  • We have the data to back paid plans

⚠️ New problem:

  • Some users call the API far too frequently
  • We need per-user rate limits

🎯 Next up: some users hit the API hundreds of times per second; I need rate limiting.

(Next section: Rate Limiting)
