
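Migrating to MySQL in Practice

After the analysis, you decided on a hybrid MySQL + Redis architecture. This chapter walks through the migration to MySQL in detail.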

1. Installing MySQL

Installing on macOS

# Install with Homebrew
brew install mysql

# Start MySQL
mysql.server start

# Set the root password
mysql_secure_installation

Installing on Ubuntu/Debian

sudo apt update
sudo apt install mysql-server
sudo systemctl start mysql
sudo systemctl enable mysql
sudo mysql_secure_installation

Installing with Docker (recommended for development)

docker run --name mysql-db \
  -e MYSQL_ROOT_PASSWORD=your_password \
  -e MYSQL_DATABASE=api_platform \
  -p 3306:3306 \
  -d mysql:8.0
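
A freshly started container usually needs a few seconds before it accepts connections, so a script that runs right after `docker run` may fail. The sketch below polls the published port until a TCP connection succeeds. The function name `wait_for_port` and its defaults are illustrative, and an open port only suggests that MySQL is up; it does not guarantee that initialization has finished.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

For the container above, `wait_for_port('localhost', 3306)` could be called before running the migration scripts.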

2. Creating the Database and Tables

Creating the database

CREATE DATABASE api_platform
  CHARACTER SET utf8mb4
  COLLATE utf8mb4_unicode_ci;

USE api_platform;

Creating the users table

CREATE TABLE users (
  id INT PRIMARY KEY AUTO_INCREMENT,
  email VARCHAR(255) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  api_key VARCHAR(64) UNIQUE NOT NULL,
  plan VARCHAR(50) DEFAULT 'free',
  daily_limit INT DEFAULT 1000,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  -- The UNIQUE constraints on email and api_key already create indexes,
  -- so separate INDEX definitions on those columns would be redundant.
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Creating the API logs table

CREATE TABLE api_logs (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  params TEXT,
  response_time INT,
  status VARCHAR(50),
  ip_address VARCHAR(45),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  INDEX idx_user_created (user_id, created_at),
  INDEX idx_created (created_at),
  INDEX idx_endpoint (endpoint),
  FOREIGN KEY (user_id) REFERENCES users(id)
    ON DELETE CASCADE
    ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
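
The composite index `idx_user_created (user_id, created_at)` suits queries that fix one user and scan a time range. One plausible use is counting a user's requests for the current day so the count can be compared with `daily_limit`. The source does not show how the limit is enforced, so the helper below, including its name `daily_usage_query`, is an assumption. It only builds the parameterized SQL and takes the day boundaries from the `now` value the caller passes in.

```python
from datetime import datetime, timedelta


def daily_usage_query(user_id: int, now: datetime):
    """Build a parameterized count of one user's requests for the current day."""
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)
    # Equality on user_id plus a range on created_at matches the column
    # order of idx_user_created, so MySQL can answer this from the index.
    sql = (
        "SELECT COUNT(*) AS used FROM api_logs "
        "WHERE user_id = %s AND created_at >= %s AND created_at < %s"
    )
    return sql, (user_id, day_start, day_end)
```

The returned pair can be passed directly to `cursor.execute(sql, params)`.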

3. Migrating the Data

Exporting data from SQLite

import sqlite3
import csv

sqlite_conn = sqlite3.connect('sqlite_database.db')
sqlite_cursor = sqlite_conn.cursor()

# Export the users table; name the columns explicitly so each row matches the CSV header
columns = ['id', 'email', 'password_hash', 'api_key', 'plan', 'daily_limit']
sqlite_cursor.execute(f"SELECT {', '.join(columns)} FROM users")
with open('users.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(columns)
    writer.writerows(sqlite_cursor.fetchall())

sqlite_conn.close()
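
A table such as `api_logs` may be too large to load with a single `fetchall()`. The sketch below generalizes the export into a reusable function that streams rows in batches with `fetchmany`. The name `export_table` and the `batch_size` default are illustrative choices, not part of the original script.

```python
import csv
import sqlite3


def export_table(conn, table, columns, csv_path, batch_size=1000):
    """Write the named columns of one SQLite table to a CSV file; return the row count."""
    cursor = conn.cursor()
    # Table and column names must come from our own code, never from user
    # input, because identifiers cannot be passed as query parameters.
    cursor.execute(f"SELECT {', '.join(columns)} FROM {table}")
    exported = 0
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            writer.writerows(rows)
            exported += len(rows)
    return exported
```

The returned count can later be compared with `SELECT COUNT(*)` on the MySQL side as a basic sanity check.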

Importing into MySQL

import csv
import pymysql

mysql_conn = pymysql.connect(
    host='localhost',
    user='root',
    password='your_password',
    database='api_platform'
)
mysql_cursor = mysql_conn.cursor()

with open('users.csv', 'r', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for row in reader:
        mysql_cursor.execute(
            'INSERT INTO users (id, email, password_hash, api_key, plan, daily_limit) VALUES (%s, %s, %s, %s, %s, %s)',
            row
        )

mysql_conn.commit()
mysql_conn.close()
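
Two details are easy to miss in a CSV round trip: `csv` writes SQL NULL as an empty string, and every value comes back as text. The helpers below are a minimal sketch for restoring NULLs and integers and for grouping rows into batches. The names `normalize_row` and `batched` are hypothetical, and treating every empty string as NULL is an assumption, because the two are indistinguishable once written to CSV.

```python
from itertools import islice


def normalize_row(row, int_columns=(0, 5)):
    """Convert CSV strings back to the values MySQL should receive."""
    cleaned = []
    for index, value in enumerate(row):
        if value == "":
            cleaned.append(None)        # csv wrote NULL as an empty string
        elif index in int_columns:
            cleaned.append(int(value))  # id and daily_limit are INT columns
        else:
            cleaned.append(value)
    return tuple(cleaned)


def batched(rows, size):
    """Yield lists of at most `size` rows so each insert call stays bounded."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch
```

With these helpers, the insert loop could iterate over `batched(map(normalize_row, reader), 500)` and pass each batch to `mysql_cursor.executemany` together with the same INSERT statement, which reduces the number of round trips for large tables.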

4. Updating the Application Code

Installing dependencies

pip install pymysql
pip install dbutils  # connection pool

Using a connection pool

from dbutils.pooled_db import PooledDB
import pymysql

db_pool = PooledDB(
    creator=pymysql,
    host='localhost',
    user='root',
    password='your_password',
    database='api_platform',
    maxconnections=20,
    mincached=5,
    maxcached=10,
    blocking=True,
    autocommit=False,  # callers commit or roll back explicitly
    cursorclass=pymysql.cursors.DictCursor,
    charset='utf8mb4'
)

def get_db_connection():
    return db_pool.connection()
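
Hard-coding the root password in source code is convenient for a tutorial but risky elsewhere. One common alternative is to read the connection settings from environment variables. The variable names below (`DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASSWORD`, `DB_NAME`) and the function `load_db_config` are assumptions made for illustration.

```python
import os


def load_db_config(env=None):
    """Read MySQL connection settings from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env.get("DB_HOST", "localhost"),
        "port": int(env.get("DB_PORT", "3306")),
        "user": env.get("DB_USER", "root"),
        # No default for the password: fail early with KeyError if it is missing
        "password": env["DB_PASSWORD"],
        "database": env.get("DB_NAME", "api_platform"),
    }
```

Because `PooledDB` forwards extra keyword arguments to the creator's `connect` function, the result could be unpacked into the pool definition as `**load_db_config()` in place of the literal host, user, password, and database values.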

Context manager (best practice)

from contextlib import contextmanager

@contextmanager
def get_db_cursor():
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # re-raise with the original traceback
    finally:
        cursor.close()
        conn.close()

# Usage example
with get_db_cursor() as cursor:
    cursor.execute('SELECT * FROM users WHERE id = %s', (1,))
    user = cursor.fetchone()
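
To see the commit-or-rollback behavior without a running MySQL server, the same pattern can be exercised against an in-memory SQLite database. This is only a stand-in: `cursor_for` is a hypothetical variant of `get_db_cursor` that accepts any DB-API connection. Rollback only undoes work when the connection is not in autocommit mode.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def cursor_for(conn):
    """Same commit-or-rollback shape as get_db_cursor, for any DB-API connection."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")

# First block succeeds and is committed
with cursor_for(conn) as cur:
    cur.execute("INSERT INTO users (email) VALUES (?)", ("a@example.com",))

# Second block fails on the duplicate email, so both of its inserts are rolled back
try:
    with cursor_for(conn) as cur:
        cur.execute("INSERT INTO users (email) VALUES (?)", ("b@example.com",))
        cur.execute("INSERT INTO users (email) VALUES (?)", ("a@example.com",))
except sqlite3.IntegrityError:
    pass

remaining = [row[0] for row in conn.execute("SELECT email FROM users ORDER BY id")]
```

After the failed block, only the row from the first block is left in the table, which is the all-or-nothing behavior the context manager is meant to provide.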

5. Encapsulating Database Operations

# repositories/user_repository.py
from typing import Optional


class UserRepository:
    def __init__(self, db_pool):
        self.db_pool = db_pool

    def get_by_id(self, user_id: int) -> Optional[dict]:
        # Returns None when no user has this id
        with self.db_pool.connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))
            return cursor.fetchone()

    def get_by_api_key(self, api_key: str) -> Optional[dict]:
        # Returns None when the API key is unknown
        with self.db_pool.connection() as conn:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM users WHERE api_key = %s', (api_key,))
            return cursor.fetchone()

    def create(self, email: str, password_hash: str, api_key: str) -> int:
        with self.db_pool.connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                'INSERT INTO users (email, password_hash, api_key) VALUES (%s, %s, %s)',
                (email, password_hash, api_key)
            )
            conn.commit()
            return cursor.lastrowid
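
The `create` method expects the caller to supply an `api_key`, but the source does not say how keys are generated. One reasonable option, shown here as an assumption, is the standard library's `secrets` module. The function name `generate_api_key` is hypothetical.

```python
import secrets


def generate_api_key() -> str:
    """Return a random 64-character hexadecimal API key."""
    # 32 random bytes rendered as hex give exactly 64 characters,
    # which fits the api_key VARCHAR(64) column.
    return secrets.token_hex(32)
```

A caller might then write `repo.create(email, password_hash, generate_api_key())`, relying on the UNIQUE constraint on `api_key` to reject the extremely unlikely collision.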

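Section Summary

✅ What we accomplished:

  • Installed and configured MySQL
  • Designed the database table schema
  • Migrated the data from SQLite
  • Used a connection pool to improve performance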

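✅ Key configuration:

  • Character set: utf8mb4 (supports emoji)
  • Engine: InnoDB (supports transactions)
  • Connection pool: at most 20 connections
  • Indexes: cover the common query patterns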
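
The reason utf8mb4 matters for emoji comes down to byte width: MySQL's legacy `utf8` character set (utf8mb3) stores at most three bytes per character, while most emoji need four bytes in UTF-8. The short sketch below prints the encoded width of a few sample characters. The helper name `utf8_width` is illustrative.

```python
def utf8_width(ch: str) -> int:
    """Number of bytes one character occupies when encoded as UTF-8."""
    return len(ch.encode("utf-8"))


# ASCII letter, accented Latin letter, CJK character, emoji
samples = ("A", "\u00e9", "\u4e2d", "\U0001F600")
for ch in samples:
    print(repr(ch), utf8_width(ch))
```

Only the last sample needs four bytes, which is why a column using the three-byte character set cannot store it.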

🎯 Next Steps

Once the MySQL migration is complete, we will introduce Redis to cache hot data and further improve performance.
