定时更新
场景
缓存策略上线后,发现新问题:
用户反馈:
- "为什么你们的股票数据是 5 分钟前的?"
- "新闻更新太慢了"
技术分析:
- 被动缓存:用户访问时才更新
- 热点数据:应该主动更新
- 定时任务:需要引入任务调度系统来解决
引入任务调度系统,主动更新缓存。
任务调度器选择
方案对比
| 方案 | 优势 | 劣势 |
|---|---|---|
| cron | 简单、系统自带 | 不支持分布式 |
| Celery | 功能强大、分布式 | 复杂、依赖多 |
| APScheduler | 轻量、易用 | 单机 |
| 云函数 | 免运维 | 成本高、有延迟 |
选择 APScheduler(轻量、满足需求)
APScheduler 实现
安装
pip install apscheduler
基础配置
import functools
import logging
import time
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Create the scheduler; BackgroundScheduler runs jobs in a worker
# thread pool without blocking the main application.
scheduler = BackgroundScheduler()


def update_stock_data():
    """Refresh cached quotes for a fixed list of hot stock symbols.

    Each symbol is fetched from the external quote API and written to
    Redis with a 5-minute TTL (matching the job interval).  Failures
    are logged per symbol so one bad symbol does not abort the batch.
    """
    hot_stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
    for symbol in hot_stocks:
        try:
            data = call_external_api(
                # plain string: the original f-string had no placeholders
                'https://stock-api.kuaiyizhi.cn/quote',
                params={'symbol': symbol}
            )
            cache_key = f'stock:{symbol}'
            redis_cache.set(cache_key, data, ttl=300)
            logging.info(f'Updated stock data for {symbol}')
        except Exception as e:
            logging.error(f'Failed to update {symbol}: {e}')


# Schedule the refresh to run every 5 minutes.
scheduler.add_job(
    update_stock_data,
    CronTrigger(minute='*/5'),
    id='update_stocks',
    name='Update Stock Data'
)

# Start the scheduler.
scheduler.start()
任务类型
1. 固定频率任务
# 每 10 秒执行一次
scheduler.add_job(
health_check,
'interval',
seconds=10,
id='health_check'
)
def health_check():
"""健康检查"""
# 检查各个组件的健康状态
pass2. Cron 表达式任务
# 每天凌晨 2 点执行
scheduler.add_job(
daily_report,
CronTrigger(hour=2, minute=0),
id='daily_report'
)
# 每周一上午 9 点执行
scheduler.add_job(
weekly_report,
CronTrigger(day_of_week='mon', hour=9, minute=0),
id='weekly_report'
)
# 每月 1 号凌晨 3 点执行
scheduler.add_job(
monthly_billing,
CronTrigger(day=1, hour=3, minute=0),
id='monthly_billing'
)3. 一次性任务
# 5 分钟后执行一次
scheduler.add_job(
one_time_task,
'date',
run_date=datetime.now() + timedelta(minutes=5),
id='one_time'
)任务管理 API
查看任务列表
@app.route('/admin/scheduled-jobs')
def list_scheduled_jobs():
    """Return every scheduled job (id, name, next run, trigger) as JSON."""
    payload = [
        {
            'id': job.id,
            'name': job.name,
            'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
            'trigger': str(job.trigger),
        }
        for job in scheduler.get_jobs()
    ]
    return jsonify(payload)
手动触发任务
@app.route('/admin/jobs/<job_id>/trigger', methods=['POST'])
def trigger_job(job_id):
    """Force a job to fire now by moving its next run time to the present."""
    try:
        job = scheduler.get_job(job_id)
        if not job:
            return jsonify({'error': 'Job not found'}), 404
        job.modify(next_run_time=datetime.now())
        return jsonify({'message': f'Job {job_id} triggered'})
    except Exception as e:
        # Boundary handler: surface any scheduler failure as a 500.
        return jsonify({'error': str(e)}), 500
高级任务
热点数据更新
def update_hot_data():
    """Refresh every category of hot cached data in a single pass.

    NOTE(review): the original comments suggest per-category frequencies
    (weather hourly, stocks 5 min, news 10 min), but this one job runs
    everything on the same 5-minute schedule — confirm that is intended.
    """
    # Weather for popular cities.
    for city in ['北京', '上海', '深圳', '广州', '杭州']:
        update_weather_cache(city)
    # Quotes for popular stocks.
    for symbol in ['AAPL', 'GOOGL', 'MSFT']:
        update_stock_cache(symbol)
    # Headlines for popular news categories.
    for category in ['general', 'technology', 'business']:
        update_news_cache(category)


# Refresh all hot data every 5 minutes.
scheduler.add_job(
    update_hot_data,
    CronTrigger(minute='*/5'),
    id='update_hot_data'
)
缓存预热
def warmup_cache():
    """Pre-load data likely to be queried tomorrow into the cache.

    Fetches tomorrow's weather forecast for every known city and caches
    it for 24 hours; per-city failures are logged and skipped.
    """
    logging.info('Starting cache warmup...')
    target_date = (datetime.now() + timedelta(days=1)).date()
    for city in get_all_cities():
        try:
            forecast = get_weather_forecast(city, target_date)
            redis_cache.set(f'forecast:{city}:{target_date}', forecast, ttl=86400)
        except Exception as e:
            logging.error(f'Failed to warmup {city}: {e}')
    logging.info('Cache warmup completed')


# Warm the cache nightly at 23:00.
scheduler.add_job(
    warmup_cache,
    CronTrigger(hour=23, minute=0),
    id='cache_warmup'
)
数据清理
def cleanup_old_data():
    """Delete API log rows older than 90 days and log how many were removed.

    Uses cursor.rowcount for the deleted-row count: cursor.execute()
    returns the cursor object itself, not a count, so the original code
    logged a cursor repr instead of a number.
    """
    cutoff_date = datetime.now() - timedelta(days=90)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < ?',
            (cutoff_date,)
        )
        deleted = cursor.rowcount
        conn.commit()
    logging.info(f'Deleted {deleted} old log entries')


# Clean up weekly: Sunday at 03:00.
scheduler.add_job(
    cleanup_old_data,
    CronTrigger(day_of_week='sun', hour=3, minute=0),
    id='cleanup_logs'
)
任务监控
任务执行日志
class JobLogger:
    """Records job run markers in Redis with a 24-hour TTL.

    Keys have the form ``job_log:<job_id>:<iso_timestamp>:<phase>``
    where phase is ``start``, ``end`` or ``error``.
    """

    # Keep each marker for one day.
    TTL_SECONDS = 86400

    def __init__(self):
        self.redis = redis_client

    def log_job_start(self, job_id):
        """Record that a job run has started."""
        key = f'job_log:{job_id}:{datetime.now().isoformat()}'
        # redis-py setex signature is (name, time, value); the original
        # passed (name, value, time), producing a 1-second TTL.
        self.redis.setex(f'{key}:start', self.TTL_SECONDS, 1)

    def log_job_end(self, job_id, success=True, error=None):
        """Record that a job run has finished, including its outcome."""
        key = f'job_log:{job_id}:{datetime.now().isoformat()}'
        # Store the success flag (the original ignored the parameter),
        # and fix the swapped setex arguments — passing the error string
        # as the TTL would raise at runtime.
        self.redis.setex(f'{key}:end', self.TTL_SECONDS, int(success))
        if error:
            self.redis.setex(f'{key}:error', self.TTL_SECONDS, error)

    def get_job_logs(self, job_id, limit=10):
        """Return up to *limit* most recent run records for a job."""
        keys = self.redis.keys(f'job_log:{job_id}:*')
        # KEYS returns the phase-suffixed entries themselves, unordered.
        # Strip the phase suffix, dedupe, and sort so [-limit:] really is
        # the newest (ISO timestamps sort chronologically).
        bases = sorted({
            k.rsplit(':', 1)[0]
            for k in (key.decode() if isinstance(key, bytes) else key
                      for key in keys)
        })
        logs = []
        for base in bases[-limit:]:
            logs.append({
                # split(':', 2) keeps the full ISO timestamp, which itself
                # contains colons (the original split(':')[-1] truncated it).
                'time': base.split(':', 2)[-1],
                'start': self.redis.get(f'{base}:start'),
                'end': self.redis.get(f'{base}:end'),
                'error': self.redis.get(f'{base}:error'),
            })
        return logs


job_logger = JobLogger()
# Decorator: automatically record start/end (and errors) of each run.
def log_job(job_id):
    """Build a decorator that logs each run of a job via ``job_logger``.

    ``functools.wraps`` preserves the wrapped function's name and
    docstring, so APScheduler job names and debugging output stay
    meaningful (the original wrapper discarded them).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            job_logger.log_job_start(job_id)
            try:
                result = func(*args, **kwargs)
                job_logger.log_job_end(job_id, success=True)
                return result
            except Exception as e:
                job_logger.log_job_end(job_id, success=False, error=str(e))
                raise
        return wrapper
    return decorator
# Usage: decorate a job function to get automatic run logging.
@log_job('update_stocks')
def update_stock_data():
    # ... task logic
    pass
失败重试
from apscheduler.executors.pool import ThreadPoolExecutor

# Executor: a thread pool shared by all jobs.
executors = {
    'default': ThreadPoolExecutor(max_workers=20)
}

# Defaults applied to every job.
job_defaults = {
    'coalesce': True,            # collapse a backlog of missed runs into one
    'max_instances': 3,          # at most 3 concurrent instances per job
    'misfire_grace_time': 60     # seconds of grace before a late run is dropped
}

scheduler = BackgroundScheduler(
    executors=executors,
    job_defaults=job_defaults
)
# 带重试的任务
def update_with_retry(job_id, max_retries=3):
"""带重试的任务"""
for attempt in range(max_retries):
try:
# 执行任务
result = fetch_and_cache_data()
# 成功则返回
logging.info(f'{job_id} succeeded on attempt {attempt + 1}')
return result
except Exception as e:
logging.warning(f'{job_id} failed on attempt {attempt + 1}: {e}')
if attempt == max_retries - 1:
# 最后一次尝试也失败了
logging.error(f'{job_id} failed after {max_retries} attempts')
raise
# 等待后重试
time.sleep(2 ** attempt) # 指数退避本节小结
✅ 完成的工作:
- 引入了 APScheduler 任务调度
- 实现了多种定时任务
- 添加了任务监控和日志
- 实现了失败重试机制
✅ 效果:
- 热点数据始终保持新鲜
- 用户体验提升
- 减少了用户等待时间
⚠️ 新挑战: 用户数突破百万,流量压力巨大
🎯 下一步: 扩展系统架构,处理海量请求
