导航菜单

数据同步

场景

多地域部署后,遇到新问题。

用户反馈:
"为什么我在北京刚注册,上海还查不到我的账号?"

技术分析:
- 数据同步有延迟
- 主从复制不及时
- 跨地域网络不稳定

问题分析

数据流向:

北京(主库) → 上海(从库)

     └→ 广州(从库)

问题:
1. 主从延迟:几秒到几分钟
2. 网络不稳定:偶发中断
3. 数据一致性:用户期望实时

解决方案

1. 优化主从复制

# MySQL 主库配置

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_cache_size = 4M
max_binlog_cache_size = 512M

# 并发复制
binlog_group_commit = 10

# 半同步复制
plugin_load = "rpl_semi_sync_master=semisync_master.so"
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 1000  # 1 秒超时

# 从库配置
[mysqld]
server-id = 2  # 每个从库不同
relay-log = mysql-relay-bin
read-only = 1

# 并行复制
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 4

# 半同步从库
plugin_load = "rpl_semi_sync_slave=semisync_slave.so"
rpl_semi_sync_slave_enabled = 1

2. 应用层路由优化

class DatabaseRouter:
    """智能数据库路由"""

    def __init__(self):
        self.master_db = connect_to('beijing-master')
        self.slave_dbs = {
            'beijing': connect_to('beijing-slave'),
            'shanghai': connect_to('shanghai-slave'),
            'guangzhou': connect_to('guangzhou-slave')
        }
        self.last_write_time = 0
        self.replication_lag_threshold = 2  # 2 秒

    def get_connection(self, read_only=False):
        """获取数据库连接"""

        if not read_only:
            # 写操作:使用主库
            self.last_write_time = time.time()
            return self.master_db

        # 读操作:优先使用本地从库
        local_region = get_user_region()
        local_slave = self.slave_dbs[local_region]

        # 检查是否刚写过(避免读到旧数据)
        if time.time() - self.last_write_time < self.replication_lag_threshold:
            return self.master_db

        # 检查从库延迟
        lag = self.check_replication_lag(local_slave)
        if lag > self.replication_lag_threshold:
            # 延迟过高,使用主库
            return self.master_db

        return local_slave

    def check_replication_lag(self, slave):
        """检查从库延迟"""

        try:
            cursor = slave.cursor()
            cursor.execute('SHOW SLAVE STATUS')
            status = cursor.fetchone()

            lag = status['Seconds_Behind_Master']
            return lag if lag is not None else 0

        except Exception:
            return float('inf')

db_router = DatabaseRouter()

# 使用示例
def get_user(user_id):
    """读取用户(使用从库)"""
    conn = db_router.get_connection(read_only=True)
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))
    return cursor.fetchone()

def update_user(user_id, data):
    """更新用户(使用主库)"""
    conn = db_router.get_connection(read_only=False)
    cursor = conn.cursor()
    cursor.execute('UPDATE users SET ... WHERE id = %s', (user_id,))
    conn.commit()

3. 最终一致性方案

class EventualConsistencyManager:
    """最终一致性管理器"""

    def __init__(self):
        self.pending_updates = {}
        self.redis = redis.Redis()

    def write_with_eventual_consistency(self, user_id, data):
        """写入数据(最终一致)"""

        # 1. 写入主库
        update_user_in_master(user_id, data)

        # 2. 记录待同步更新
        update_id = generate_update_id()
        self.pending_updates[update_id] = {
            'user_id': user_id,
            'data': data,
            'timestamp': time.time()
        }

        # 3. 发布更新事件
        self.redis.publish('data_updates', json.dumps({
            'update_id': update_id,
            'user_id': user_id,
            'data': data
        }))

        # 4. 设置短期缓存
        cache_key = f'user:{user_id}'
        self.redis.setex(cache_key, 60, json.dumps(data))

    def subscribe_to_updates(self):
        """订阅数据更新"""

        pubsub = self.redis.pubsub()
        pubsub.subscribe('data_updates')

        for message in pubsub.listen():
            if message['type'] == 'message':
                update = json.loads(message['data'])
                self.sync_to_local_slave(update)

    def sync_to_local_slave(self, update):
        """同步到本地从库"""

        local_slave = db_router.slave_dbs[get_local_region()]

        # 更新本地从库
        update_user_in_slave(
            local_slave,
            update['user_id'],
            update['data']
        )

        # 清理待更新记录
        if update['update_id'] in self.pending_updates:
            del self.pending_updates[update['update_id']]

consistency_manager = EventualConsistencyManager()

# 启动同步订阅
Thread(target=consistency_manager.subscribe_to_updates, daemon=True).start()

监控和告警

主从同步监控

def monitor_replication():
    """监控主从同步状态"""

    for region, slave in db_router.slave_dbs.items():
        try:
            # 检查从库状态
            cursor = slave.cursor()
            cursor.execute('SHOW SLAVE STATUS')
            status = cursor.fetchone()

            lag = status['Seconds_Behind_Master']
            slave_status = status['Slave_IO_Running']
            sql_status = status['Slave_SQL_Running']

            # 检查异常
            if lag is None or lag > 10:
                send_alert(
                    f'Replication lag in {region}: {lag}s'
                )

            if slave_status != 'Yes':
                send_alert(
                    f'Slave IO stopped in {region}'
                )

            if sql_status != 'Yes':
                send_alert(
                    f'Slave SQL stopped in {region}'
                )

        except Exception as e:
            send_alert(
                f'Failed to check replication in {region}: {e}'
            )

# 定时监控
scheduler.add_job(
    monitor_replication,
    'interval',
    seconds=60,
    id='monitor_replication'
)

效果验证

优化前

主从延迟:
- 平均延迟:5 秒
- 最大延迟:30 秒
- 影响用户体验

优化后

主从延迟:
- 平均延迟:&lt;1 秒
- 最大延迟:3 秒
- 用户体验良好

措施:
- 半同步复制
- 并行复制
- 智能路由
- 最终一致性方案

本节小结

✅ 完成的工作:

  • 优化了主从复制配置
  • 实现了智能数据库路由
  • 实现了最终一致性方案
  • 添加了同步监控

✅ 效果:

  • 主从延迟降低 80%
  • 数据一致性提升
  • 用户体验改善

⚠️ 下一步:用户访问哪里的服务器?

🎯 下一步:如何实现就近路由?

搜索