数据分析
场景
数据量增长后,产品部门需要数据分析。
需求:
- 实时监控 API 调用量
- 统计用户增长趋势
- 分析 API 使用情况
- 生成数据报表
问题:
- MySQL 聚合查询太慢
- 数据量太大,统计困难
- 无法实时返回结果解决方案
1. 实时统计系统
class RealTimeStats:
"""实时统计系统"""
def __init__(self, redis_client):
self.redis = redis_client
def record_api_call(self, user_id, endpoint, status):
"""记录 API 调用"""
# 使用 Redis 的 INCR 原子操作
timestamp = int(time.time())
minute_key = f'stats:minute:{timestamp // 60}'
hour_key = f'stats:hour:{timestamp // 3600}'
day_key = f'stats:day:{timestamp // 86400}'
# 管道操作(批量执行)
pipe = self.redis.pipeline()
# 按时间统计
pipe.incr(f'{minute_key}:total')
pipe.incr(f'{hour_key}:total')
pipe.incr(f'{day_key}:total')
# 按 API 统计
pipe.incr(f'{minute_key}:api:{endpoint}')
pipe.incr(f'{hour_key}:api:{endpoint}')
pipe.incr(f'{day_key}:api:{endpoint}')
# 按状态统计
pipe.incr(f'{day_key}:status:{status}')
# 按用户统计
pipe.incr(f'{day_key}:user:{user_id}')
# 设置过期时间
pipe.expire(minute_key, 86400 * 7) # 保留 7 天
pipe.expire(hour_key, 86400 * 30) # 保留 30 天
pipe.expire(day_key, 86400 * 365) # 保留 1 年
pipe.execute()
def get_stats(self, time_range='hour'):
"""获取统计数据"""
now = int(time.time())
if time_range == 'minute':
keys = [
f'stats:minute:{(now // 60) - i}'
for i in range(60) # 最近 60 分钟
]
elif time_range == 'hour':
keys = [
f'stats:hour:{(now // 3600) - i}'
for i in range(24) # 最近 24 小时
]
elif time_range == 'day':
keys = [
f'stats:day:{(now // 86400) - i}'
for i in range(30) # 最近 30 天
]
# 获取数据
pipe = self.redis.pipeline()
for key in keys:
pipe.get(f'{key}:total')
results = pipe.execute()
# 处理数据
stats = []
for i, result in enumerate(results):
if result:
stats.append({
'time': keys[i].split(':')[-1],
'count': int(result)
})
return stats
stats = RealTimeStats(redis_client)
# 在 API 调用时记录
@app.route('/api/weather')
@require_api_key
def get_weather():
# ... 业务逻辑 ...
# 记录统计
stats.record_api_call(
request.user['id'],
'/api/weather',
'success'
)
return jsonify(result)2. 数据分析 API
@app.route('/api/admin/stats/overview')
@require_admin
def get_stats_overview():
"""获取统计概览"""
stats_manager = RealTimeStats(redis_client)
# 当前时间
now = int(time.time())
# 今日数据
today_key = f'stats:day:{now // 86400}'
today_calls = int(redis_client.get(f'{today_key}:total') or 0)
# 本小时数据
hour_key = f'stats:hour:{now // 3600}'
hour_calls = int(redis_client.get(f'{hour_key}:total') or 0)
# 热门 API
hot_apis = []
for api in ['weather', 'news', 'stock', 'ip_query']:
count = int(redis_client.get(f'{today_key}:api:{api}') or 0)
hot_apis.append({'api': api, 'calls': count})
hot_apis.sort(key=lambda x: x['calls'], reverse=True)
return jsonify({
'today': {
'calls': today_calls,
'date': datetime.now().date().isoformat()
},
'current_hour': {
'calls': hour_calls,
'hour': datetime.now().hour
},
'hot_apis': hot_apis[:5]
})
@app.route('/api/admin/stats/trend')
@require_admin
def get_stats_trend():
"""获取趋势数据"""
time_range = request.args.get('range', 'day') # minute, hour, day
stats_manager = RealTimeStats(redis_client)
data = stats_manager.get_stats(time_range)
return jsonify({
'range': time_range,
'data': data
})3. 数据聚合任务
def aggregate_daily_stats():
"""每日数据聚合"""
yesterday = (datetime.now() - timedelta(days=1)).date()
timestamp = int(time.mktime(yesterday.timetuple()))
day_key = f'stats:day:{timestamp // 86400}'
# 获取所有统计
pipe = redis_client.pipeline()
# 总调用量
total_calls = pipe.get(f'{day_key}:total')
# 按 API 统计
api_stats = {}
for api in ['weather', 'news', 'stock', 'ip_query']:
api_stats[api] = pipe.get(f'{day_key}:api:{api}')
# 按状态统计
success_count = pipe.get(f'{day_key}:status:success')
error_count = pipe.get(f'{day_key}:status:error')
pipe.execute()
# 生成报表
report = {
'date': yesterday.isoformat(),
'total_calls': int(total_calls or 0),
'api_breakdown': {
api: int(count or 0)
for api, count in api_stats.items()
},
'success_rate': (
int(success_count or 0) /
(int(success_count or 0) + int(error_count or 0))
if (success_count or error_count) else 0
)
}
# 存储报表
redis_client.setex(
f'report:daily:{yesterday}',
86400 * 365,
json.dumps(report)
)
return report
# 定时执行:每天凌晨 1 点
scheduler.add_job(
aggregate_daily_stats,
CronTrigger(hour=1, minute=0),
id='daily_stats_aggregation'
)效果验证
优化前(MySQL 查询)
统计今日调用量:
- 查询时间:5 秒
- 锁表:影响正常业务
- 无法实时返回优化后(Redis 实时统计)
统计今日调用量:
- 查询时间:<100ms
- 无锁表
- 实时返回本节小结
✅ 完成的工作:
- 实现了实时统计系统
- 实现了数据分析 API
- 实现了数据聚合任务
✅ 效果:
- 查询速度提升 50 倍
- 支持实时数据分析
- 不影响正常业务
⚠️ 下一步:冷热数据分离
🎯 下一步:冷热数据如何分离?
