用户认证
设计目标
我决定实现用户认证系统,让每个用户有独立的调用配额。
核心需求:
- 用户可以注册账号
- 系统分配唯一的 API Key
- 每次调用必须验证 API Key
- 限制每个用户的调用配额
这是一个完整的认证体系,我需要从零开始设计。
方案设计
API Key 的设计
API Key 是什么?
- 一个唯一的字符串
- 用来标识用户身份
- 需要在每次调用时携带
API Key 的格式:
ak_live_xxxxxxxxxxxxxxx (正式环境)
ak_test_xxxxxxxxxxxxxxx (测试环境)格式说明:
ak_:前缀,表示 API Keylive/test:环境标识- 后面:随机字符串
我特意加了环境标识,这样开发和生产环境可以分开管理。
数据模型设计
用户表 (users)
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
api_key VARCHAR(64) UNIQUE NOT NULL,
plan VARCHAR(50) DEFAULT 'free', -- free, basic, pro
daily_limit INT DEFAULT 1000, -- 每日调用限额
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_api_key (api_key)
);调用记录表 (api_logs)
CREATE TABLE api_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
endpoint VARCHAR(255) NOT NULL,
params TEXT,
response_time INT,
status VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_user_created (user_id, created_at),
INDEX idx_created (created_at)
);这两个表的设计,我考虑了很久。用户表要存储核心信息,日志表要记录每一次调用。
API Key 生成
生成安全的 API Key
import secrets
import string
def generate_api_key():
"""生成安全的 API Key"""
# 生成 32 字节的随机数据
random_bytes = secrets.token_bytes(32)
# 转换为十六进制字符串
api_key = 'ak_live_' + random_bytes.hex()
return api_key
# 示例输出
# ak_live_a1b2c3d4e5f6... (64 字符)为什么使用 secrets 模块?
- 加密安全的随机数生成
- 不可预测
- 防止碰撞
我当时想,API Key 是用户的身份标识,必须在源头上保证安全。
用户注册流程
注册接口
@app.route('/api/auth/register', methods=['POST'])
def register():
data = request.get_json()
email = data.get('email')
password = data.get('password')
# 验证输入
if not email or not password:
return jsonify({'error': 'Email and password required'}), 400
# 检查邮箱是否已注册
if db.execute('SELECT id FROM users WHERE email = ?', (email,)):
return jsonify({'error': 'Email already registered'}), 400
# 生成 API Key
api_key = generate_api_key()
# 密码哈希
password_hash = hashlib.sha256(password.encode()).hexdigest()
# 保存到数据库
user_id = db.execute(
'INSERT INTO users (email, password_hash, api_key) VALUES (?, ?, ?)',
(email, password_hash, api_key)
)
return jsonify({
'message': 'Registration successful',
'user_id': user_id,
'api_key': api_key ⚠️ 只在注册时返回一次
}), 201登录接口
@app.route('/api/auth/login', methods=['POST'])
def login():
data = request.get_json()
email = data.get('email')
password = data.get('password')
# 查询用户
user = db.execute(
'SELECT * FROM users WHERE email = ?',
(email,)
)
if not user:
return jsonify({'error': 'Invalid credentials'}), 401
# 验证密码
password_hash = hashlib.sha256(password.encode()).hexdigest()
if user['password_hash'] != password_hash:
return jsonify({'error': 'Invalid credentials'}), 401
return jsonify({
'message': 'Login successful',
'api_key': user['api_key']
})我特意让 API Key 只在注册时返回一次,之后就再也看不到了。这样设计是为了安全,但也带来了一些麻烦——用户如果弄丢了 API Key,就只能重置。
API 调用认证
修改天气 API 接口
from functools import wraps
def require_api_key(f):
"""API Key 验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 获取 API Key
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API Key required'}), 401
# 验证 API Key
user = db.execute(
'SELECT * FROM users WHERE api_key = ?',
(api_key,)
)
if not user:
return jsonify({'error': 'Invalid API Key'}), 401
# 将用户信息添加到 request
request.user = user
return f(*args, **kwargs)
return decorated_function
@app.route('/api/weather')
@require_api_key
def get_weather():
city = request.args.get('city')
# 检查今日配额
today = datetime.now().date()
usage = db.execute(
'''SELECT COUNT(*) as count FROM api_logs
WHERE user_id = ? AND DATE(created_at) = ?''',
(request.user['id'], today)
)['count']
if usage >= request.user['daily_limit']:
return jsonify({'error': 'Daily limit exceeded'}), 429
# ... 正常的业务逻辑
# 记录调用日志
db.execute(
'''INSERT INTO api_logs (user_id, endpoint, params)
VALUES (?, ?, ?)''',
(request.user['id'], '/api/weather', f'city={city}')
)
return jsonify(result)用装饰器的方式,我可以把认证逻辑从业务代码中分离出来。这是我从之前写代码的经验中学到的——关注点分离。
客户端使用方式
正确的调用方式
import requests
# 设置 API Key
api_key = 'ak_live_a1b2c3d4e5f6...'
# 调用 API
response = requests.get(
'https://kuaiyizhi.cn/api/weather?city=北京',
headers={'X-API-Key': api_key}
)
print(response.json())错误的调用方式
# ❌ 缺少 API Key
response = requests.get('https://kuaiyizhi.cn/api/weather?city=北京')
# 返回:{"error": "API Key required"}
# ❌ 无效的 API Key
response = requests.get(
'https://kuaiyizhi.cn/api/weather?city=北京',
headers={'X-API-Key': 'invalid_key'}
)
# 返回:{"error": "Invalid API Key"}
# ❌ 超出配额
# 返回:{"error": "Daily limit exceeded"}安全考虑
API Key 的安全问题
问题 1:API Key 泄露
- 如果 API Key 泄露,他人可以冒用
- 配额被消耗
- 产生费用
解决方案:
- 提醒用户妥善保管 API Key
- 提供重置 API Key 的功能
- 设置 IP 白名单(可选)
- 监控异常调用
密码存储
问题 2:密码不能明文存储
# ❌ 错误:明文存储
password_hash = password
# ✅ 正确:哈希存储
password_hash = hashlib.sha256(password.encode()).hexdigest()更好的方案是使用 bcrypt:
import bcrypt
# 哈希密码
password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
# 验证密码
if bcrypt.checkpw(password.encode(), stored_hash):
# 密码正确我当时想,安全这件事,再怎么重视也不为过。
效果验证
上线后,我观察了一周的数据:
滥用情况
| 指标 | 之前 | 现在 |
|---|---|---|
| 异常 IP 调用 | 每天 10+ 个 | 0 个 ✅ |
| 外部 API 滥用 | 频繁 | 0 次 ✅ |
| 正常用户体验 | 差 | 优秀 ✅ |
用户增长
| 指标 | 数值 |
|---|---|
| 注册用户 | 200 个 |
| 日活用户 | 80 个 |
| 平均每用户调用 | 100 次/天 |
看到这些数据,我知道这条路走对了。
新的问题
虽然解决了滥用问题,但带来了一些新挑战:
问题 1:数据存储
用户数据和调用日志越来越多:
- 用户表:200 条
- 日志表:每天 5 万条
- 增长速度:每月 150 万条
应该用什么数据库?
问题 2:查询性能
日志表查询越来越慢:
# 统计用户的今日用量
db.execute(
'''SELECT COUNT(*) FROM api_logs
WHERE user_id = ? AND DATE(created_at) = ?'''
)随着数据量增长,这个查询会越来越慢。
如何优化?
问题 3:数据备份
如果数据库挂了,所有用户数据都会丢失。
如何备份数据?
一个问题解决了,新的问题又来了。这就是做系统的常态吧。
本节小结
✅ 完成的工作:
- 实现了用户注册和登录
- 设计了 API Key 机制
- 添加了调用配额限制
- 解决了 API 滥用问题
✅ 效果:
- 完全阻止了滥用行为
- 提升了正常用户体验
- 可以追踪每个用户的调用情况
⚠️ 新的问题:
- 数据存储方案需要优化
- 查询性能需要提升
- 数据安全需要保障
下一节,我要解决数据库的问题。
