导航菜单

安全防护

场景

活动结束后,遭遇恶意攻击。

攻击特征:
- 短时间内大量请求
- 来自大量不同 IP
- 请求模式相似
- 目标:耗尽服务器资源

影响:
- 服务器 CPU 100%
- 数据库连接耗尽
- 正常用户无法访问

攻击分析

日志分析:
- 10 分钟内 100 万次请求
- 来自 5000 个不同 IP
- 大部分请求没有有效 API Key
- 请求集中在几个接口

判断:DDoS 攻击

防护策略:多层防护

第 1 层:网络层防护

# 使用 Nginx 限制请求频率

# /etc/nginx/nginx.conf

http {
    # 限制单个 IP 的请求频率
    limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

    # 限制单个 IP 的连接数
    limit_conn_zone $binary_remote_addr zone=conn_limit:10m;

    server {
        location /api/ {
            # 限制请求频率
            limit_req zone=api_limit burst=20 nodelay;

            # 限制连接数
            limit_conn conn_limit 10;

            # 超限返回
            limit_req_status 429;
            limit_conn_status 429;

            proxy_pass http://backend;
        }
    }
}

第 2 层:应用层防护

class SecurityMiddleware:
    """安全中间件"""

    def __init__(self):
        self.blocked_ips = set()
        self.ip_requests = defaultdict(list)

    def check_ip_reputation(self, ip):
        """检查 IP 信誉"""

        # 检查是否在黑名单
        if ip in self.blocked_ips:
            return False

        # 检查请求频率
        recent_requests = self.ip_requests[ip]
        now = time.time()

        # 清理旧记录(1 分钟前)
        self.ip_requests[ip] = [
            t for t in recent_requests if now - t < 60
        ]

        # 检查频率(1 分钟内超过 100 次)
        if len(self.ip_requests[ip]) > 100:
            self.blocked_ips.add(ip)
            send_alert(f'IP {ip} blocked due to high request rate')
            return False

        # 记录这次请求
        self.ip_requests[ip].append(now)

        return True

security = SecurityMiddleware()

@app.before_request
def security_check():
    """安全检查"""

    ip = request.remote_addr

    # 检查 IP 信誉
    if not security.check_ip_reputation(ip):
        return jsonify({'error': 'Too many requests'}), 429

    # 检查 API Key
    api_key = request.headers.get('X-API-Key')
    if not api_key:
        return jsonify({'error': 'API Key required'}), 401

第 3 层:验证码防护

def check_captcha():
    """检查验证码"""

    # 对于可疑请求,要求验证码
    if is_suspicious_request():
        captcha_token = request.headers.get('X-Captcha-Token')

        if not captcha_token:
            return jsonify({
                'error': 'Captcha required',
                'captcha_url': '/captcha'
            }), 403

        # 验证验证码
        if not verify_captcha(captcha_token):
            return jsonify({'error': 'Invalid captcha'}), 403

    return None

def is_suspicious_request():
    """判断是否为可疑请求"""

    # 没有 User-Agent
    if not request.headers.get('User-Agent'):
        return True

    # 请求频率过高
    ip = request.remote_addr
    if len(security.ip_requests[ip]) > 50:
        return True

    return False

第 4 层:CDN 防护

使用 Cloudflare 等 CDN 服务:
- DDoS 防护
- Web 应用防火墙 (WAF)
- 全球加速
- 自动屏蔽攻击 IP

配置:
1. 域名接入 CDN
2. 启用 DDoS 防护
3. 配置防火墙规则
4. 启用缓存

监控和告警

实时监控

def monitor_security():
    """安全监控"""

    # 检查系统指标
    metrics = {
        'qps': get_current_qps(),
        'cpu': psutil.cpu_percent(),
        'memory': psutil.virtual_memory().percent,
        'connections': get_active_connections(),
        'blocked_ips': len(security.blocked_ips)
    }

    # 异常检测
    if metrics['qps'] > 1000:
        send_alert(f'High QPS detected: {metrics["qps"]}')

    if metrics['cpu'] > 90:
        send_alert(f'High CPU usage: {metrics["cpu"]}%')

    if len(metrics['blocked_ips']) > 100:
        send_alert(f'Many IPs blocked: {len(metrics["blocked_ips"])}')

    return metrics

# 实时监控(每 10 秒)
scheduler.add_job(
    monitor_security,
    'interval',
    seconds=10,
    id='security_monitor'
)

应急响应

自动防护策略

class AutoDefense:
    """自动防护系统"""

    def __init__(self):
        self.protection_level = 0  # 0=正常,1=低防护,2=高防护

    def adjust_protection(self):
        """根据威胁自动调整防护级别"""

        threat_score = self.calculate_threat_score()

        if threat_score > 80:
            # 高威胁:启用所有防护
            self.enable_high_protection()
        elif threat_score > 50:
            # 中等威胁:启用部分防护
            self.enable_medium_protection()
        else:
            # 低威胁:正常模式
            self.disable_protection()

    def enable_high_protection(self):
        """启用高级防护"""

        # 启用严格限流
        set_rate_limit(requests_per_minute=10)

        # 启用验证码
        enable_captcha_for_all()

        # 启用 CDN 防护模式
        enable_cdn_under_attack_mode()

        send_alert('High protection mode enabled')

auto_defense = AutoDefense()

效果验证

攻击前

正常访问:
- QPS: 10
- 响应时间:50ms
- 用户体验:良好

攻击时(无防护)

遭受攻击:
- QPS: 1000
- 响应时间:超时
- 用户体验:系统不可用

攻击时(有防护)

多层防护:
- Nginx 层:拦截 90%
- 应用层:拦截 8%
- CDN 层:拦截 1.9%
- 最终到达后端:0.1%

效果:
- 正常用户仍可访问
- 系统保持稳定
- 攻击被有效拦截

本节小结

✅ 完成的工作:

  • 实现了多层防护策略
  • 实现了 IP 信誉检查
  • 实现了自动防护调整
  • 集成了 CDN 防护

✅ 效果:

  • 有效防御 DDoS 攻击
  • 正常用户不受影响
  • 系统保持可用

🎯 完成!我已经学会了处理海量流量

搜索