
Hot-Cold Data Separation

Scenario

After two years in production, the data volume has become enormous.

Data scale:
- Total log records: 1 billion
- New records per day: 2 million
- Database size: 500 GB

Problems:
- Queries keep getting slower
- Storage costs are high
- Backups take hours

Solution

1. Hot-Cold Data Separation Strategy

Data classification (a routing sketch follows the three tiers below):

Hot data (recent):
- Logs from the last 30 days
- Queried frequently
- Stored on the MySQL primary
- Requires fast access

Warm data (mid-term):
- Logs from 30-180 days ago
- Queried occasionally
- Stored on a MySQL read replica
- Moderate access-speed requirements

Cold data (long-term):
- Logs older than 180 days
- Rarely queried
- Archived to object storage
- Optimized for cost
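
These three tiers imply a dispatch layer in the application. Below is a minimal routing sketch; query_master, query_replica, and fetch_from_archive are hypothetical helpers standing in for the primary, the replica, and the archive reader described above:

from datetime import datetime, timedelta

HOT_DAYS = 30     # served by the MySQL primary
WARM_DAYS = 180   # served by a read replica

def route_log_query(start_date, sql, params):
    """Dispatch a log query to the tier that holds the requested range."""
    age = datetime.now() - start_date
    if age <= timedelta(days=HOT_DAYS):
        # Hot: recent data lives on the primary for fast access
        return query_master(sql, params)      # hypothetical helper
    elif age <= timedelta(days=WARM_DAYS):
        # Warm: older data is read from the replica
        return query_replica(sql, params)     # hypothetical helper
    # Cold: anything older comes back from object storage
    return fetch_from_archive(start_date)     # hypothetical helper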

2. Partitioned Table Design

-- Partition by time
CREATE TABLE api_logs (
  id BIGINT NOT NULL AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  params TEXT,
  response_time INT,
  status VARCHAR(50),
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id, created_at),
  INDEX idx_user_created (user_id, created_at),
  INDEX idx_created (created_at)
) PARTITION BY RANGE (TO_DAYS(created_at)) (
  PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
  PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
  PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
  -- ...
  PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- Automatically create the next partition (stored procedure).
-- Splitting the still-empty pmax partition keeps REORGANIZE cheap.
DELIMITER //
CREATE PROCEDURE create_monthly_partition()
BEGIN
  DECLARE next_month DATE;
  DECLARE partition_name VARCHAR(20);
  DECLARE partition_value VARCHAR(50);

  SET next_month = DATE_ADD(CURDATE(), INTERVAL 2 MONTH);
  SET next_month = DATE_FORMAT(next_month, '%Y-%m-01');
  SET partition_name = CONCAT('p', DATE_FORMAT(next_month, '%Y%m'));
  SET partition_value = CONCAT('TO_DAYS(\'', next_month, '\')');

  SET @sql = CONCAT(
    'ALTER TABLE api_logs ',
    'REORGANIZE PARTITION pmax INTO (',
    'PARTITION ', partition_name, ' VALUES LESS THAN (', partition_value, '),',
    'PARTITION pmax VALUES LESS THAN MAXVALUE)'
  );

  PREPARE stmt FROM @sql;
  EXECUTE stmt;
  DEALLOCATE PREPARE stmt;
END //
DELIMITER ;

-- Scheduled execution: the 1st of every month
-- (requires the MySQL event scheduler: SET GLOBAL event_scheduler = ON)
CREATE EVENT auto_create_partition
ON SCHEDULE EVERY 1 MONTH
STARTS '2024-01-01 00:00:00'
DO CALL create_monthly_partition();
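
A quick way to confirm that the monthly partitions are pulling their weight is to check the partitions column of EXPLAIN: a query bounded to the last 30 days should touch only one or two partitions, not all of them. A small sketch using PyMySQL (the connection parameters are placeholders):

import pymysql

# Placeholder credentials -- adjust for your environment
conn = pymysql.connect(host='localhost', user='app', password='...',
                       database='api_platform',
                       cursorclass=pymysql.cursors.DictCursor)

with conn.cursor() as cursor:
    cursor.execute(
        '''EXPLAIN SELECT COUNT(*) FROM api_logs
           WHERE created_at >= NOW() - INTERVAL 30 DAY'''
    )
    plan = cursor.fetchone()
    # With pruning working, this prints e.g. 'p202403,pmax'
    # instead of every partition in the table
    print(plan['partitions'])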

3. Data Archival

import csv
import gzip
import logging
import os
import shutil
from datetime import datetime, timedelta

import boto3
import pymysql

s3_client = boto3.client('s3')

def archive_old_logs():
    """Archive old logs to object storage."""

    # Archive data older than 6 months
    cutoff_date = datetime.now() - timedelta(days=180)

    logging.info(f'Starting archival for data before {cutoff_date}')

    # get_db_connection() is assumed elsewhere to return a PyMySQL connection
    with get_db_connection() as conn:
        # A DictCursor is needed because rows are accessed by column name
        cursor = conn.cursor(pymysql.cursors.DictCursor)

        # Fetch the partitions that may need archiving
        cursor.execute(
            '''SELECT PARTITION_NAME
               FROM INFORMATION_SCHEMA.PARTITIONS
               WHERE TABLE_SCHEMA = 'api_platform'
               AND TABLE_NAME = 'api_logs'
               AND PARTITION_NAME != 'pmax'
               ORDER BY PARTITION_NAME'''
        )

        partitions = cursor.fetchall()

        for partition in partitions:
            partition_name = partition['PARTITION_NAME']

            # Partition pYYYYMM starts on the 1st of that month; its upper
            # bound is the 1st of the following month
            partition_start = datetime.strptime(partition_name[1:], '%Y%m')
            if partition_start.month == 12:
                partition_end = partition_start.replace(
                    year=partition_start.year + 1, month=1)
            else:
                partition_end = partition_start.replace(
                    month=partition_start.month + 1)

            # Only archive once every row in the partition is past the cutoff
            if partition_end <= cutoff_date:
                logging.info(f'Archiving partition {partition_name}')

                # Export the data to S3
                archive_partition_to_s3(partition_name)

                # Drop the partition
                cursor.execute(
                    f'ALTER TABLE api_logs DROP PARTITION {partition_name}'
                )

                conn.commit()

                logging.info(f'Archived and dropped partition {partition_name}')

def archive_partition_to_s3(partition_name):
    """Export one partition's data to S3."""

    # SELECT ... INTO OUTFILE writes on the *database server's* filesystem,
    # so this job must run on the DB host (and needs the FILE privilege
    # plus a compatible secure_file_priv setting)
    export_file = f'/tmp/{partition_name}.csv'

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            f'''SELECT * FROM api_logs PARTITION ({partition_name})
               INTO OUTFILE '{export_file}'
               FIELDS TERMINATED BY ','
               ENCLOSED BY '"'
               LINES TERMINATED BY '\\n'
            '''
        )

    # Compress before upload so the object matches its .csv.gz key
    gz_file = export_file + '.gz'
    with open(export_file, 'rb') as src, gzip.open(gz_file, 'wb') as dst:
        shutil.copyfileobj(src, dst)

    # Upload to S3
    s3_client.upload_file(
        gz_file,
        'api-logs-archive',
        f'logs/{partition_name}.csv.gz'
    )

    # Remove the temporary files
    os.remove(export_file)
    os.remove(gz_file)
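
The section does not specify how archive_old_logs is triggered; one reasonable setup, sketched here with APScheduler (an assumption, any job scheduler would do), runs it nightly during off-peak hours:

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

# Run the archival job at 03:00 every day; it is a no-op on days
# when no partition has fully crossed the 180-day cutoff
scheduler.add_job(archive_old_logs, 'cron', hour=3)
scheduler.start()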

4. Querying Cold Data

@app.route('/api/admin/logs/archive/<date>')
@require_admin
def get_archived_logs(date):
    """Query archived logs."""

    # Fetch the data from S3
    try:
        # Parse the date
        archive_date = datetime.strptime(date, '%Y-%m')
        partition_name = f'p{archive_date.strftime("%Y%m")}'

        # Download from S3
        response = s3_client.get_object(
            Bucket='api-logs-archive',
            Key=f'logs/{partition_name}.csv.gz'
        )

        # Decompress and parse, dropping the trailing empty line
        content = gzip.decompress(response['Body'].read())
        lines = [line for line in content.decode('utf-8').split('\n') if line]

        # The OUTFILE export has no header row, so supply the column names
        fieldnames = ['id', 'user_id', 'endpoint', 'params',
                      'response_time', 'status', 'created_at']

        # Return the first 100 rows
        results = []
        reader = csv.DictReader(lines, fieldnames=fieldnames)
        for i, row in enumerate(reader):
            if i >= 100:
                break
            results.append(row)

        return jsonify({
            'archive_date': date,
            'total_count': len(lines),
            'sample': results
        })

    except Exception as e:
        return jsonify({
            'error': f'Failed to retrieve archived logs: {str(e)}'
        }), 404
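
For illustration, calling the endpoint might look like the following (the base URL and auth header are hypothetical; the route only requires whatever @require_admin enforces):

import requests

resp = requests.get(
    'https://api.example.com/api/admin/logs/archive/2024-01',  # placeholder URL
    headers={'Authorization': 'Bearer <admin-token>'},          # placeholder token
)
data = resp.json()
print(data['total_count'], len(data['sample']))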

Results

Before optimization

Single-table storage:
- Total records: 1 billion
- Query time: 10-30 seconds
- Storage cost: high
- Backup time: several hours

After optimization (hot-cold separation)

Partitioning + archival:
- Hot data: 60 million rows (30 days × 2 million/day)
- Query time: <1 second
- Storage cost: reduced by 70%
- Backup time: 30 minutes

Section Summary

✅ Work completed:

  • Implemented a partitioned table
  • Implemented data archival
  • Implemented cold-data queries

✅ Results:

  • Query performance improved
  • Storage costs reduced by 70%
  • Maintenance efficiency improved

🎯 Done! I have learned how to handle large-scale data challenges
