导航菜单

账单统计

场景

付费套餐上线后,第一批用户开始付费。

七个月后:

  • 付费用户:100 个
  • 月收入:约¥15000

但财务部门找到我:

需求:
1. 每月自动生成账单
2. 统计每个用户的调用量
3. 计算超额部分
4. 发送账单给用户

问题分析

我查了一下当前的统计系统:

-- 月度统计表
CREATE TABLE user_stats_monthly (
  user_id INT NOT NULL,
  month DATE NOT NULL,
  total_calls INT DEFAULT 0,
  -- ...
);

问题:

  1. 统计表每月聚合一次,不是实时的
  2. 没有按 API 分别统计
  3. 没有按时间段统计(用于计算超限)
  4. 没有计费相关的字段

账单系统设计

需求分析

账单需要包含:
1. 基本信息(用户、周期、套餐)
2. 使用情况(总调用量、各 API 调用量)
3. 费用明细(基础费用、超额费用)
4. 支付状态

计费规则:
- 基础费用:套餐月费
- 超额费用:超出套餐部分按次计费
- 超额价格:¥0.001/次(1 元 1000 次)

数据库设计

-- 账单表
CREATE TABLE invoices (
  id INT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  subscription_id INT NOT NULL,

  -- 账单周期
  billing_period_start DATE NOT NULL,
  billing_period_end DATE NOT NULL,

  -- 费用信息
  subtotal DECIMAL(10, 2) DEFAULT 0,  -- 小计
  tax DECIMAL(10, 2) DEFAULT 0,        -- 税费
  total DECIMAL(10, 2) DEFAULT 0,      -- 总计

  -- 使用情况
  total_calls INT DEFAULT 0,
  plan_limit INT DEFAULT 0,
  overage_calls INT DEFAULT 0,

  -- 状态
  status VARCHAR(20) DEFAULT 'draft',  -- draft, sent, paid, overdue, cancelled

  -- 时间
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  sent_at TIMESTAMP NULL,
  paid_at TIMESTAMP NULL,

  FOREIGN KEY (user_id) REFERENCES users(id),
  FOREIGN KEY (subscription_id) REFERENCES user_subscriptions(id),
  INDEX idx_user_status (user_id, status),
  INDEX idx_status_period (status, billing_period_end)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 账单明细表
CREATE TABLE invoice_items (
  id INT PRIMARY KEY AUTO_INCREMENT,
  invoice_id INT NOT NULL,

  -- 明细信息
  description VARCHAR(255) NOT NULL,
  quantity INT DEFAULT 1,
  unit_price DECIMAL(10, 2) DEFAULT 0,
  amount DECIMAL(10, 2) DEFAULT 0,

  -- 类型
  item_type VARCHAR(50) NOT NULL,  -- base_fee, overage, discount, tax

  -- 元数据
  metadata JSON,

  FOREIGN KEY (invoice_id) REFERENCES invoices(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

账单生成逻辑

按用户生成月度账单

def generate_monthly_invoice(user_id, month):
    """生成用户月度账单"""

    # 获取月份范围
    if isinstance(month, str):
        month = datetime.strptime(month, '%Y-%m').date()

    period_start = month.replace(day=1)
    period_end = (period_start + timedelta(days=32)).replace(day=1) - timedelta(days=1)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # 获取用户订阅
        cursor.execute(
            '''SELECT us.*, p.* FROM user_subscriptions us
               JOIN pricing_plans p ON us.plan_id = p.id
               WHERE us.user_id = ?
               AND us.current_period_start <= ?
               AND us.current_period_end >= ?
               AND us.status = 'active'
               ORDER BY us.created_at DESC
               LIMIT 1''',
            (user_id, period_end, period_start)
        )

        subscription = cursor.fetchone()

        if not subscription:
            raise Exception(f'No active subscription for user {user_id}')

        # 统计总调用量
        cursor.execute(
            '''SELECT SUM(total_calls) as total_calls
               FROM user_stats_monthly
               WHERE user_id = ? AND month = ?''',
            (user_id, period_start)
        )

        stats = cursor.fetchone()
        total_calls = stats['total_calls'] or 0

        # 计算超额
        plan_limit = subscription['daily_limit'] * 30  # 月度限额
        overage = max(0, total_calls - plan_limit)

        # 生成账单
        cursor.execute(
            '''INSERT INTO invoices
               (user_id, subscription_id, billing_period_start, billing_period_end,
                total_calls, plan_limit, overage_calls, status)
               VALUES (?, ?, ?, ?, ?, ?, ?, 'draft')''',
            (user_id, subscription['id'], period_start, period_end,
             total_calls, plan_limit, overage)
        )

        invoice_id = cursor.lastrowid

        # 添加基础费用
        base_fee = subscription['price_monthly']
        cursor.execute(
            '''INSERT INTO invoice_items
               (invoice_id, description, quantity, unit_price, amount, item_type)
               VALUES (?, ?, 1, ?, ?, 'base_fee')''',
            (invoice_id, f'{subscription["display_name"]} 月费', base_fee, base_fee)
        )

        # 添加超额费用
        if overage > 0:
            overage_unit_price = 0.001  # ¥0.001/次
            overage_amount = overage * overage_unit_price

            cursor.execute(
                '''INSERT INTO invoice_items
                   (invoice_id, description, quantity, unit_price, amount, item_type)
                   VALUES (?, ?, ?, ?, ?, 'overage')''',
                (invoice_id, f'超额调用 {overage:,} 次', overage, overage_unit_price, overage_amount)
            )

        # 计算总计
        cursor.execute(
            '''SELECT SUM(amount) as subtotal FROM invoice_items
               WHERE invoice_id = ?''',
            (invoice_id,)
        )

        subtotal = cursor.fetchone()['subtotal'] or 0

        # 添加税费(假设 6%)
        tax = subtotal * 0.06
        total = subtotal + tax

        # 更新账单总计
        cursor.execute(
            '''UPDATE invoices
               SET subtotal = ?, tax = ?, total = ?
               WHERE id = ?''',
            (subtotal, tax, total, invoice_id)
        )

        conn.commit()

        return invoice_id

批量生成账单

月底定时任务

def generate_all_monthly_invoices():
    """生成所有用户的月度账单"""

    # 上个月
    last_month = (datetime.now().replace(day=1) - timedelta(days=1))
    billing_month = last_month.replace(day=1)

    logging.info(f'Generating invoices for {billing_month.strftime("%Y-%m")}')

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # 获取所有有订阅的用户
        cursor.execute(
            '''SELECT DISTINCT user_id FROM user_subscriptions
               WHERE status = 'active'
               AND current_period_start <= ?
               AND current_period_end >= ?''',
            (billing_month, billing_month)
        )

        users = cursor.fetchall()

        logging.info(f'Found {len(users)} users with active subscriptions')

        success_count = 0
        failed_count = 0

        for user in users:
            try:
                invoice_id = generate_monthly_invoice(user['user_id'], billing_month)
                logging.info(f'Generated invoice {invoice_id} for user {user["user_id"]}')
                success_count += 1
            except Exception as e:
                logging.error(f'Failed to generate invoice for user {user["user_id"]}: {e}')
                failed_count += 1

        logging.info(f'Invoice generation completed: {success_count} success, {failed_count} failed')

        return {
            'total': len(users),
            'success': success_count,
            'failed': failed_count
        }

# 使用 cron 定时执行:每月 1 号凌晨 2 点
# 0 2 1 * * /usr/bin/python3 /path/to/generate_invoices.py

账单查询 API

获取用户账单列表

@app.route('/api/billing/invoices')
@require_api_key
def get_invoices():
    """获取用户账单列表"""

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            '''SELECT * FROM invoices
               WHERE user_id = ?
               ORDER BY billing_period_end DESC
               LIMIT 24''',  -- 最近 2
            (request.user['id'],)
        )

        invoices = cursor.fetchall()

        result = []
        for invoice in invoices:
            result.append({
                'id': invoice['id'],
                'billing_period': {
                    'start': invoice['billing_period_start'].isoformat(),
                    'end': invoice['billing_period_end'].isoformat()
                },
                'amounts': {
                    'subtotal': float(invoice['subtotal']),
                    'tax': float(invoice['tax']),
                    'total': float(invoice['total'])
                },
                'usage': {
                    'total_calls': invoice['total_calls'],
                    'plan_limit': invoice['plan_limit'],
                    'overage_calls': invoice['overage_calls']
                },
                'status': invoice['status'],
                'created_at': invoice['created_at'].isoformat(),
                'paid_at': invoice['paid_at'].isoformat() if invoice['paid_at'] else None
            })

        return jsonify(result)

获取单个账单详情

@app.route('/api/billing/invoices/<int:invoice_id>')
@require_api_key
def get_invoice_detail(invoice_id):
    """获取账单详情"""

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # 获取账单基本信息
        cursor.execute(
            '''SELECT * FROM invoices
               WHERE id = ? AND user_id = ?''',
            (invoice_id, request.user['id'])
        )

        invoice = cursor.fetchone()

        if not invoice:
            return jsonify({'error': 'Invoice not found'}), 404

        # 获取账单明细
        cursor.execute(
            'SELECT * FROM invoice_items WHERE invoice_id = ?',
            (invoice_id,)
        )

        items = cursor.fetchall()

        item_list = []
        for item in items:
            item_list.append({
                'description': item['description'],
                'quantity': item['quantity'],
                'unit_price': float(item['unit_price']),
                'amount': float(item['amount']),
                'type': item['item_type']
            })

        return jsonify({
            'id': invoice['id'],
            'billing_period': {
                'start': invoice['billing_period_start'].isoformat(),
                'end': invoice['billing_period_end'].isoformat()
            },
            'amounts': {
                'subtotal': float(invoice['subtotal']),
                'tax': float(invoice['tax']),
                'total': float(invoice['total'])
            },
            'usage': {
                'total_calls': invoice['total_calls'],
                'plan_limit': invoice['plan_limit'],
                'overage_calls': invoice['overage_calls']
            },
            'items': item_list,
            'status': invoice['status'],
            'created_at': invoice['created_at'].isoformat(),
            'sent_at': invoice['sent_at'].isoformat() if invoice['sent_at'] else None,
            'paid_at': invoice['paid_at'].isoformat() if invoice['paid_at'] else None
        })

账单发送

自动发送账单

def send_invoice_to_user(invoice_id):
    """发送账单给用户"""

    with get_db_connection() as conn:
        cursor = conn.cursor()

        # 获取账单和用户信息
        cursor.execute(
            '''SELECT i.*, u.email, u.name
               FROM invoices i
               JOIN users u ON i.user_id = u.id
               WHERE i.id = ?''',
            (invoice_id,)
        )

        invoice = cursor.fetchone()

        if not invoice:
            raise Exception(f'Invoice {invoice_id} not found')

        # 生成账单 PDF
        pdf_content = generate_invoice_pdf(invoice)

        # 发送邮件
        subject = f'您的{invoice["billing_period_start"].strftime("%Y-%m")}账单'
        body = f'''
尊敬的{invoice["name"]}

您的{invoice["billing_period_start"].strftime("%Y-%m")}月账单已生成。

账单详情:
- 账单周期:{invoice["billing_period_start"]}{invoice["billing_period_end"]}
- 总调用量:{invoice["total_calls"]:,}
- 套餐限额:{invoice["plan_limit"]:,}
- 超额调用:{invoice["overage_calls"]:,}

费用明细:
- 小计:¥{invoice["subtotal"]:.2f}
- 税费:¥{invoice["tax"]:.2f}
- 总计:¥{invoice["total"]:.2f}

请登录查看详情并支付。

此致
API 平台团队
'''

        send_email(
            to=invoice['email'],
            subject=subject,
            body=body,
            attachments=[
                ('invoice.pdf', pdf_content)
            ]
        )

        # 更新发送状态
        cursor.execute(
            'UPDATE invoices SET status = "sent", sent_at = NOW() WHERE id = ?',
            (invoice_id,)
        )
        conn.commit()

        return True

本节小结

✅ 完成的工作:

  • 设计了账单和明细数据表
  • 实现了月度账单生成逻辑
  • 实现了批量生成账单的定时任务
  • 实现了账单查询 API
  • 实现了账单发送功能

✅ 业务价值:

  • 自动化账单生成流程
  • 减少人工操作
  • 提升用户体验

⚠️ 下一步: 我需要实现超额调用的精确计算

🎯 下一步: 超出套餐的部分如何精确计费?

搜索