导航菜单

超额计费

场景

账单系统上线后,运行正常。

但运营团队发现了一些问题:

用户反馈:
1. 为什么我的账单有超额费用?
2. 超额费用是怎么计算的?
3. 我什么时候超的限?

技术问题:
1. 统计数据延迟(月度统计不是实时的)
2. 无法精确计算超额时间点
3. 用户无法实时查看已用量

问题分析

我查了一下当前的计费逻辑:

# 当前计费方式(简化版)
total_calls = get_monthly_stats(user_id, month)
plan_limit = subscription['daily_limit'] * 30
overage = max(0, total_calls - plan_limit)
overage_fee = overage * 0.001

问题:

  1. 使用月度统计表,数据有延迟
  2. 无法精确计算超额时间点
  3. 用户无法实时监控自己的用量

改进方案

实时用量追踪

不再依赖月度统计表,而是实时追踪:

# 实时用量追踪
def track_api_call_realtime(user_id):
    """实时追踪 API 调用"""

    current_time = datetime.now()
    current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # 使用 Redis 的 INCR 命令(原子操作)
    cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'

    # 增加计数
    current_count = redis_client.incr(cache_key)

    # 设置过期时间(2 个月)
    redis_client.expire(cache_key, 60 * 24 * 60)

    return current_count

实时查询用量

@app.route('/api/billing/usage')
@require_api_key
def get_current_usage():
    """获取当前实时用量"""

    current_time = datetime.now()
    current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # 获取用户套餐限额
    limits = get_user_plan_limits(request.user['id'])

    # 计算月度限额
    daily_limit = get_daily_limit(request.user['id'])
    monthly_limit = daily_limit * 30

    # 获取当前用量
    cache_key = f'usage:{request.user["id"]}:{current_month.strftime("%Y-%m")}'
    current_usage = int(redis_client.get(cache_key) or 0)

    # 计算超额
    overage = max(0, current_usage - monthly_limit)

    # 计算预估费用
    overage_fee = overage * 0.001

    # 计算剩余天数
    days_in_month = (current_time.replace(day=28) + timedelta(days=4)).day
    days_remaining = days_in_month - current_time.day

    return jsonify({
        'period': {
            'month': current_month.strftime('%Y-%m'),
            'days_remaining': days_remaining
        },
        'usage': {
            'current': current_usage,
            'limit': monthly_limit,
            'overage': overage,
            'percentage': (current_usage / monthly_limit * 100) if monthly_limit > 0 else 0
        },
        'estimated_cost': {
            'overage_fee': overage_fee
        },
        'plan': {
            'name': limits['plan'],
            'daily_limit': daily_limit
        }
    })

超额预警

用量达到 80% 时预警

def check_usage_warning(user_id):
    """检查是否需要发送用量预警"""

    current_time = datetime.now()
    current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    # 获取用户套餐
    limits = get_user_plan_limits(user_id)
    daily_limit = get_daily_limit(user_id)
    monthly_limit = daily_limit * 30

    # 获取当前用量
    cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'
    current_usage = int(redis_client.get(cache_key) or 0)

    # 计算使用百分比
    usage_percentage = (current_usage / monthly_limit * 100) if monthly_limit > 0 else 0

    # 检查是否需要预警
    warning_key = f'usage_warning:{user_id}:{current_month.strftime("%Y-%m")}'

    if usage_percentage >= 80:
        # 检查是否已经发送过预警
        if not redis_client.exists(warning_key):
            # 发送预警
            send_usage_warning_email(user_id, current_usage, monthly_limit, usage_percentage)

            # 标记已发送(7 天内不重复发送)
            redis_client.setex(warning_key, 7 * 24 * 3600, '1')

    if usage_percentage >= 100:
        # 检查是否已经发送过超额通知
        overage_key = f'usage_overage:{user_id}:{current_month.strftime("%Y-%m")}'

        if not redis_client.exists(overage_key):
            # 发送超额通知
            send_overage_notification_email(user_id, current_usage, monthly_limit)

            # 标记已发送(1 天内不重复发送)
            redis_client.setex(overage_key, 24 * 3600, '1')

def send_usage_warning_email(user_id, current_usage, limit, percentage):
    """发送用量预警邮件"""

    user = get_user_info(user_id)

    subject = f'【用量预警】您的 API 调用量已达到{percentage:.0f}%'
    body = f'''
尊敬的{user["name"]}

您的 API 调用量已达到套餐限额的{percentage:.0f}%。

当前情况:
- 本月已用:{current_usage:,}
- 套餐限额:{limit:,}
- 使用比例:{percentage:.1f}%

建议:
1. 检查是否有异常调用
2. 考虑升级套餐以获得更高限额
3. 设置调用监控和告警

如果继续超出限额,将产生超额费用。

此致
API 平台团队
'''

    send_email(to=user['email'], subject=subject, body=body)

def send_overage_notification_email(user_id, current_usage, limit):
    """发送超额通知邮件"""

    user = get_user_info(user_id)

    overage = current_usage - limit
    estimated_fee = overage * 0.001

    subject = f'【超额通知】您的 API 调用量已超出套餐限额'
    body = f'''
尊敬的{user["name"]}

您的 API 调用量已超出套餐限额。

当前情况:
- 本月已用:{current_usage:,}
- 套餐限额:{limit:,}
- 超出量:{overage:,}
- 预估超额费用:¥{estimated_fee:.2f}

计费规则:
- 超出部分按¥0.001/次计费
- 月底统一结算

建议:
1. 立即检查调用日志
2. 考虑升级套餐以节省费用
3. 设置调用限流

此致
API 平台团队
'''

    send_email(to=user['email'], subject=subject, body=body)

超额限制

硬限额 vs 软限额

# 配置超额处理策略
OVERAGE_CONFIG = {
    'free': {
        'hard_limit': True,        # 免费版:硬限额(超出后直接拒绝)
        'max_overage': 0,          # 最大超额:0
        'action': 'reject'         # 动作:拒绝请求
    },
    'basic': {
        'hard_limit': False,       # 付费版:软限额(允许超额,但收费)
        'max_overage': 10000,      # 最大超额:10000 次
        'action': 'charge'         # 动作:计费
    },
    'pro': {
        'hard_limit': False,
        'max_overage': 100000,
        'action': 'charge'
    },
    'enterprise': {
        'hard_limit': False,
        'max_overage': -1,         # -1 表示无限制
        'action': 'charge'
    }
}

def check_overage_limit(user_id):
    """检查超额限制"""

    # 获取用户套餐
    limits = get_user_plan_limits(user_id)
    plan = limits['plan']

    config = OVERAGE_CONFIG.get(plan, OVERAGE_CONFIG['basic'])

    if not config['hard_limit']:
        # 软限额,总是允许
        return {
            'allowed': True,
            'action': 'charge'
        }

    # 硬限额
    daily_limit = get_daily_limit(user_id)
    monthly_limit = daily_limit * 30

    # 获取当前用量
    current_time = datetime.now()
    current_month = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    cache_key = f'usage:{user_id}:{current_month.strftime("%Y-%m")}'
    current_usage = int(redis_client.get(cache_key) or 0)

    if current_usage >= monthly_limit:
        return {
            'allowed': False,
            'action': 'reject',
            'reason': 'Monthly limit exceeded',
            'upgrade_url': f'/pricing?upgrade=1'
        }

    return {
        'allowed': True,
        'action': 'allow'
    }

# 在 API 调用中使用
@app.route('/api/weather')
@require_api_key
def get_weather():
    # 检查超额限制
    overage_check = check_overage_limit(request.user['id'])

    if not overage_check['allowed']:
        return jsonify({
            'error': 'Monthly limit exceeded',
            'message': '您已达到本月调用限额,请升级套餐或等待下月重置',
            'upgrade_url': overage_check.get('upgrade_url')
        }), 429

    # 追踪调用
    track_api_call_realtime(request.user['id'])

    # ... 业务逻辑

精确账单生成

使用实时数据生成账单

def generate_accurate_invoice(user_id, month):
    """使用实时数据生成精确账单"""

    # 获取月份范围
    if isinstance(month, str):
        month = datetime.strptime(month, '%Y-%m').date()

    period_start = month.replace(day=1)
    period_end = (period_start + timedelta(days=32)).replace(day=1) - timedelta(days=1)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # 获取用户订阅
        cursor.execute(
            '''SELECT us.*, p.* FROM user_subscriptions us
               JOIN pricing_plans p ON us.plan_id = p.id
               WHERE us.user_id = ?
               AND us.current_period_start <= ?
               AND us.current_period_end >= ?
               AND us.status = 'active'
               ORDER BY us.created_at DESC
               LIMIT 1''',
            (user_id, period_end, period_start)
        )

        subscription = cursor.fetchone()

        if not subscription:
            raise Exception(f'No active subscription for user {user_id}')

        # 从 Redis 获取实时用量
        cache_key = f'usage:{user_id}:{period_start.strftime("%Y-%m")}'
        total_calls = int(redis_client.get(cache_key) or 0)

        # 如果 Redis 中没有数据,从数据库统计
        if total_calls == 0:
            cursor.execute(
                '''SELECT SUM(total_calls) as total_calls
                   FROM user_stats_monthly
                   WHERE user_id = ? AND month = ?''',
                (user_id, period_start)
            )
            result = cursor.fetchone()
            total_calls = result['total_calls'] or 0

        # 计算超额
        plan_limit = subscription['daily_limit'] * 30
        overage = max(0, total_calls - plan_limit)

        # 生成账单
        cursor.execute(
            '''INSERT INTO invoices
               (user_id, subscription_id, billing_period_start, billing_period_end,
                total_calls, plan_limit, overage_calls, status)
               VALUES (?, ?, ?, ?, ?, ?, ?, 'draft')''',
            (user_id, subscription['id'], period_start, period_end,
             total_calls, plan_limit, overage)
        )

        invoice_id = cursor.lastrowid

        # 添加基础费用
        base_fee = subscription['price_monthly']
        cursor.execute(
            '''INSERT INTO invoice_items
               (invoice_id, description, quantity, unit_price, amount, item_type)
               VALUES (?, ?, 1, ?, ?, 'base_fee')''',
            (invoice_id, f'{subscription["display_name"]} 月费', base_fee, base_fee)
        )

        # 添加超额费用
        if overage > 0:
            overage_unit_price = 0.001
            overage_amount = overage * overage_unit_price

            cursor.execute(
                '''INSERT INTO invoice_items
                   (invoice_id, description, quantity, unit_price, amount, item_type)
                   VALUES (?, ?, ?, ?, ?, 'overage')''',
                (invoice_id, f'超额调用 {overage:,} 次', overage, overage_unit_price, overage_amount)
            )

        # 计算总计
        cursor.execute(
            '''SELECT SUM(amount) as subtotal FROM invoice_items
               WHERE invoice_id = ?''',
            (invoice_id,)
        )

        subtotal = cursor.fetchone()['subtotal'] or 0
        tax = subtotal * 0.06
        total = subtotal + tax

        # 更新账单
        cursor.execute(
            '''UPDATE invoices
               SET subtotal = ?, tax = ?, total = ?
               WHERE id = ?''',
            (subtotal, tax, total, invoice_id)
        )

        conn.commit()

        return invoice_id

用量仪表盘

前端显示

<!-- usage-dashboard.html -->
<div class="usage-dashboard">
  <h2>本月用量</h2>

  <div class="usage-summary">
    <div class="usage-bar">
      <div class="usage-fill" style="width: 75%"></div>
    </div>
    <div class="usage-text">
      已用 7,500 次 / 限额 10,000 次 (75%)
    </div>
  </div>

  <div class="usage-details">
    <div class="detail-item">
      <span class="label">套餐</span>
      <span class="value">基础版</span>
    </div>
    <div class="detail-item">
      <span class="label">日限额</span>
      <span class="value">10,000 次/天</span>
    </div>
    <div class="detail-item">
      <span class="label">已用量</span>
      <span class="value">7,500 次</span>
    </div>
    <div class="detail-item">
      <span class="label">剩余量</span>
      <span class="value">2,500 次</span>
    </div>
    <div class="detail-item">
      <span class="label">超额预估</span>
      <span class="value">¥0.00</span>
    </div>
  </div>

  <div class="usage-chart">
    <!-- 过去 30 天的调用趋势图 -->
  </div>
</div>

本节小结

✅ 完成的工作:

  • 实现了实时用量追踪
  • 实现了用量预警机制
  • 实现了超额限制策略
  • 改进了账单生成逻辑
  • 提供了用量查询 API

✅ 用户体验提升:

  • 实时查看用量
  • 及时收到预警
  • 了解超额费用

🎯 下一步: 我需要接入更多 API 来吸引更多用户。

搜索