导航菜单

格式转换流水线

上传一张图要等 12 秒

有损压缩和无损压缩的策略都搞定了。我把代码合并到上传流程里,满怀期待地测试了一下。

结果:用户上传一张 8.7MB 的图片后,要等 12 秒才能看到”上传成功”。

上传耗时分解:
  1. 客户端直传 OSS:                    2s
  2. 回调通知服务器:                    0.1s
  3. 服务器下载 OSS 原图到本地:          1.5s
  4. 智能分析内容类型:                   0.3s
  5. 生成 3 种尺寸 × 2 种格式 = 6 个文件: 6s
  6. 上传 6 个文件到 OSS:               2s
  总计:                                  12s

用户上传图片时盯着一个 loading 动画等 12 秒?不可接受。

问题出在哪?第 3~6 步都是同步执行的,阻塞了上传接口的返回。

解法很明确:上传归上传,处理归处理。异步化。

异步处理架构

核心思路:

之前(同步):
  用户上传 → OSS → 回调 → 下载 → 压缩 × 6 → 上传 OSS → 返回(12秒)

之后(异步):
  用户上传 → OSS → 回调 → 丢消息队列 → 立即返回(0.5秒)

                               Worker 消费 → 下载 → 压缩 × 6 → 上传 OSS → 通知完成
# 消息队列:使用 Redis Stream 作为简单的消息队列
import redis
import json

redis_client = redis.Redis()

class ImageProcessQueue:
    """图片处理消息队列"""
    
    QUEUE_NAME = 'image:process'
    
    def publish(self, task: dict):
        """发布处理任务"""
        task_data = json.dumps(task)
        msg_id = redis_client.xadd(self.QUEUE_NAME, {'data': task_data})
        return msg_id
    
    def consume(self, consumer_group: str, consumer_name: str, count: int = 1):
        """消费处理任务"""
        # 确保消费者组存在
        try:
            redis_client.xgroup_create(self.QUEUE_NAME, consumer_group, id='0')
        except redis.ResponseError:
            pass  # 组已存在
        
        messages = redis_client.xreadgroup(
            consumer_group, consumer_name,
            {self.QUEUE_NAME: '>'},
            count=count, block=5000
        )
        
        tasks = []
        for stream, msgs in messages:
            for msg_id, data in msgs:
                task = json.loads(data[b'data'])
                tasks.append((msg_id, task))
        
        return tasks
    
    def ack(self, consumer_group: str, msg_id: bytes):
        """确认任务完成"""
        redis_client.xack(self.QUEUE_NAME, consumer_group, msg_id)
    
    def pending_count(self, consumer_group: str):
        """查看待处理任务数"""
        info = redis_client.xpending(self.QUEUE_NAME, consumer_group)
        return info.get('pending', 0)

修改上传回调接口,把处理任务丢到队列后立即返回:

from datetime import datetime

@app.route('/api/upload/callback', methods=['POST'])
def upload_callback():
    """OSS 上传成功后的回调——异步触发处理"""
    data = request.json
    object_key = data['object_key']
    user_id = data['user_id']
    file_size = data.get('size', 0)
    
    # 1. 记录到数据库,状态为 "processing"
    photo_id = db.insert('photos', {
        'user_id': user_id,
        'object_key': object_key,
        'original_size': file_size,
        'status': 'processing',
        'created_at': datetime.now(),
    })
    
    # 2. 发布处理任务到消息队列
    queue = ImageProcessQueue()
    queue.publish({
        'photo_id': photo_id,
        'object_key': object_key,
        'user_id': user_id,
        'priority': 'high' if file_size > 10 * 1024 * 1024 else 'normal',
    })
    
    # 3. 立即返回(不再等待处理完成)
    return jsonify({
        'photo_id': photo_id,
        'status': 'processing',
        'message': '图片正在处理中,预计 10 秒内完成',
    })

效果:上传回调接口从 12 秒降到了 0.3 秒

Worker:并行生成多格式多尺寸

Worker 从队列中消费任务,并行处理:

import concurrent.futures
from dataclasses import dataclass
from typing import List

@dataclass
class DerivativeSpec:
    """衍生图规格"""
    size_name: str
    max_width: int
    max_height: int
    formats: List[str]  # ['webp', 'jpeg']

# 定义需要生成的衍生图规格
DERIVATIVE_SPECS = [
    DerivativeSpec('thumb',   200,  200,  ['webp']),
    DerivativeSpec('small',   400,  400,  ['webp']),
    DerivativeSpec('medium',  800,  800,  ['webp', 'jpeg']),
    DerivativeSpec('large',   1200, 1200, ['webp', 'jpeg']),
    DerivativeSpec('xlarge',  1920, 1920, ['webp']),
]

class ImageProcessWorker:
    """图片处理 Worker"""
    
    def __init__(self, worker_id: str):
        self.worker_id = worker_id
        self.queue = ImageProcessQueue()
        self.compressor = SmartCompressor()       # 上一节的有损压缩器
        self.lossless = LosslessCompressor()       # 上一节的无损压缩器
    
    def run(self):
        """主循环:消费任务 → 处理 → 确认"""
        print(f"Worker {self.worker_id} started")
        
        while True:
            tasks = self.queue.consume(
                consumer_group='image-processors',
                consumer_name=self.worker_id,
                count=1,
            )
            
            for msg_id, task in tasks:
                try:
                    self._process_one(task)
                    self.queue.ack('image-processors', msg_id)
                except Exception as e:
                    print(f"处理失败: {task['photo_id']}, 错误: {e}")
                    # 处理失败不 ack,消息会重新投递
                    self._report_failure(task, str(e))
    
    def _process_one(self, task: dict):
        """处理单张图片"""
        photo_id = task['photo_id']
        object_key = task['object_key']
        
        print(f"[{self.worker_id}] 开始处理: {photo_id}")
        
        # 1. 从 OSS 下载原图到临时目录
        local_path = f'/tmp/process_{photo_id}_original'
        download_from_oss(object_key, local_path)
        
        # 2. 分析图片
        img = Image.open(local_path)
        content_type = self.compressor.analyze_content(img)
        is_lossless = content_type in ('product', 'screenshot', 'graphic')
        
        # 3. 并行生成所有衍生图
        derivatives = self._generate_derivatives(img, is_lossless)
        
        # 4. 上传所有衍生图到 OSS
        for key, data in derivatives.items():
            upload_to_oss(key, data)
        
        # 5. 更新数据库
        db.update('photos',
            {'status': 'ready', 'derivatives': json.dumps(derivatives)},
            {'id': photo_id}
        )
        
        # 6. 通知前端(通过 WebSocket 或轮询接口)
        self._notify_completion(task['user_id'], photo_id, derivatives)
        
        # 7. 清理临时文件
        os.remove(local_path)
        
        print(f"[{self.worker_id}] 处理完成: {photo_id}, 生成 {len(derivatives)} 个衍生图")
    
    def _generate_derivatives(self, img: Image.Image, is_lossless: bool) -> dict:
        """并行生成所有衍生图"""
        results = {}
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures = {}
            
            for spec in DERIVATIVE_SPECS:
                for fmt in spec.formats:
                    future = executor.submit(
                        self._generate_one_derivative,
                        img, spec, fmt, is_lossless
                    )
                    futures[future] = (spec.size_name, fmt)
            
            for future in concurrent.futures.as_completed(futures):
                size_name, fmt = futures[future]
                try:
                    result = future.result()
                    object_key = self._build_derivative_key(
                        img.info.get('object_key', ''),
                        size_name, fmt
                    )
                    results[f"{size_name}_{fmt}"] = {
                        'object_key': object_key,
                        'data': result['data'],
                        'size_kb': result['size_kb'],
                        'width': result['width'],
                        'height': result['height'],
                    }
                except Exception as e:
                    print(f"生成失败 ({size_name}_{fmt}): {e}")
        
        return results
    
    def _generate_one_derivative(self, img, spec, fmt, is_lossless):
        """生成单个衍生图"""
        # 缩放
        resized = img.copy()
        resized.thumbnail((spec.max_width, spec.max_height), Image.LANCZOS)
        
        buffer = io.BytesIO()
        
        if is_lossless and fmt == 'webp':
            # 无损压缩
            resized.save(buffer, 'WebP', lossless=True, method=6)
        elif fmt == 'webp':
            # 有损压缩
            resized.save(buffer, 'WebP', quality=80, method=6)
        elif fmt == 'jpeg':
            resized = resized.convert('RGB')  # JPEG 不支持 alpha
            resized.save(buffer, 'JPEG', quality=85)
        
        data = buffer.getvalue()
        
        return {
            'data': data,
            'size_kb': round(len(data) / 1024, 1),
            'width': resized.width,
            'height': resized.height,
        }
    
    def _build_derivative_key(self, original_key, size_name, fmt):
        """构建衍生图的 OSS 路径"""
        # originals/2024/06/a3/f8c2e1b45678.jpg
        # → thumbs/medium_webp/2024/06/a3/f8c2e1b45678.webp
        parts = original_key.split('/')
        filename = parts[-1].rsplit('.', 1)[0]
        path = '/'.join(parts[1:-1])
        return f'thumbs/{size_name}_{fmt}/{path}/{filename}.{fmt}'
    
    def _notify_completion(self, user_id, photo_id, derivatives):
        """通知前端处理完成"""
        # 通过 Redis pub/sub 通知 WebSocket 服务
        redis_client.publish(
            f'photo:ready:{user_id}',
            json.dumps({
                'photo_id': photo_id,
                'status': 'ready',
                'urls': {
                    k: f'https://cdn.guangying.com/{v["object_key"]}'
                    for k, v in derivatives.items()
                },
            })
        )
    
    def _report_failure(self, task, error):
        """报告处理失败"""
        db.update('photos',
            {'status': 'failed', 'error': error},
            {'id': task['photo_id']}
        )


# 启动 Worker
if __name__ == '__main__':
    import sys
    worker_id = sys.argv[1] if len(sys.argv) > 1 else 'worker-1'
    worker = ImageProcessWorker(worker_id)
    worker.run()

前端进度通知

用户上传后立即返回”处理中”,前端通过轮询或 WebSocket 获取处理进度:

// 前端:图片处理状态跟踪
class ImageProcessTracker {
  private pollInterval: number = 2000; // 2秒轮询一次

  async waitForReady(photoId: string): Promise<PhotoResult> {
    return new Promise((resolve, reject) => {
      const poll = async () => {
        try {
          const result = await fetch(`/api/photos/${photoId}/status`).then(r => r.json());
          
          if (result.status === 'ready') {
            resolve(result);
          } else if (result.status === 'failed') {
            reject(new Error(`处理失败: ${result.error}`));
          } else {
            // processing,继续轮询
            setTimeout(poll, this.pollInterval);
          }
        } catch (e) {
          reject(e);
        }
      };
      
      poll();
    });
  }
}

// 使用
const uploader = new OSSDirectUploader();
const tracker = new ImageProcessTracker();

async function uploadAndShow(file: File) {
  // 1. 上传原图到 OSS(2~3秒)
  const photoId = await uploader.upload(file, (pct) => {
    updateUploadProgress(pct);
  });
  
  // 2. 立即显示"处理中"占位
  showProcessingPlaceholder(photoId);
  
  // 3. 等待处理完成(通常 5~8秒)
  try {
    const result = await tracker.waitForReady(photoId);
    showPhoto(result.urls); // 显示处理好的图片
  } catch (e) {
    showError('图片处理失败,请重试');
  }
}

后端轮询接口:

@app.route('/api/photos/<photo_id>/status')
def photo_status(photo_id):
    """查询图片处理状态"""
    photo = db.query('photos', {'id': photo_id})
    
    if not photo:
        return jsonify({'error': 'not found'}), 404
    
    response = {
        'photo_id': photo_id,
        'status': photo['status'],  # processing / ready / failed
    }
    
    if photo['status'] == 'ready':
        derivatives = json.loads(photo['derivatives'])
        response['urls'] = {
            k: f'https://cdn.guangying.com/{v["object_key"]}'
            for k, v in derivatives.items()
        }
    
    return jsonify(response)

处理进度优化:分阶段通知

一张图要生成 6~10 个衍生图,每完成一个都可以通知前端:

class ProgressNotifier:
    """分阶段进度通知"""
    
    STAGES = [
        ('download',     10,  '下载原图'),
        ('analyze',      15,  '分析内容'),
        ('thumb_webp',   25,  '缩略图'),
        ('small_webp',   35,  '小图'),
        ('medium_webp',  50,  '中图'),
        ('medium_jpeg',  60,  '中图 JPEG'),
        ('large_webp',   75,  '大图'),
        ('large_jpeg',   85,  '大图 JPEG'),
        ('xlarge_webp',  95,  '超大图'),
        ('upload',       100, '上传完成'),
    ]
    
    def notify_stage(self, photo_id, stage_name):
        """通知某个阶段完成"""
        progress = next(p for name, p, _ in self.STAGES if name == stage_name)
        _, _, desc = next(s for s in self.STAGES if s[0] == stage_name)
        
        redis_client.publish(f'photo:progress:{photo_id}', json.dumps({
            'stage': stage_name,
            'progress': progress,
            'description': desc,
        }))

前端展示更细粒度的进度:

// 前端:展示细粒度处理进度
function showProcessingProgress(photoId: string) {
  const ws = new WebSocket(`wss://api.guangying.com/ws/photo/${photoId}`);
  
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    if (data.type === 'progress') {
      updateProgressBar(data.progress);
      updateStatusText(data.description);
    } else if (data.type === 'ready') {
      showPhoto(data.urls);
      ws.close();
    }
  };
}

错误处理和重试

异步处理带来了新的挑战——Worker 可能崩溃、任务可能失败。需要健壮的重试机制:

class RetryableWorker(ImageProcessWorker):
    """带重试机制的 Worker"""
    
    MAX_RETRIES = 3
    RETRY_DELAYS = [30, 120, 600]  # 30秒、2分钟、10分钟
    
    def _process_one(self, task: dict):
        retry_count = task.get('retry_count', 0)
        
        try:
            super()._process_one(task)
        except Exception as e:
            retry_count += 1
            
            if retry_count <= self.MAX_RETRIES:
                # 重试:延迟后重新入队
                delay = self.RETRY_DELAYS[min(retry_count - 1, len(self.RETRY_DELAYS) - 1)]
                
                print(f"处理失败,{delay}秒后第 {retry_count} 次重试: {task['photo_id']}")
                
                task['retry_count'] = retry_count
                task['last_error'] = str(e)
                
                # 延迟重新入队(用 Redis 的延迟队列实现)
                redis_client.zadd(
                    'image:process:delayed',
                    {json.dumps(task): time.time() + delay}
                )
            else:
                # 超过重试次数,标记为永久失败
                print(f"处理彻底失败: {task['photo_id']}, 错误: {e}")
                db.update('photos',
                    {'status': 'failed', 'error': f'重试 {retry_count} 次后失败: {e}'},
                    {'id': task['photo_id']}
                )
                self._notify_user_failure(task)


# 延迟队列消费者(独立的线程/进程)
def delayed_queue_consumer():
    """扫描延迟队列,到期的任务重新放入主队列"""
    queue = ImageProcessQueue()
    
    while True:
        now = time.time()
        
        # 取出到期的任务
        tasks = redis_client.zrangebyscore('image:process:delayed', 0, now)
        
        for task_data in tasks:
            task = json.loads(task_data)
            queue.publish(task)
            redis_client.zrem('image:process:delayed', task_data)
        
        time.sleep(1)

处理性能优化

一张图生成 6~10 个衍生图,如何更快?

class OptimizedWorker(ImageProcessWorker):
    """性能优化版 Worker"""
    
    def _generate_derivatives(self, img, is_lossless):
        """优化:先缩放到最大尺寸,再依次缩小(避免重复解码)"""
        results = {}
        
        # 按尺寸从大到小排序
        sorted_specs = sorted(DERIVATIVE_SPECS, key=lambda s: s.max_width, reverse=True)
        
        current_img = img
        
        for spec in sorted_specs:
            # 从上一级缩放(而不是每次都从原图缩放)
            resized = current_img.copy()
            resized.thumbnail((spec.max_width, spec.max_height), Image.LANCZOS)
            
            for fmt in spec.formats:
                buffer = io.BytesIO()
                
                if is_lossless and fmt == 'webp':
                    resized.save(buffer, 'WebP', lossless=True, method=4)  # method=4 更快
                elif fmt == 'webp':
                    resized.save(buffer, 'WebP', quality=80, method=4)
                elif fmt == 'jpeg':
                    rgb = resized.convert('RGB')
                    rgb.save(buffer, 'JPEG', quality=85, optimize=True)
                
                object_key = self._build_derivative_key(
                    img.info.get('object_key', ''),
                    spec.size_name, fmt
                )
                
                results[f"{spec.size_name}_{fmt}"] = {
                    'object_key': object_key,
                    'data': buffer.getvalue(),
                    'size_kb': round(len(buffer.getvalue()) / 1024, 1),
                    'width': resized.width,
                    'height': resized.height,
                }
            
            current_img = resized  # 下一个尺寸从当前缩放
        
        return results


# 性能对比
"""
优化前(每个尺寸从原图缩放):
  thumb (200px)   ← 从 6000px 缩放, 1.2s
  small (400px)   ← 从 6000px 缩放, 1.0s
  medium (800px)  ← 从 6000px 缩放, 0.8s
  large (1200px)  ← 从 6000px 缩放, 0.6s
  总计: ~4.5s

优化后(从上一级缩放):
  xlarge (1920px) ← 从 6000px 缩放, 0.5s
  large (1200px)  ← 从 1920px 缩放, 0.1s
  medium (800px)  ← 从 1200px 缩放, 0.08s
  small (400px)   ← 从 800px 缩放,  0.05s
  thumb (200px)   ← 从 400px 缩放,  0.03s
  总计: ~0.8s
  
提速 5.6 倍!
"""

本节小结

我学到了什么

  • 图片处理必须异步化——用户不应该等待压缩和格式转换
  • 消息队列解耦了上传和处理,上传接口从 12 秒降到 0.3 秒
  • Worker 并行生成衍生图,从大到小逐级缩放避免重复解码
  • 分阶段进度通知让用户知道图片在处理中

⚠️ 踩过的坑

  • Redis Stream 消费者组需要正确处理 ack,否则消息会被重复消费
  • PIL/Pillow 的 thumbnail() 是原地操作,必须先 copy()
  • 从原图逐个缩放比从上一级缩放慢 5 倍以上

🎯 下一步:图片处理好了,但还有一件事没做——审核。用户上传了什么内容?有没有违规图片?

我的思考

思考 1

如果消息队列里的任务突然积压了 10 万条(比如运营搞了一个上传活动),怎么快速消化积压?

参考答案

消息积压的本质是消费速度 < 生产速度。解决思路有两条:

1. 快速扩容消费者

# 动态扩容 Worker(基于积压量自动扩容)
class AutoScalingWorkerPool:
    def __init__(self, min_workers=2, max_workers=20):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.active_workers = {}
    
    def check_and_scale(self):
        """定期检查积压量并扩缩容"""
        pending = self.queue.pending_count('image-processors')
        
        if pending > 1000:
            # 积压超过 1000,扩容到最大
            target = self.max_workers
        elif pending > 100:
            # 积压 100~1000,按需扩容
            target = min(pending // 50 + self.min_workers, self.max_workers)
        else:
            # 积压少,缩回最小
            target = self.min_workers
        
        self._adjust_workers(target)

2. 降低单任务耗时

# 积压时的降级策略
class BackpressureStrategy:
    def apply(self, task):
        pending = queue.pending_count()
        
        if pending > 50000:
            # 极端积压:只生成缩略图,跳过大尺寸
            return {
                'specs': [thumb_spec],  # 只生成缩略图
                'quality': 60,          # 降低质量
                'method': 0,            # 最快的压缩方法
            }
        elif pending > 10000:
            # 严重积压:跳过 xlarge
            return {
                'specs': [thumb, small, medium, large],
                'quality': 75,
                'method': 4,
            }
        else:
            # 正常
            return {
                'specs': ALL_SPECS,
                'quality': 80,
                'method': 6,
            }

核心原则:积压时宁可降低质量也要保证速度,等积压消化后再用后台任务重新生成高质量版本。

思考 2

为什么从上一级尺寸缩放比从原图缩放快这么多?有没有精度损失?

参考答案

为什么快

从 6000px 缩放到 200px:
  - 输入像素数:6000 × 4000 = 24,000,000
  - Lanczos 算法对每个输出像素需要采样周围 6×6 = 36 个输入像素
  - 总计算量:200 × 133 × 36 ≈ 960,000 次采样

从 400px 缩放到 200px:
  - 输入像素数:400 × 267 = 106,800
  - 总计算量:200 × 133 × 36 ≈ 960,000 次采样
  
等等,计算量不是差不多吗?

实际上关键不在于采样次数,而在于:

  1. 内存访问:大图像占更多内存,cache miss 更多
  2. PIL 内部处理:解码 6000px 图像的开销远大于 400px
  3. I/O 等待:大图像在内存中读写更慢

精度损失

确实有,但非常小。因为 Lanczos 缩放本身就是有损的——两次小幅度缩放 vs 一次大幅度缩放,差异通常在 SSIM 0.99+ 以上,人眼完全无法分辨。

一个例外:如果原图有很细的线条或 moiré 纹理,多级缩放可能产生累积的混叠效应。对于这种情况,可以从原图直接缩放。

搜索