数据归档策略
随着业务增长,数据库表越来越大。为了防止无限膨胀,我们需要设计数据归档策略。
为什么要归档?
数据增长趋势:
第 1 个月:10 万条 → 查询正常
第 3 个月:500 万条 → 查询变慢
第 6 个月:2000 万条 → 备份困难
第 12 个月:5000 万条 → 需要改变归档的好处
| 好处 | 说明 |
|---|---|
| 控制主表大小 | 保持热数据表在合理范围 |
| 提升查询性能 | 减少扫描数据量 |
| 降低备份成本 | 备份时间窗口缩短 |
| 合规要求 | 满足数据保留政策 |
1. 数据分层策略
┌─────────────────────────────────────────────────┐
│ 数据分层架构 │
├─────────────────────────────────────────────────┤
│ │
│ 热数据层(Hot) │
│ ├─ MySQL 主库 │
│ ├─ 最近 30 天数据 │
│ └─ 频繁访问,需要索引优化 │
│ │
│ 温数据层(Warm) │
│ ├─ MySQL 从库 / 归档表 │
│ ├─ 30-90 天数据 │
│ └─ 偶尔访问,查询走从库 │
│ │
│ 冷数据层(Cold) │
│ ├─ 对象存储 / CSV 文件 │
│ ├─ 90 天以上数据 │
│ └─ 极少访问,按需查询 │
│ │
└─────────────────────────────────────────────────┘2. 归档方案实现
方案 A:归档到文件
import csv
from datetime import datetime, timedelta
def archive_old_logs(days: int = 30):
"""归档 30 天前的日志到 CSV 文件"""
cutoff_date = datetime.now() - timedelta(days=days)
month_str = cutoff_date.strftime('%Y%m')
with get_db_cursor() as cursor:
# 导出数据
cursor.execute(
'''SELECT * FROM api_logs
WHERE created_at < %s
ORDER BY created_at''',
(cutoff_date,)
)
# 写入 CSV 文件
archive_file = f'/data/archive/api_logs_{month_str}.csv.gz'
with gzip.open(archive_file, 'wt', newline='') as f:
writer = csv.writer(f)
writer.writerow([
'id', 'user_id', 'endpoint', 'params',
'response_time', 'status', 'created_at'
])
writer.writerows(cursor.fetchall())
# 删除已归档的数据
cursor.execute(
'DELETE FROM api_logs WHERE created_at < %s',
(cutoff_date,)
)
print(f'归档完成:{cursor.rowcount} 条记录')方案 B:归档到历史表
def create_partition_table(month: str):
"""创建月度分区表"""
table_name = f'api_logs_{month}'
with get_db_cursor() as cursor:
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
id BIGINT PRIMARY KEY,
user_id INT NOT NULL,
endpoint VARCHAR(255) NOT NULL,
params TEXT,
response_time INT,
status VARCHAR(50),
created_at TIMESTAMP,
INDEX idx_user_created (user_id, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
def migrate_to_archive(cutoff_date: datetime):
"""迁移旧数据到归档表"""
month = cutoff_date.strftime('%Y%m')
create_partition_table(month)
with get_db_cursor() as cursor:
# 移动到归档表
cursor.execute(f'''
INSERT INTO api_logs_{month}
SELECT * FROM api_logs
WHERE created_at < %s
''', (cutoff_date,))
# 从主表删除
cursor.execute(f'''
DELETE FROM api_logs
WHERE created_at < %s
''', (cutoff_date,))
print(f'迁移完成:{cursor.rowcount} 条记录')方案 C:使用分区表(MySQL 8.0+)
-- 创建分区表
CREATE TABLE api_logs (
id BIGINT NOT NULL,
user_id INT NOT NULL,
endpoint VARCHAR(255) NOT NULL,
created_at TIMESTAMP NOT NULL,
PRIMARY KEY (id, created_at)
)
PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
PARTITION p202501 VALUES LESS THAN (202502),
PARTITION p202502 VALUES LESS THAN (202503),
PARTITION p202503 VALUES LESS THAN (202504),
-- 持续添加新分区
PARTITION pmax VALUES LESS THAN MAXVALUE
);
-- 删除旧分区(自动清理)
ALTER TABLE api_logs DROP PARTITION p202501;3. 定时归档任务
使用 Cron
# /etc/crontab
# 每天凌晨 2 点执行归档
0 2 * * * /usr/bin/python3 /app/scripts/archive_logs.pyPython 脚本
#!/usr/bin/env python3
# scripts/archive_logs.py
import sys
sys.path.insert(0, '/app')
from database import get_db_cursor
from archive import archive_old_logs
if __name__ == '__main__':
try:
# 归档 30 天前的数据
archive_old_logs(days=30)
print('归档任务完成')
except Exception as e:
print(f'归档任务失败:{e}', file=sys.stderr)
sys.exit(1)使用 Celery 定时任务
# tasks.py
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='redis://localhost/0')
# 配置定时任务
app.conf.beat_schedule = {
'archive-logs-daily': {
'task': 'tasks.archive_old_logs',
'schedule': crontab(hour=2, minute=0), # 每天凌晨 2 点
'kwargs': {'days': 30}
}
}
@app.task
def archive_old_logs(days: int):
# 归档逻辑
pass4. 数据保留政策
保留策略配置
# config.py
DATA_RETENTION = {
'api_logs': {
'hot_days': 30, # 热数据保留 30 天
'warm_days': 90, # 温数据保留 90 天
'cold_days': 365, # 冷数据保留 1 年
'forever': False # 不永久保留
},
'user_sessions': {
'hot_days': 7,
'warm_days': 30,
'forever': False
},
'audit_logs': {
'hot_days': 90,
'warm_days': 365,
'cold_days': 365 * 3, # 审计日志保留 3 年(合规要求)
'forever': False
}
}自动化清理
def cleanup_expired_data():
"""清理过期的归档数据"""
for table_name, policy in DATA_RETENTION.items():
if not policy.get('forever'):
max_days = policy.get('cold_days', 365)
cutoff = datetime.now() - timedelta(days=max_days)
with get_db_cursor() as cursor:
cursor.execute(
f'DELETE FROM {table_name} WHERE created_at < %s',
(cutoff,)
)
print(f'{table_name}: 清理 {cursor.rowcount} 条记录')5. 归档监控
监控指标
def get_table_stats():
"""获取表统计信息"""
with get_db_cursor() as cursor:
cursor.execute('''
SELECT
table_name,
table_rows,
ROUND(data_length / 1024 / 1024, 2) as data_mb,
ROUND(index_length / 1024 / 1024, 2) as index_mb
FROM information_schema.tables
WHERE table_schema = 'api_platform'
ORDER BY data_length DESC
''')
return cursor.fetchall()告警配置
def check_archive_alerts():
"""检查归档状态并发送告警"""
stats = get_table_stats()
for row in stats:
if row['data_mb'] > 1000: # 超过 1GB
send_alert(
f"表 {row['table_name']} 数据量过大:"
f"{row['data_mb']}MB, {row['table_rows']}行"
)本节小结
✅ 归档方案对比:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 文件归档 | 成本低,易管理 | 查询不便 | 合规审计 |
| 历史表 | 查询方便 | 管理复杂 | 经常查询的旧数据 |
| 分区表 | 自动管理,性能好 | 需要 MySQL 8.0+ | 大规模数据 |
✅ 最佳实践:
- 热数据控制在 30 天内
- 归档任务在低峰期执行
- 归档前后备份数据
- 监控表大小和归档状态
🎯 下一步:
了解数据库事务的 ACID 特性,保证数据一致性。
