导航菜单

数据归档策略

随着业务增长,数据库表越来越大。为了防止无限膨胀,我们需要设计数据归档策略。

为什么要归档?

数据增长趋势:

第 1 个月:10 万条    →  查询正常
第 3 个月:500 万条  →  查询变慢
第 6 个月:2000 万条 →  备份困难
第 12 个月:5000 万条 →  需要改变

归档的好处

好处说明
控制主表大小保持热数据表在合理范围
提升查询性能减少扫描数据量
降低备份成本备份时间窗口缩短
合规要求满足数据保留政策

1. 数据分层策略

┌─────────────────────────────────────────────────┐
│               数据分层架构                       │
├─────────────────────────────────────────────────┤
│                                                 │
│  热数据层(Hot)                                 │
│  ├─ MySQL 主库                                  │
│  ├─ 最近 30 天数据                               │
│  └─ 频繁访问,需要索引优化                       │
│                                                 │
│  温数据层(Warm)                                │
│  ├─ MySQL 从库 / 归档表                          │
│  ├─ 30-90 天数据                                │
│  └─ 偶尔访问,查询走从库                         │
│                                                 │
│  冷数据层(Cold)                                │
│  ├─ 对象存储 / CSV 文件                          │
│  ├─ 90 天以上数据                               │
│  └─ 极少访问,按需查询                           │
│                                                 │
└─────────────────────────────────────────────────┘

2. 归档方案实现

方案 A:归档到文件

import csv
from datetime import datetime, timedelta

def archive_old_logs(days: int = 30):
    """归档 30 天前的日志到 CSV 文件"""
    cutoff_date = datetime.now() - timedelta(days=days)
    month_str = cutoff_date.strftime('%Y%m')

    with get_db_cursor() as cursor:
        # 导出数据
        cursor.execute(
            '''SELECT * FROM api_logs
               WHERE created_at < %s
               ORDER BY created_at''',
            (cutoff_date,)
        )

        # 写入 CSV 文件
        archive_file = f'/data/archive/api_logs_{month_str}.csv.gz'
        with gzip.open(archive_file, 'wt', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                'id', 'user_id', 'endpoint', 'params',
                'response_time', 'status', 'created_at'
            ])
            writer.writerows(cursor.fetchall())

        # 删除已归档的数据
        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < %s',
            (cutoff_date,)
        )
        print(f'归档完成:{cursor.rowcount} 条记录')

方案 B:归档到历史表

def create_partition_table(month: str):
    """创建月度分区表"""
    table_name = f'api_logs_{month}'
    with get_db_cursor() as cursor:
        cursor.execute(f'''
            CREATE TABLE IF NOT EXISTS {table_name} (
                id BIGINT PRIMARY KEY,
                user_id INT NOT NULL,
                endpoint VARCHAR(255) NOT NULL,
                params TEXT,
                response_time INT,
                status VARCHAR(50),
                created_at TIMESTAMP,
                INDEX idx_user_created (user_id, created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        ''')

def migrate_to_archive(cutoff_date: datetime):
    """迁移旧数据到归档表"""
    month = cutoff_date.strftime('%Y%m')
    create_partition_table(month)

    with get_db_cursor() as cursor:
        # 移动到归档表
        cursor.execute(f'''
            INSERT INTO api_logs_{month}
            SELECT * FROM api_logs
            WHERE created_at < %s
        ''', (cutoff_date,))

        # 从主表删除
        cursor.execute(f'''
            DELETE FROM api_logs
            WHERE created_at < %s
        ''', (cutoff_date,))

        print(f'迁移完成:{cursor.rowcount} 条记录')

方案 C:使用分区表(MySQL 8.0+)

-- 创建分区表
CREATE TABLE api_logs (
    id BIGINT NOT NULL,
    user_id INT NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    PRIMARY KEY (id, created_at)
)
PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
    PARTITION p202501 VALUES LESS THAN (202502),
    PARTITION p202502 VALUES LESS THAN (202503),
    PARTITION p202503 VALUES LESS THAN (202504),
    -- 持续添加新分区
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- 删除旧分区(自动清理)
ALTER TABLE api_logs DROP PARTITION p202501;

3. 定时归档任务

使用 Cron

# /etc/crontab
# 每天凌晨 2 点执行归档
0 2 * * * /usr/bin/python3 /app/scripts/archive_logs.py

Python 脚本

#!/usr/bin/env python3
# scripts/archive_logs.py

import sys
sys.path.insert(0, '/app')

from database import get_db_cursor
from archive import archive_old_logs

if __name__ == '__main__':
    try:
        # 归档 30 天前的数据
        archive_old_logs(days=30)
        print('归档任务完成')
    except Exception as e:
        print(f'归档任务失败:{e}', file=sys.stderr)
        sys.exit(1)

使用 Celery 定时任务

# tasks.py
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost/0')

# 配置定时任务
app.conf.beat_schedule = {
    'archive-logs-daily': {
        'task': 'tasks.archive_old_logs',
        'schedule': crontab(hour=2, minute=0),  # 每天凌晨 2 点
        'kwargs': {'days': 30}
    }
}

@app.task
def archive_old_logs(days: int):
    # 归档逻辑
    pass

4. 数据保留政策

保留策略配置

# config.py

DATA_RETENTION = {
    'api_logs': {
        'hot_days': 30,      # 热数据保留 30 天
        'warm_days': 90,     # 温数据保留 90 天
        'cold_days': 365,    # 冷数据保留 1 年
        'forever': False     # 不永久保留
    },
    'user_sessions': {
        'hot_days': 7,
        'warm_days': 30,
        'forever': False
    },
    'audit_logs': {
        'hot_days': 90,
        'warm_days': 365,
        'cold_days': 365 * 3,  # 审计日志保留 3 年(合规要求)
        'forever': False
    }
}

自动化清理

def cleanup_expired_data():
    """清理过期的归档数据"""
    for table_name, policy in DATA_RETENTION.items():
        if not policy.get('forever'):
            max_days = policy.get('cold_days', 365)
            cutoff = datetime.now() - timedelta(days=max_days)

            with get_db_cursor() as cursor:
                cursor.execute(
                    f'DELETE FROM {table_name} WHERE created_at < %s',
                    (cutoff,)
                )
                print(f'{table_name}: 清理 {cursor.rowcount} 条记录')

5. 归档监控

监控指标

def get_table_stats():
    """获取表统计信息"""
    with get_db_cursor() as cursor:
        cursor.execute('''
            SELECT
                table_name,
                table_rows,
                ROUND(data_length / 1024 / 1024, 2) as data_mb,
                ROUND(index_length / 1024 / 1024, 2) as index_mb
            FROM information_schema.tables
            WHERE table_schema = 'api_platform'
            ORDER BY data_length DESC
        ''')
        return cursor.fetchall()

告警配置

def check_archive_alerts():
    """检查归档状态并发送告警"""
    stats = get_table_stats()
    for row in stats:
        if row['data_mb'] > 1000:  # 超过 1GB
            send_alert(
                f"表 {row['table_name']} 数据量过大:"
                f"{row['data_mb']}MB, {row['table_rows']}行"
            )

本节小结

✅ 归档方案对比:

方案优点缺点适用场景
文件归档成本低,易管理查询不便合规审计
历史表查询方便管理复杂经常查询的旧数据
分区表自动管理,性能好需要 MySQL 8.0+大规模数据

✅ 最佳实践:

  • 热数据控制在 30 天内
  • 归档任务在低峰期执行
  • 归档前后备份数据
  • 监控表大小和归档状态

🎯 下一步

了解数据库事务的 ACID 特性,保证数据一致性。

搜索