导航菜单

海量日志

场景

两年半后,数据量激增。

数据规模:
- 日均调用量:200 万次
- 日志表记录:每天 200 万条
- 累计数据:18 个月 × 30 天 × 200 万 = 10 亿条

问题:
- 查询越来越慢
- 数据库存储告急
- 备份时间过长

解决方案

1. 分库分表

-- 按时间分表
CREATE TABLE api_logs_2024_01 (
  LIKE api_logs INCLUDING ALL
) PARTITION BY RANGE (created_at);

-- 自动创建分区
ALTER TABLE api_logs
PARTITION BY RANGE (TO_DAYS(created_at)) (
  PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
  PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
  -- ...
);

2. 冷热分离

def archive_old_logs():
    """归档旧日志到冷存储"""

    # 6 个月前的数据
    cutoff_date = datetime.now() - timedelta(days=180)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            'SELECT * FROM api_logs WHERE created_at < %s',
            (cutoff_date,)
        )

        # 写入 S3(使用 Parquet 格式)
        for batch in iter_batch(cursor, batch_size=10000):
            s3_client.put_object(
                Bucket='api-logs-archive',
                Key=f'logs/{batch[0]["created_at"].date()}.parquet',
                Body=to_parquet(batch)
            )

        # 删除已归档的数据
        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < %s',
            (cutoff_date,)
        )

3. 使用 Elasticsearch

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://elasticsearch:9200'])

def index_log(log_entry):
    """索引日志到 Elasticsearch"""

    es.index(
        index='api-logs',
        body={
            'timestamp': log_entry['created_at'],
            'user_id': log_entry['user_id'],
            'endpoint': log_entry['endpoint'],
            'response_time': log_entry['response_time'],
            'status': log_entry['status']
        }
    )

def search_logs(user_id, start_date, end_date):
    """搜索日志"""

    query = {
        'query': {
            'bool': {
                'must': [
                    {'term': {'user_id': user_id}},
                    {'range': {'timestamp': {
                        'gte': start_date,
                        'lte': end_date
                    }}}
                ]
            }
        }
    }

    results = es.search(index='api-logs', body=query)
    return results['hits']['hits']

效果验证

优化前

日志查询(用户本月调用量):
- 扫描:1 亿条记录
- 时间:15 秒
- 影响:用户体验差

优化后

Elasticsearch 查询:
- 扫描:仅相关记录
- 时间:0.5 秒
- 提升:30 倍

本节小结

✅ 完成的工作:

  • 实现了数据分区
  • 实现了冷热分离
  • 引入了 Elasticsearch

✅ 效果:

  • 查询性能提升 30 倍
  • 存储成本降低

🎯 下一步:产品经理要数据分析,如何实时统计?

搜索