格式转换流水线
上传一张图要等 12 秒
有损压缩和无损压缩的策略都搞定了。我把代码合并到上传流程里,满怀期待地测试了一下。
结果:用户上传一张 8.7MB 的图片后,要等 12 秒才能看到”上传成功”。
上传耗时分解:
1. 客户端直传 OSS: 2s
2. 回调通知服务器: 0.1s
3. 服务器下载 OSS 原图到本地: 1.5s
4. 智能分析内容类型: 0.3s
5. 生成 3 种尺寸 × 2 种格式 = 6 个文件: 6s
6. 上传 6 个文件到 OSS: 2s
总计: 12s用户上传图片时盯着一个 loading 动画等 12 秒?不可接受。
问题出在哪?第 3~6 步都是同步执行的,阻塞了上传接口的返回。
解法很明确:上传归上传,处理归处理。异步化。
异步处理架构
核心思路:
之前(同步):
用户上传 → OSS → 回调 → 下载 → 压缩 × 6 → 上传 OSS → 返回(12秒)
之后(异步):
用户上传 → OSS → 回调 → 丢消息队列 → 立即返回(0.5秒)
↓
Worker 消费 → 下载 → 压缩 × 6 → 上传 OSS → 通知完成# 消息队列:使用 Redis Stream 作为简单的消息队列
import redis
import json
redis_client = redis.Redis()
class ImageProcessQueue:
"""图片处理消息队列"""
QUEUE_NAME = 'image:process'
def publish(self, task: dict):
"""发布处理任务"""
task_data = json.dumps(task)
msg_id = redis_client.xadd(self.QUEUE_NAME, {'data': task_data})
return msg_id
def consume(self, consumer_group: str, consumer_name: str, count: int = 1):
"""消费处理任务"""
# 确保消费者组存在
try:
redis_client.xgroup_create(self.QUEUE_NAME, consumer_group, id='0')
except redis.ResponseError:
pass # 组已存在
messages = redis_client.xreadgroup(
consumer_group, consumer_name,
{self.QUEUE_NAME: '>'},
count=count, block=5000
)
tasks = []
for stream, msgs in messages:
for msg_id, data in msgs:
task = json.loads(data[b'data'])
tasks.append((msg_id, task))
return tasks
def ack(self, consumer_group: str, msg_id: bytes):
"""确认任务完成"""
redis_client.xack(self.QUEUE_NAME, consumer_group, msg_id)
def pending_count(self, consumer_group: str):
"""查看待处理任务数"""
info = redis_client.xpending(self.QUEUE_NAME, consumer_group)
return info.get('pending', 0)修改上传回调接口,把处理任务丢到队列后立即返回:
from datetime import datetime
@app.route('/api/upload/callback', methods=['POST'])
def upload_callback():
"""OSS 上传成功后的回调——异步触发处理"""
data = request.json
object_key = data['object_key']
user_id = data['user_id']
file_size = data.get('size', 0)
# 1. 记录到数据库,状态为 "processing"
photo_id = db.insert('photos', {
'user_id': user_id,
'object_key': object_key,
'original_size': file_size,
'status': 'processing',
'created_at': datetime.now(),
})
# 2. 发布处理任务到消息队列
queue = ImageProcessQueue()
queue.publish({
'photo_id': photo_id,
'object_key': object_key,
'user_id': user_id,
'priority': 'high' if file_size > 10 * 1024 * 1024 else 'normal',
})
# 3. 立即返回(不再等待处理完成)
return jsonify({
'photo_id': photo_id,
'status': 'processing',
'message': '图片正在处理中,预计 10 秒内完成',
})效果:上传回调接口从 12 秒降到了 0.3 秒。
Worker:并行生成多格式多尺寸
Worker 从队列中消费任务,并行处理:
import concurrent.futures
from dataclasses import dataclass
from typing import List
@dataclass
class DerivativeSpec:
"""衍生图规格"""
size_name: str
max_width: int
max_height: int
formats: List[str] # ['webp', 'jpeg']
# 定义需要生成的衍生图规格
DERIVATIVE_SPECS = [
DerivativeSpec('thumb', 200, 200, ['webp']),
DerivativeSpec('small', 400, 400, ['webp']),
DerivativeSpec('medium', 800, 800, ['webp', 'jpeg']),
DerivativeSpec('large', 1200, 1200, ['webp', 'jpeg']),
DerivativeSpec('xlarge', 1920, 1920, ['webp']),
]
class ImageProcessWorker:
"""图片处理 Worker"""
def __init__(self, worker_id: str):
self.worker_id = worker_id
self.queue = ImageProcessQueue()
self.compressor = SmartCompressor() # 上一节的有损压缩器
self.lossless = LosslessCompressor() # 上一节的无损压缩器
def run(self):
"""主循环:消费任务 → 处理 → 确认"""
print(f"Worker {self.worker_id} started")
while True:
tasks = self.queue.consume(
consumer_group='image-processors',
consumer_name=self.worker_id,
count=1,
)
for msg_id, task in tasks:
try:
self._process_one(task)
self.queue.ack('image-processors', msg_id)
except Exception as e:
print(f"处理失败: {task['photo_id']}, 错误: {e}")
# 处理失败不 ack,消息会重新投递
self._report_failure(task, str(e))
def _process_one(self, task: dict):
"""处理单张图片"""
photo_id = task['photo_id']
object_key = task['object_key']
print(f"[{self.worker_id}] 开始处理: {photo_id}")
# 1. 从 OSS 下载原图到临时目录
local_path = f'/tmp/process_{photo_id}_original'
download_from_oss(object_key, local_path)
# 2. 分析图片
img = Image.open(local_path)
content_type = self.compressor.analyze_content(img)
is_lossless = content_type in ('product', 'screenshot', 'graphic')
# 3. 并行生成所有衍生图
derivatives = self._generate_derivatives(img, is_lossless)
# 4. 上传所有衍生图到 OSS
for key, data in derivatives.items():
upload_to_oss(key, data)
# 5. 更新数据库
db.update('photos',
{'status': 'ready', 'derivatives': json.dumps(derivatives)},
{'id': photo_id}
)
# 6. 通知前端(通过 WebSocket 或轮询接口)
self._notify_completion(task['user_id'], photo_id, derivatives)
# 7. 清理临时文件
os.remove(local_path)
print(f"[{self.worker_id}] 处理完成: {photo_id}, 生成 {len(derivatives)} 个衍生图")
def _generate_derivatives(self, img: Image.Image, is_lossless: bool) -> dict:
"""并行生成所有衍生图"""
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {}
for spec in DERIVATIVE_SPECS:
for fmt in spec.formats:
future = executor.submit(
self._generate_one_derivative,
img, spec, fmt, is_lossless
)
futures[future] = (spec.size_name, fmt)
for future in concurrent.futures.as_completed(futures):
size_name, fmt = futures[future]
try:
result = future.result()
object_key = self._build_derivative_key(
img.info.get('object_key', ''),
size_name, fmt
)
results[f"{size_name}_{fmt}"] = {
'object_key': object_key,
'data': result['data'],
'size_kb': result['size_kb'],
'width': result['width'],
'height': result['height'],
}
except Exception as e:
print(f"生成失败 ({size_name}_{fmt}): {e}")
return results
def _generate_one_derivative(self, img, spec, fmt, is_lossless):
"""生成单个衍生图"""
# 缩放
resized = img.copy()
resized.thumbnail((spec.max_width, spec.max_height), Image.LANCZOS)
buffer = io.BytesIO()
if is_lossless and fmt == 'webp':
# 无损压缩
resized.save(buffer, 'WebP', lossless=True, method=6)
elif fmt == 'webp':
# 有损压缩
resized.save(buffer, 'WebP', quality=80, method=6)
elif fmt == 'jpeg':
resized = resized.convert('RGB') # JPEG 不支持 alpha
resized.save(buffer, 'JPEG', quality=85)
data = buffer.getvalue()
return {
'data': data,
'size_kb': round(len(data) / 1024, 1),
'width': resized.width,
'height': resized.height,
}
def _build_derivative_key(self, original_key, size_name, fmt):
"""构建衍生图的 OSS 路径"""
# originals/2024/06/a3/f8c2e1b45678.jpg
# → thumbs/medium_webp/2024/06/a3/f8c2e1b45678.webp
parts = original_key.split('/')
filename = parts[-1].rsplit('.', 1)[0]
path = '/'.join(parts[1:-1])
return f'thumbs/{size_name}_{fmt}/{path}/{filename}.{fmt}'
def _notify_completion(self, user_id, photo_id, derivatives):
"""通知前端处理完成"""
# 通过 Redis pub/sub 通知 WebSocket 服务
redis_client.publish(
f'photo:ready:{user_id}',
json.dumps({
'photo_id': photo_id,
'status': 'ready',
'urls': {
k: f'https://cdn.guangying.com/{v["object_key"]}'
for k, v in derivatives.items()
},
})
)
def _report_failure(self, task, error):
"""报告处理失败"""
db.update('photos',
{'status': 'failed', 'error': error},
{'id': task['photo_id']}
)
# 启动 Worker
if __name__ == '__main__':
import sys
worker_id = sys.argv[1] if len(sys.argv) > 1 else 'worker-1'
worker = ImageProcessWorker(worker_id)
worker.run()前端进度通知
用户上传后立即返回”处理中”,前端通过轮询或 WebSocket 获取处理进度:
// 前端:图片处理状态跟踪
class ImageProcessTracker {
private pollInterval: number = 2000; // 2秒轮询一次
async waitForReady(photoId: string): Promise<PhotoResult> {
return new Promise((resolve, reject) => {
const poll = async () => {
try {
const result = await fetch(`/api/photos/${photoId}/status`).then(r => r.json());
if (result.status === 'ready') {
resolve(result);
} else if (result.status === 'failed') {
reject(new Error(`处理失败: ${result.error}`));
} else {
// processing,继续轮询
setTimeout(poll, this.pollInterval);
}
} catch (e) {
reject(e);
}
};
poll();
});
}
}
// 使用
const uploader = new OSSDirectUploader();
const tracker = new ImageProcessTracker();
async function uploadAndShow(file: File) {
// 1. 上传原图到 OSS(2~3秒)
const photoId = await uploader.upload(file, (pct) => {
updateUploadProgress(pct);
});
// 2. 立即显示"处理中"占位
showProcessingPlaceholder(photoId);
// 3. 等待处理完成(通常 5~8秒)
try {
const result = await tracker.waitForReady(photoId);
showPhoto(result.urls); // 显示处理好的图片
} catch (e) {
showError('图片处理失败,请重试');
}
}后端轮询接口:
@app.route('/api/photos/<photo_id>/status')
def photo_status(photo_id):
"""查询图片处理状态"""
photo = db.query('photos', {'id': photo_id})
if not photo:
return jsonify({'error': 'not found'}), 404
response = {
'photo_id': photo_id,
'status': photo['status'], # processing / ready / failed
}
if photo['status'] == 'ready':
derivatives = json.loads(photo['derivatives'])
response['urls'] = {
k: f'https://cdn.guangying.com/{v["object_key"]}'
for k, v in derivatives.items()
}
return jsonify(response)处理进度优化:分阶段通知
一张图要生成 6~10 个衍生图,每完成一个都可以通知前端:
class ProgressNotifier:
"""分阶段进度通知"""
STAGES = [
('download', 10, '下载原图'),
('analyze', 15, '分析内容'),
('thumb_webp', 25, '缩略图'),
('small_webp', 35, '小图'),
('medium_webp', 50, '中图'),
('medium_jpeg', 60, '中图 JPEG'),
('large_webp', 75, '大图'),
('large_jpeg', 85, '大图 JPEG'),
('xlarge_webp', 95, '超大图'),
('upload', 100, '上传完成'),
]
def notify_stage(self, photo_id, stage_name):
"""通知某个阶段完成"""
progress = next(p for name, p, _ in self.STAGES if name == stage_name)
_, _, desc = next(s for s in self.STAGES if s[0] == stage_name)
redis_client.publish(f'photo:progress:{photo_id}', json.dumps({
'stage': stage_name,
'progress': progress,
'description': desc,
}))前端展示更细粒度的进度:
// 前端:展示细粒度处理进度
function showProcessingProgress(photoId: string) {
const ws = new WebSocket(`wss://api.guangying.com/ws/photo/${photoId}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'progress') {
updateProgressBar(data.progress);
updateStatusText(data.description);
} else if (data.type === 'ready') {
showPhoto(data.urls);
ws.close();
}
};
}错误处理和重试
异步处理带来了新的挑战——Worker 可能崩溃、任务可能失败。需要健壮的重试机制:
class RetryableWorker(ImageProcessWorker):
"""带重试机制的 Worker"""
MAX_RETRIES = 3
RETRY_DELAYS = [30, 120, 600] # 30秒、2分钟、10分钟
def _process_one(self, task: dict):
retry_count = task.get('retry_count', 0)
try:
super()._process_one(task)
except Exception as e:
retry_count += 1
if retry_count <= self.MAX_RETRIES:
# 重试:延迟后重新入队
delay = self.RETRY_DELAYS[min(retry_count - 1, len(self.RETRY_DELAYS) - 1)]
print(f"处理失败,{delay}秒后第 {retry_count} 次重试: {task['photo_id']}")
task['retry_count'] = retry_count
task['last_error'] = str(e)
# 延迟重新入队(用 Redis 的延迟队列实现)
redis_client.zadd(
'image:process:delayed',
{json.dumps(task): time.time() + delay}
)
else:
# 超过重试次数,标记为永久失败
print(f"处理彻底失败: {task['photo_id']}, 错误: {e}")
db.update('photos',
{'status': 'failed', 'error': f'重试 {retry_count} 次后失败: {e}'},
{'id': task['photo_id']}
)
self._notify_user_failure(task)
# 延迟队列消费者(独立的线程/进程)
def delayed_queue_consumer():
"""扫描延迟队列,到期的任务重新放入主队列"""
queue = ImageProcessQueue()
while True:
now = time.time()
# 取出到期的任务
tasks = redis_client.zrangebyscore('image:process:delayed', 0, now)
for task_data in tasks:
task = json.loads(task_data)
queue.publish(task)
redis_client.zrem('image:process:delayed', task_data)
time.sleep(1)处理性能优化
一张图生成 6~10 个衍生图,如何更快?
class OptimizedWorker(ImageProcessWorker):
"""性能优化版 Worker"""
def _generate_derivatives(self, img, is_lossless):
"""优化:先缩放到最大尺寸,再依次缩小(避免重复解码)"""
results = {}
# 按尺寸从大到小排序
sorted_specs = sorted(DERIVATIVE_SPECS, key=lambda s: s.max_width, reverse=True)
current_img = img
for spec in sorted_specs:
# 从上一级缩放(而不是每次都从原图缩放)
resized = current_img.copy()
resized.thumbnail((spec.max_width, spec.max_height), Image.LANCZOS)
for fmt in spec.formats:
buffer = io.BytesIO()
if is_lossless and fmt == 'webp':
resized.save(buffer, 'WebP', lossless=True, method=4) # method=4 更快
elif fmt == 'webp':
resized.save(buffer, 'WebP', quality=80, method=4)
elif fmt == 'jpeg':
rgb = resized.convert('RGB')
rgb.save(buffer, 'JPEG', quality=85, optimize=True)
object_key = self._build_derivative_key(
img.info.get('object_key', ''),
spec.size_name, fmt
)
results[f"{spec.size_name}_{fmt}"] = {
'object_key': object_key,
'data': buffer.getvalue(),
'size_kb': round(len(buffer.getvalue()) / 1024, 1),
'width': resized.width,
'height': resized.height,
}
current_img = resized # 下一个尺寸从当前缩放
return results
# 性能对比
"""
优化前(每个尺寸从原图缩放):
thumb (200px) ← 从 6000px 缩放, 1.2s
small (400px) ← 从 6000px 缩放, 1.0s
medium (800px) ← 从 6000px 缩放, 0.8s
large (1200px) ← 从 6000px 缩放, 0.6s
总计: ~4.5s
优化后(从上一级缩放):
xlarge (1920px) ← 从 6000px 缩放, 0.5s
large (1200px) ← 从 1920px 缩放, 0.1s
medium (800px) ← 从 1200px 缩放, 0.08s
small (400px) ← 从 800px 缩放, 0.05s
thumb (200px) ← 从 400px 缩放, 0.03s
总计: ~0.8s
提速 5.6 倍!
"""本节小结
✅ 我学到了什么:
- 图片处理必须异步化——用户不应该等待压缩和格式转换
- 消息队列解耦了上传和处理,上传接口从 12 秒降到 0.3 秒
- Worker 并行生成衍生图,从大到小逐级缩放避免重复解码
- 分阶段进度通知让用户知道图片在处理中
⚠️ 踩过的坑:
- Redis Stream 消费者组需要正确处理 ack,否则消息会被重复消费
- PIL/Pillow 的
thumbnail()是原地操作,必须先copy() - 从原图逐个缩放比从上一级缩放慢 5 倍以上
🎯 下一步:图片处理好了,但还有一件事没做——审核。用户上传了什么内容?有没有违规图片?
我的思考
思考 1
如果消息队列里的任务突然积压了 10 万条(比如运营搞了一个上传活动),怎么快速消化积压?
消息积压的本质是消费速度 < 生产速度。解决思路有两条:
1. 快速扩容消费者:
# 动态扩容 Worker(基于积压量自动扩容)
class AutoScalingWorkerPool:
def __init__(self, min_workers=2, max_workers=20):
self.min_workers = min_workers
self.max_workers = max_workers
self.active_workers = {}
def check_and_scale(self):
"""定期检查积压量并扩缩容"""
pending = self.queue.pending_count('image-processors')
if pending > 1000:
# 积压超过 1000,扩容到最大
target = self.max_workers
elif pending > 100:
# 积压 100~1000,按需扩容
target = min(pending // 50 + self.min_workers, self.max_workers)
else:
# 积压少,缩回最小
target = self.min_workers
self._adjust_workers(target)2. 降低单任务耗时:
# 积压时的降级策略
class BackpressureStrategy:
def apply(self, task):
pending = queue.pending_count()
if pending > 50000:
# 极端积压:只生成缩略图,跳过大尺寸
return {
'specs': [thumb_spec], # 只生成缩略图
'quality': 60, # 降低质量
'method': 0, # 最快的压缩方法
}
elif pending > 10000:
# 严重积压:跳过 xlarge
return {
'specs': [thumb, small, medium, large],
'quality': 75,
'method': 4,
}
else:
# 正常
return {
'specs': ALL_SPECS,
'quality': 80,
'method': 6,
}核心原则:积压时宁可降低质量也要保证速度,等积压消化后再用后台任务重新生成高质量版本。
思考 2
为什么从上一级尺寸缩放比从原图缩放快这么多?有没有精度损失?
为什么快:
从 6000px 缩放到 200px:
- 输入像素数:6000 × 4000 = 24,000,000
- Lanczos 算法对每个输出像素需要采样周围 6×6 = 36 个输入像素
- 总计算量:200 × 133 × 36 ≈ 960,000 次采样
从 400px 缩放到 200px:
- 输入像素数:400 × 267 = 106,800
- 总计算量:200 × 133 × 36 ≈ 960,000 次采样
等等,计算量不是差不多吗?实际上关键不在于采样次数,而在于:
- 内存访问:大图像占更多内存,cache miss 更多
- PIL 内部处理:解码 6000px 图像的开销远大于 400px
- I/O 等待:大图像在内存中读写更慢
精度损失:
确实有,但非常小。因为 Lanczos 缩放本身就是有损的——两次小幅度缩放 vs 一次大幅度缩放,差异通常在 SSIM 0.99+ 以上,人眼完全无法分辨。
一个例外:如果原图有很细的线条或 moiré 纹理,多级缩放可能产生累积的混叠效应。对于这种情况,可以从原图直接缩放。
