容灾备份
场景
最坏的情况发生了:整个机房断电。
事件:
- 数据中心电力故障
- UPS 只能维持 30 分钟
- 发电机启动失败
- 整个机房离线
影响:
- 北京地域完全不可用
- 用户请求失败
- 数据可能丢失

解决方案:多活容灾
1. 多地域架构
之前架构:
北京(主) → 上海(从)
↓
广州(从)
问题:北京故障,无法写入
多活架构:
北京(主写入)
↓ ⇅
上海(主写入)
↓ ⇅
广州(主写入)
每个地域都可以:
- 处理读写请求
- 同步数据到其他地域
- 独立运行

2. 数据同步策略
class MultiMasterReplication:
    """Multi-master (multi-region) replication manager.

    Writes land in the local region's database first, then fan out
    asynchronously to every other region via the message queue, giving
    eventual cross-region consistency.
    """

    def __init__(self):
        # Known regions; a region's index in this list doubles as the
        # region id embedded in generated snowflake-style ids.
        self.regions = ['beijing', 'shanghai', 'guangzhou']
        self.local_region = get_local_region()

    def write_with_replication(self, table, data):
        """Write `data` locally, then replicate it to all other regions.

        NOTE: `data` is mutated in place (id/region/timestamp are added)
        so the caller can read back the generated id after the call.
        """
        data['id'] = self.generate_global_id()
        data['region'] = self.local_region
        data['timestamp'] = time.time()
        # Local write first: the local region is immediately consistent.
        self.write_to_local_db(table, data)
        # Cross-region copies are asynchronous (eventual consistency).
        self.replicate_to_other_regions(table, data)

    def generate_global_id(self):
        """Build a snowflake-style globally unique id.

        Layout: | ms timestamp | 10-bit region id | 12-bit sequence |
        """
        timestamp = int(time.time() * 1000)
        region_id = self.regions.index(self.local_region)
        sequence = self.get_sequence()
        # Mask each field to its bit width: without the masks a sequence
        # value >= 4096 would bleed into the region bits and break
        # uniqueness. (get_sequence's range is not visible here — the
        # masks make the layout safe regardless.)
        return (timestamp << 22) | ((region_id & 0x3FF) << 12) | (sequence & 0xFFF)

    def replicate_to_other_regions(self, table, data):
        """Publish one replication message per remote region."""
        for region in self.regions:
            if region == self.local_region:
                continue  # never replicate back to ourselves
            message_queue.publish({
                'type': 'replication',
                'target_region': region,
                'table': table,
                'data': data
            })

    def consume_replications(self):
        """Consumer loop: apply replication messages addressed to this region.

        Runs forever; errors are logged and retried after a 1s backoff so a
        single bad message cannot kill the consumer.
        """
        while True:
            try:
                message = message_queue.consume(timeout=1)
                if message and message['type'] == 'replication':
                    if message['target_region'] == self.local_region:
                        self.write_to_local_db(
                            message['table'],
                            message['data']
                        )
            except Exception as e:
                logging.error(f'Replication error: {e}')
                time.sleep(1)
multi_master = MultiMasterReplication()

3. 冲突解决策略
class ConflictResolver:
    """Resolve conflicting copies of the same record from different regions.

    The original implementation listed three strategies but returned after
    the first one, leaving the other two unreachable dead code. The strategy
    is now selectable via a keyword argument; the default ('timestamp')
    preserves the original behaviour exactly.
    """

    # Higher value wins when resolving by region priority.
    REGION_PRIORITY = {
        'beijing': 3,
        'shanghai': 2,
        'guangzhou': 1,
    }

    def resolve_conflict(self, record1, record2, strategy='timestamp'):
        """Return the winning record.

        strategy:
          'timestamp' (default) — last-writer-wins on record['timestamp'];
          'region'              — fixed region priority (beijing > shanghai > guangzhou);
          'business'            — field-by-field merge (resolve_by_business_logic).

        Raises ValueError for an unknown strategy name.
        """
        if strategy == 'timestamp':
            # Last-writer-wins; ties go to record2 (same as the original code).
            return record1 if record1['timestamp'] > record2['timestamp'] else record2
        if strategy == 'region':
            if self.REGION_PRIORITY[record1['region']] > self.REGION_PRIORITY[record2['region']]:
                return record1
            return record2
        if strategy == 'business':
            return self.resolve_by_business_logic(record1, record2)
        raise ValueError(f'unknown conflict strategy: {strategy}')

    def resolve_by_business_logic(self, record1, record2):
        """Merge two records field by field.

        - 'updated_at': keep the newer value;
        - 'name' / 'email': keep the value from the record with the newer timestamp;
        - everything else: first non-falsy value (record1 preferred).
        Iteration follows record1's keys, so fields present only in record2
        are dropped (unchanged from the original).
        """
        resolved = {}
        first_is_newer = record1['timestamp'] > record2['timestamp']
        for field in record1.keys():
            if field == 'updated_at':
                resolved[field] = max(record1[field], record2[field])
            elif field in ('name', 'email'):
                resolved[field] = record1[field] if first_is_newer else record2[field]
            else:
                resolved[field] = record1[field] or record2[field]
        return resolved
conflict_resolver = ConflictResolver()

4. DNS 故障转移
class DNSFailover:
    """Point api.kuaiyizhi.cn's DNS only at regions that pass a health probe."""

    def __init__(self):
        # region name -> bool, last observed health per region
        self.health_status = {}
        self.dns_provider = DNSProvider()

    def check_region_health(self):
        """Probe every region's /health endpoint and record pass/fail."""
        for region in ('beijing', 'shanghai', 'guangzhou'):
            healthy = False
            try:
                resp = requests.get(
                    f'https://{region}.kuaiyizhi.cn/health',
                    timeout=5
                )
                healthy = resp.status_code == 200
            except Exception:
                healthy = False
            self.health_status[region] = healthy

    def update_dns_records(self):
        """Rewrite the A records so they contain only healthy regions."""
        healthy_regions = [
            region for region, ok in self.health_status.items() if ok
        ]
        if not healthy_regions:
            # Nothing healthy: keep the current records and page a human.
            send_alert('All regions are down!')
            return
        # Round-robin across healthy regions: one A record each.
        records = [
            {'type': 'A', 'value': f'{region}.kuaiyizhi.cn'}
            for region in healthy_regions
        ]
        self.dns_provider.update_records(
            name='api.kuaiyizhi.cn',
            records=records
        )
# Periodic health-check / DNS-refresh driver.
def monitor_and_update_dns():
    """Endless loop: re-check region health, then refresh DNS records."""
    failover = DNSFailover()
    while True:
        failover.check_region_health()
        failover.update_dns_records()
        time.sleep(60)

5. 数据备份策略
class BackupManager:
    """Dump MySQL and Redis to local files and ship them to S3."""

    def backup_to_external_storage(self):
        """Run all backups: database, Redis, config files."""
        # 1. Database backup
        self.backup_database()
        # 2. Redis backup
        self.backup_redis()
        # 3. Config file backup (implemented elsewhere — TODO confirm)
        self.backup_configs()

    def backup_database(self):
        """mysqldump all databases, gzip the dump, upload to S3, clean up.

        Fixes vs. original: subprocess exit codes are now checked
        (check=True) so a failed dump/gzip raises instead of silently
        uploading a truncated or empty backup, and temp files are removed
        even when a step fails.
        """
        dump_file = f'/tmp/mysql_backup_{int(time.time())}.sql'
        compressed_file = dump_file + '.gz'
        try:
            subprocess.run([
                'mysqldump',
                '--all-databases',
                '--single-transaction',   # consistent snapshot for InnoDB
                '--master-data=2',        # record binlog position as a comment
                f'--result-file={dump_file}'
            ], check=True)
            subprocess.run(['gzip', dump_file], check=True)
            s3_client.upload_file(
                compressed_file,
                'api-backups',
                f'mysql/{os.path.basename(compressed_file)}'
            )
        finally:
            # Never leave multi-GB dumps behind in /tmp, even on failure.
            for path in (dump_file, compressed_file):
                if os.path.exists(path):
                    os.remove(path)

    def backup_redis(self):
        """Snapshot Redis via BGSAVE and upload the RDB file to S3.

        Fixes vs. original: the original called the *blocking* SAVE while
        its comment promised a background save, and its wait loop compared
        lastsave() against time.time() - 3600, which exits immediately in
        practice. We now trigger BGSAVE and wait for lastsave() to advance
        past its previous value (works whether the client reports a unix
        timestamp or a datetime — TODO confirm which this client returns).
        """
        previous_save = redis_client.lastsave()
        redis_client.bgsave()
        deadline = time.time() + 300  # don't hang forever if BGSAVE fails
        while redis_client.lastsave() <= previous_save:
            if time.time() > deadline:
                raise TimeoutError('Redis BGSAVE did not complete within 300s')
            time.sleep(1)
        # Copy the fresh RDB snapshot aside before uploading.
        rdb_file = '/var/lib/redis/dump.rdb'
        backup_file = f'/tmp/redis_backup_{int(time.time())}.rdb'
        shutil.copy2(rdb_file, backup_file)
        try:
            s3_client.upload_file(
                backup_file,
                'api-backups',
                f'redis/{os.path.basename(backup_file)}'
            )
        finally:
            os.remove(backup_file)
# Scheduled backup entry point (wired to the cron trigger below).
def scheduled_backup():
    """Run one full backup pass and log completion."""
    BackupManager().backup_to_external_storage()
    logging.info('Backup completed')
# Schedule the backup job for 03:00 every day
# (in the scheduler's configured timezone — TODO confirm tz).
scheduler.add_job(
    scheduled_backup,
    CronTrigger(hour=3, minute=0),
    id='daily_backup'
)

灾难演练
def disaster_recovery_drill():
    """Disaster-recovery drill: fail one region, verify failover, restore.

    Fixes vs. original: verification uses explicit raises instead of bare
    `assert` (asserts are stripped under `python -O`, silently passing the
    drill); requests.get now has a timeout so a hung region cannot stall
    the drill forever; and the failed region is always restored, even when
    a verification step fails.

    Raises RuntimeError when any verification step fails.
    """
    logging.info('Starting disaster recovery drill')
    # 1. Simulate a full outage of the Beijing region.
    simulate_region_failure('beijing')
    try:
        # 2. DNS failover should keep the public API healthy.
        health = check_api_health()
        if health['status'] != 'healthy':
            raise RuntimeError('DNS failover failed')
        # 3. The surviving regions must still answer health checks.
        for region in ['shanghai', 'guangzhou']:
            response = requests.get(
                f'https://{region}.kuaiyizhi.cn/health',
                timeout=10
            )
            if response.status_code != 200:
                raise RuntimeError(f'{region} is down')
    finally:
        # 4. Always bring Beijing back online.
        restore_region('beijing')
    logging.info('Disaster recovery drill completed successfully')

效果验证
优化前
机房故障:
- 北京地域完全不可用
- 无法处理新请求
- 无法写入数据
- 持续时间:数小时

优化后
机房故障:
- DNS 自动切换到其他地域
- 其他地域继续服务
- 数据最终一致
- 用户基本无感知

本节小结
✅ 完成的工作:
- 实现了多主架构
- 实现了冲突解决
- 实现了 DNS 故障转移
- 实现了数据备份
✅ 效果:
- 机房故障不影响整体
- 实现了真正的多活
- 数据安全有保障
🎯 完成!我已经学会了构建高可用系统
