故障隔离
场景
新增 API 上线后,用户量增长。
九个月后:
- API 数量:4 个
- 日调用量:80 万次
- 其中新闻 API 占:60%
问题发生
某天下午,我收到了告警:
【告警】新闻 API 响应异常
- 错误率:95%
- 响应时间:超时
- 时间:14:30-15:00我赶紧检查,发现:
# 检查外部 API 状态
response = requests.get('https://news-api.kuaiyizhi.cn/v1/health')
# 连接超时新闻 API 的外部数据源挂了!
问题影响
我查了一下影响范围:
影响分析:
- 新闻 API 调用失败:95% 的错误率
- 其他 API(天气、股票、IP):正常
- 用户体验:部分功能不可用
- 用户投诉:增加了 20%但更严重的是:
连锁反应:
1. 新闻 API 失败
2. 用户重试新闻 API
3. 服务器资源被新闻 API 占用
4. 其他 API 也变慢了
5. 整体用户体验下降根本原因
我查了一下代码:
def handle_news_api(request):
# 调用外部 API(没有超时设置)
response = requests.get(
'https://news-api.kuaiyizhi.cn/v1/news'
)
# 如果外部 API 挂了,这个请求会一直等待
# 直到超时(默认很长)
return jsonify(response.json())问题:
- 没有设置超时时间
- 没有错误处理
- 没有降级方案
- 一个 API 失败影响了其他 API
解决方案
1. 超时控制
def call_external_api(url, params=None, timeout=3):
"""调用外部 API(带超时控制)"""
try:
response = requests.get(
url,
params=params,
timeout=timeout # 设置超时
)
response.raise_for_status()
return response.json()
except requests.Timeout:
raise Exception('External API timeout')
except requests.RequestException as e:
raise Exception(f'External API error: {e}')2. 错误处理
def handle_news_api(request):
"""处理新闻 API 请求(带错误处理)"""
category = request.args.get('category', 'general')
try:
# 调用外部 API
data = call_external_api(
'https://news-api.kuaiyizhi.cn/v1/news',
params={'category': category},
timeout=3
)
return jsonify({
'success': True,
'data': data
})
except Exception as e:
# 记录错误
logging.error(f'News API error: {e}')
# 返回友好的错误信息
return jsonify({
'success': False,
'error': 'News service temporarily unavailable',
'message': 'Please try again later'
}), 503 # Service Unavailable3. 熔断机制
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = {}
self.last_failure_time = {}
self.state = {} # closed, open, half-open
def is_open(self, service_name):
"""检查熔断器是否打开"""
current_time = time.time()
# 检查状态
state = self.state.get(service_name, 'closed')
if state == 'open':
# 检查是否可以尝试恢复
if current_time - self.last_failure_time[service_name] > self.timeout:
self.state[service_name] = 'half-open'
return False
return True
return False
def record_success(self, service_name):
"""记录成功"""
self.failures[service_name] = 0
self.state[service_name] = 'closed'
def record_failure(self, service_name):
"""记录失败"""
self.failures[service_name] = self.failures.get(service_name, 0) + 1
self.last_failure_time[service_name] = time.time()
# 检查是否需要打开熔断器
if self.failures[service_name] >= self.failure_threshold:
self.state[service_name] = 'open'
logging.warning(f'Circuit breaker opened for {service_name}')
# 创建全局熔断器
circuit_breaker = CircuitBreaker()
def handle_news_api(request):
"""处理新闻 API 请求(带熔断)"""
service_name = 'external_news_api'
# 检查熔断器
if circuit_breaker.is_open(service_name):
return jsonify({
'success': False,
'error': 'News service is temporarily unavailable',
'message': 'Service is down, please try again later'
}), 503
try:
# 调用外部 API
data = call_external_api(
'https://news-api.kuaiyizhi.cn/v1/news',
timeout=3
)
# 记录成功
circuit_breaker.record_success(service_name)
return jsonify({
'success': True,
'data': data
})
except Exception as e:
# 记录失败
circuit_breaker.record_failure(service_name)
return jsonify({
'success': False,
'error': 'News service temporarily unavailable'
}), 5034. 降级方案
def handle_news_api(request):
"""处理新闻 API 请求(带降级)"""
service_name = 'external_news_api'
# 检查熔断器
if circuit_breaker.is_open(service_name):
# 降级:返回缓存的热点新闻
cached_news = redis_client.get('news:hot:fallback')
if cached_news:
return jsonify({
'success': True,
'data': json.loads(cached_news),
'source': 'cache',
'message': 'Showing cached news due to service issues'
})
return jsonify({
'success': False,
'error': 'News service temporarily unavailable'
}), 503
try:
# 正常调用
data = call_external_api(
'https://news-api.kuaiyizhi.cn/v1/news',
timeout=3
)
circuit_breaker.record_success(service_name)
# 更新降级缓存
redis_client.setex('news:hot:fallback', 300, json.dumps(data))
return jsonify({
'success': True,
'data': data
})
except Exception as e:
circuit_breaker.record_failure(service_name)
# 降级处理
cached_news = redis_client.get('news:hot:fallback')
if cached_news:
return jsonify({
'success': True,
'data': json.loads(cached_news),
'source': 'cache',
'message': 'Showing cached news due to service issues'
})
return jsonify({
'success': False,
'error': 'News service temporarily unavailable'
}), 503隔离策略
资源隔离
from concurrent.futures import ThreadPoolExecutor
# 为每个外部 API 创建独立的线程池
api_executors = {
'weather': ThreadPoolExecutor(max_workers=10),
'news': ThreadPoolExecutor(max_workers=5),
'stock': ThreadPoolExecutor(max_workers=5),
'ip_query': ThreadPoolExecutor(max_workers=10)
}
def handle_api_with_isolation(api_name, handler):
"""使用隔离的线程池处理 API"""
executor = api_executors.get(api_name)
if not executor:
# 没有配置隔离,使用默认处理
return handler(request)
try:
# 在独立的线程池中执行
future = executor.submit(handler, request)
result = future.result(timeout=5)
return result
except TimeoutError:
return jsonify({
'success': False,
'error': f'{api_name} API timeout'
}), 504
except Exception as e:
return jsonify({
'success': False,
'error': f'{api_name} API error: {str(e)}'
}), 500监控和告警
API 健康检查
def check_api_health(api_name):
"""检查 API 健康状态"""
health_url = f'https://{api_name}-api.kuaiyizhi.cn/health'
try:
response = requests.get(health_url, timeout=3)
return {
'api': api_name,
'status': 'healthy' if response.status_code == 200 else 'unhealthy',
'response_time': response.elapsed.total_seconds()
}
except Exception as e:
return {
'api': api_name,
'status': 'down',
'error': str(e)
}
@app.route('/admin/api-health')
def api_health_check():
"""API 健康检查接口"""
apis = ['weather', 'news', 'stock', 'ip_query']
health_status = {}
for api in apis:
health_status[api] = check_api_health(api)
return jsonify(health_status)效果验证
优化前
新闻 API 故障时:
- 所有 API 响应都变慢
- 服务器资源耗尽
- 用户投诉增加 20%优化后
新闻 API 故障时:
- 新闻 API 快速失败(3 秒超时)
- 熔断器打开,停止调用
- 返回缓存数据或友好错误
- 其他 API 不受影响
- 用户体验基本正常本节小结
✅ 完成的工作:
- 实现了超时控制
- 实现了熔断机制
- 实现了降级方案
- 实现了资源隔离
✅ 效果:
- 单个 API 故障不影响其他 API
- 提升了系统整体可用性
- 用户体验更加稳定
⚠️ 下一步: 不同数据的更新频率不同,我需要设计缓存策略
🎯 下一步: 不同数据更新频率不同,如何设计缓存策略?
