数据生命周期管理
凌晨两点的告警短信
那是”光影”上线第 3 个月的一个凌晨,手机疯狂震动:
[阿里云告警] OSS 存储容量超过阈值!
当前容量:1.89 TB / 告警阈值:1.50 TB
增长率:+12 GB/天
预计 30 天后达到 2.25 TB
请及时处理!12 GB/天?我仔细查了一下,发现存储增长的主要来源并不是用户上传的新图片——而是垃圾文件:
def analyze_storage_growth():
"""分析存储增长的来源"""
return {
'daily_new_images': {
'count': 320, # 日均新增 320 张
'avg_size_mb': 4.8,
'daily_growth_gb': 320 * 4.8 / 1024, # ≈ 1.5 GB/天
},
'daily_thumbnails': {
'count': 320 * 5, # 每张图 5 个缩略图
'avg_size_kb': 45,
'daily_growth_gb': 320 * 5 * 45 / 1024 / 1024, # ≈ 0.07 GB/天
},
'temp_files': {
'description': '上传失败、EXIF 中间文件、裁剪临时文件',
'daily_growth_gb': 3.5, # 每天堆积 3.5 GB!
'reason': '从未清理过临时文件',
},
'abandoned_uploads': {
'description': '用户选了文件但没完成上传的分片',
'daily_growth_gb': 2.0,
'reason': '分片上传取消后未清理',
},
'old_thumbnails': {
'description': '历史调整缩略图规格后,旧规格的缩略图未删除',
'daily_growth_gb': 5.0,
'reason': '规格变更后未做数据迁移',
},
'total_daily_growth': 12.07, # GB/天
'real_growth': 1.57, # GB/天(新图片 + 缩略图)
'waste_ratio': '87%', # 87% 的增长是垃圾!
}87% 的存储增长是垃圾文件。 这不是存储的问题,是管理的问题。
我需要一套完整的生命周期管理机制。
生命周期管理的全景图
一个图片从诞生到消亡,应该经历这样的生命周期:
┌──────────────┐
│ 用户上传 │
└──────┬───────┘
│
┌──────▼───────┐
│ 临时存储 │ ← 分片上传的临时空间
│ (temp/) │
└──────┬───────┘
│ 上传完成
┌──────▼───────┐
│ 内容审核 │ ← 鉴黄、OCR、版权
└──────┬───────┘
│ 审核通过
┌──────────▼──────────┐
│ 标准存储(热) │ ← 新图片,频繁访问
│ originals/ │
│ thumbs/ │
└──────────┬──────────┘
│ 30 天无人访问
┌──────────▼──────────┐
│ 低频存储(温) │ ← 偶尔有人翻看
│ originals/ │
└──────────┬──────────┘
│ 90 天无人访问
┌──────────▼──────────┐
│ 归档存储(冷) │ ← 几乎没人看
│ originals/ │
└──────────┬──────────┘
│ 2 年以上无人访问
┌──────────▼──────────┐
│ 删除 │ ← 释放存储空间
└────────────────────┘
临时文件的生命周期(独立路径):
┌──────────┐ 7 天
│ temp/ │ ──────────→ 删除
└──────────┘
缩略图的生命周期:
┌──────────┐ 随原图删除
│ thumbs/ │ ──────────→ 一起删除
└──────────┘生命周期规则配置
OSS 原生生命周期规则
阿里云 OSS 提供了内置的生命周期管理功能,可以在控制台或通过 SDK 配置:
from aliyunsdkoss import oss_api
import json
class OSSLifecycleManager:
"""OSS 生命周期规则管理"""
def __init__(self, oss_client, bucket_name='guangying-images'):
self.oss = oss_client
self.bucket = bucket_name
def setup_lifecycle_rules(self):
"""配置完整的生命周期规则"""
rules = [
# ========== 规则 1:原图的生命周期 ==========
{
'id': 'original-tier-downgrade',
'status': 'Enabled',
'prefix': 'originals/',
'transitions': [
{
# 30 天后转为低频存储
'days': 30,
'storage_class': 'IA',
},
{
# 90 天后转为归档存储
'days': 90,
'storage_class': 'Archive',
},
],
'expiration': {
# 730 天(2 年)后删除
'days': 730,
},
# 归档数据删除前发送通知
'abort_incomplete_multipart_upload': {
'days': 7, # 分片上传 7 天未完成则自动清理
},
},
# ========== 规则 2:临时文件快速清理 ==========
{
'id': 'temp-cleanup',
'status': 'Enabled',
'prefix': 'temp/',
'expiration': {
'days': 7, # 临时文件 7 天后删除
},
},
# ========== 规则 3:上传中的分片清理 ==========
{
'id': 'multipart-cleanup',
'status': 'Enabled',
'prefix': 'uploads/',
'abort_incomplete_multipart_upload': {
'days': 3, # 分片上传 3 天未完成则清理
},
},
# ========== 规则 4:缩略图不降级,随原图删除 ==========
# 缩略图不做生命周期管理
# 原图删除时,通过应用层逻辑同步删除缩略图
]
# 应用规则
lifecycle_config = {
'Rule': rules,
}
self.oss.put_bucket_lifecycle(
bucket=self.bucket,
lifecycle_config=lifecycle_config,
)
return lifecycle_config
def verify_lifecycle_rules(self):
"""验证生命周期规则是否生效"""
config = self.oss.get_bucket_lifecycle(self.bucket)
print("当前生命周期规则:")
for rule in config['Rule']:
print(f"\n规则 ID: {rule['ID']}")
print(f" 状态: {rule['Status']}")
print(f" 前缀: {rule.get('Prefix', '(全部)')}")
if 'Transition' in rule:
for t in rule['Transition']:
print(f" 转换: {t['Days']} 天后 → {t['StorageClass']}")
if 'Expiration' in rule:
print(f" 过期: {rule['Expiration']['Days']} 天后删除")
return config应用层生命周期管理
OSS 原生规则只管存储层,但有些逻辑需要在应用层处理:
from datetime import datetime, timedelta
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
class ApplicationLifecycleManager:
"""应用层生命周期管理"""
def __init__(self, db, oss_client, cdn_client):
self.db = db
self.oss = oss_client
self.cdn = cdn_client
# ========== 1. 临时文件清理 ==========
def cleanup_temp_files(self) -> Dict:
"""清理 7 天以上的临时文件"""
cutoff = datetime.now() - timedelta(days=7)
# 查找过期临时文件
rows = self.db.query(
"SELECT object_key FROM temp_uploads WHERE created_at < %s AND deleted = FALSE",
(cutoff,),
)
deleted_count = 0
freed_bytes = 0
for (object_key,) in rows:
try:
# 获取文件大小(用于统计)
meta = self.oss.head_object(
bucket='guangying-images', key=object_key
)
freed_bytes += meta['ContentLength']
# 删除 OSS 上的文件
self.oss.delete_object(
bucket='guangying-images', key=object_key
)
# 标记为已删除
self.db.execute(
"UPDATE temp_uploads SET deleted = TRUE WHERE object_key = %s",
(object_key,),
)
deleted_count += 1
except Exception as e:
logger.warning(f"清理临时文件失败 {object_key}: {e}")
freed_gb = freed_bytes / 1024 / 1024 / 1024
logger.info(f"临时文件清理完成:删除 {deleted_count} 个文件,释放 {freed_gb:.2f} GB")
return {
'deleted_count': deleted_count,
'freed_gb': round(freed_gb, 2),
}
# ========== 2. 废弃缩略图清理 ==========
def cleanup_obsolete_thumbnails(self) -> Dict:
"""清理旧规格的缩略图"""
# 当前有效的缩略图规格
ACTIVE_SPECS = ['large', 'medium', 'small', 'webp', 'avif']
# 查找所有缩略图前缀
result = self.oss.list_objects(
bucket='guangying-images', prefix='thumbs/'
)
obsolete_keys = []
for obj in result:
# 解析目录结构:thumbs/{spec}/2024/06/...
parts = obj['Key'].split('/')
if len(parts) >= 2:
spec = parts[1]
if spec not in ACTIVE_SPECS:
obsolete_keys.append(obj['Key'])
# 批量删除
if obsolete_keys:
self.oss.delete_objects(
bucket='guangying-images',
keys=obsolete_keys,
)
logger.info(f"废弃缩略图清理:删除 {len(obsolete_keys)} 个文件")
return {'deleted': len(obsolete_keys)}
# ========== 3. 原图删除时的级联清理 ==========
def delete_image_cascade(self, image_id: str, reason: str = 'user_request') -> Dict:
"""
删除图片及其所有关联数据
- 原图
- 所有缩略图
- CDN 缓存
- 数据库记录
"""
image = self.db.query(
"SELECT * FROM images WHERE id = %s", (image_id,)
)[0]
deleted_resources = []
# 1. 删除缩略图
for spec in ['large', 'medium', 'small', 'webp', 'avif']:
thumb_key = f"thumbs/{spec}/{image['object_key'].replace('originals/', '')}"
try:
self.oss.delete_object(
bucket='guangying-images', key=thumb_key
)
deleted_resources.append(f"缩略图: {thumb_key}")
except Exception:
pass # 缩略图可能不存在
# 2. 删除原图
self.oss.delete_object(
bucket='guangying-images', key=image['object_key']
)
deleted_resources.append(f"原图: {image['object_key']}")
# 3. 刷新 CDN 缓存
self.cdn.refresh_object_caches(
object_path=f"https://cdn.guangying.com/{image['object_key']}",
object_type='Directory',
)
deleted_resources.append("CDN 缓存已刷新")
# 4. 软删除数据库记录(保留元数据用于审计)
self.db.execute(
"""UPDATE images
SET deleted_at = NOW(), delete_reason = %s
WHERE id = %s""",
(reason, image_id),
)
logger.info(
f"图片 {image_id} 级联删除完成:{len(deleted_resources)} 个资源"
)
return {
'image_id': image_id,
'reason': reason,
'deleted_resources': deleted_resources,
}
# ========== 4. 到期图片的预警 ==========
def check_expiring_images(self, days_before=30) -> List[Dict]:
"""检查即将到期删除的图片(2 年限期前 30 天预警)"""
warning_date = datetime.now() + timedelta(days=days_before)
expire_date = datetime.now() - timedelta(days=730 - days_before)
images = self.db.query(
"""SELECT id, object_key, user_id, created_at
FROM images
WHERE created_at <= %s
AND deleted_at IS NULL
AND storage_tier = 'archive'
ORDER BY created_at ASC""",
(expire_date,),
)
notifications = []
for img in images:
days_left = (img['created_at'] + timedelta(days=730) - datetime.now()).days
notifications.append({
'image_id': img['id'],
'user_id': img['user_id'],
'object_key': img['object_key'],
'days_until_deletion': days_left,
'action': 'send_warning_email',
})
# 发送预警邮件
self._send_expiry_warning(img, days_left)
return notifications
def _send_expiry_warning(self, image: Dict, days_left: int):
"""发送到期预警邮件"""
# 如果用户想保留,可以"收藏"图片,收藏的图片不自动删除
pass # 邮件发送逻辑
# 定时任务:每天执行生命周期管理
from celery.schedules import crontab
lifecycle_schedule = {
# 每天凌晨 2 点清理临时文件
'cleanup-temp-files': {
'task': 'tasks.cleanup_temp_files',
'schedule': crontab(hour=2, minute=0),
},
# 每周日凌晨 3 点清理废弃缩略图
'cleanup-obsolete-thumbs': {
'task': 'tasks.cleanup_obsolete_thumbnails',
'schedule': crontab(day_of_week='sun', hour=3, minute=0),
},
# 每月 1 号检查到期图片
'check-expiring-images': {
'task': 'tasks.check_expiring_images',
'schedule': crontab(day_of_month=1, hour=10, minute=0),
},
}TypeScript 版本:面向前端的过期文件检测
前端也需要感知图片的生命周期状态,特别是对于过期/归档的图片:
interface ImageLifecycleState {
imageId: string;
status: 'active' | 'infrequent' | 'archived' | 'expired';
storageTier: 'standard' | 'IA' | 'Archive';
createdAt: Date;
lastAccessedAt: Date;
expiresAt: Date | null; // 预计删除日期
daysUntilExpiry: number | null; // 距删除还剩几天
canRestore: boolean; // 是否可以恢复
}
class ImageLifecycleService {
private apiBase = '/api/v1/images';
/**
* 获取图片的生命周期状态
*/
async getLifecycleState(imageId: string): Promise<ImageLifecycleState> {
const res = await fetch(`${this.apiBase}/${imageId}/lifecycle`);
return res.json();
}
/**
* 批量检查图片是否即将过期
* 用于在用户相册页面显示预警
*/
async batchCheckExpiry(
imageIds: string[]
): Promise<Map<string, ImageLifecycleState>> {
const res = await fetch(`${this.apiBase}/lifecycle/batch`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ image_ids: imageIds }),
});
const data = await res.json();
const map = new Map<string, ImageLifecycleState>();
for (const state of data.states) {
map.set(state.imageId, state);
}
return map;
}
/**
* 用户主动续期(收藏的图片不会被自动删除)
*/
async renewImage(imageId: string): Promise<void> {
await fetch(`${this.apiBase}/${imageId}/renew`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
reason: 'user_favorited',
extend_days: 365, // 续期 1 年
}),
});
}
}
// React 组件:图片过期预警
function ExpiryWarningBadge({ imageId }: { imageId: string }) {
const [state, setState] = useState<ImageLifecycleState | null>(null);
useEffect(() => {
const service = new ImageLifecycleService();
service.getLifecycleState(imageId).then(setState);
}, [imageId]);
if (!state || state.daysUntilExpiry === null) return null;
// 30 天内即将过期
if (state.daysUntilExpiry <= 30 && state.daysUntilExpiry > 0) {
return (
<div className="expiry-warning">
<span className="warning-icon">⚠️</span>
<span>此图片将于 {state.daysUntilExpiry} 天后自动删除</span>
<button onClick={() => new ImageLifecycleService().renewImage(imageId)}>
续期保留
</button>
</div>
);
}
// 已过期但还在归档中
if (state.status === 'archived') {
return (
<div className="archived-badge">
<span>📦</span>
<span>已归档(加载可能需要 1~5 分钟)</span>
</div>
);
}
return null;
}存储空间监控和告警
生命周期管理的最后一环是监控。你无法管理你看不见的东西:
import json
from datetime import datetime, timedelta
from typing import Dict, List
class StorageMonitor:
"""存储空间监控与告警"""
ALERT_THRESHOLDS = {
'total_storage_gb': 3000, # 总容量告警阈值
'daily_growth_gb': 5, # 日增长告警阈值
'temp_file_ratio': 0.10, # 临时文件占比告警
'waste_ratio': 0.05, # 垃圾文件占比告警
}
def __init__(self, db, oss_client):
self.db = db
self.oss = oss_client
def get_storage_dashboard(self) -> Dict:
"""生成存储监控面板数据"""
# 获取 OSS 存储统计
bucket_stats = self.oss.get_bucket_stat('guangying-images')
total_bytes = bucket_stats['Storage']
object_count = bucket_stats['ObjectCount']
# 按目录和存储类型统计
tier_stats = self._get_tier_breakdown()
prefix_stats = self._get_prefix_breakdown()
# 计算日增长率
yesterday_total = self._get_yesterday_storage()
daily_growth = (total_bytes - yesterday_total) / 1024 / 1024 / 1024
return {
'timestamp': datetime.now().isoformat(),
'total_storage_gb': round(total_bytes / 1024 / 1024 / 1024, 2),
'object_count': object_count,
'daily_growth_gb': round(daily_growth, 2),
'tier_breakdown': tier_stats,
'prefix_breakdown': prefix_stats,
'estimated_cost': self._estimate_monthly_cost(tier_stats),
'alerts': self._check_alerts(
total_bytes / 1024 / 1024 / 1024, daily_growth, prefix_stats
),
}
def _get_tier_breakdown(self) -> Dict:
"""按存储类型统计"""
rows = self.db.query("""
SELECT storage_tier,
COUNT(*) as count,
SUM(file_size_bytes) as total_bytes
FROM images
WHERE deleted_at IS NULL
GROUP BY storage_tier
""")
tiers = {}
for tier, count, total_bytes in rows:
tiers[tier] = {
'count': count,
'size_gb': round(total_bytes / 1024 / 1024 / 1024, 2),
'pct': 0, # 后面计算
}
total = sum(t['size_gb'] for t in tiers.values())
for t in tiers.values():
t['pct'] = round(t['size_gb'] / total * 100, 1) if total > 0 else 0
return tiers
def _get_prefix_breakdown(self) -> Dict:
"""按目录前缀统计"""
prefixes = {
'originals/': {'description': '原图', 'size_gb': 0},
'thumbs/': {'description': '缩略图', 'size_gb': 0},
'temp/': {'description': '临时文件', 'size_gb': 0},
'uploads/': {'description': '上传分片', 'size_gb': 0},
}
for prefix in prefixes:
result = self.oss.list_objects(
bucket='guangying-images', prefix=prefix, max_keys=1000
)
total = sum(obj['Size'] for obj in result)
prefixes[prefix]['size_gb'] = round(
total / 1024 / 1024 / 1024, 2
)
return prefixes
def _estimate_monthly_cost(self, tier_stats: Dict) -> Dict:
"""估算月度存储费用"""
prices = {
'standard': 0.12,
'infrequent': 0.08,
'archive': 0.03,
}
cost_by_tier = {}
total = 0
for tier, stats in tier_stats.items():
price = prices.get(tier, 0.12)
cost = stats['size_gb'] * price
cost_by_tier[tier] = round(cost, 2)
total += cost
return {
'breakdown': cost_by_tier,
'total': round(total, 2),
}
def _check_alerts(
self,
total_gb: float,
daily_growth: float,
prefix_stats: Dict,
) -> List[Dict]:
"""检查是否触发告警"""
alerts = []
if total_gb > self.ALERT_THRESHOLDS['total_storage_gb']:
alerts.append({
'level': 'warning',
'message': f'总存储量 {total_gb:.0f} GB 超过阈值 '
f'{self.ALERT_THRESHOLDS["total_storage_gb"]} GB',
'action': '检查是否有垃圾文件需要清理',
})
if daily_growth > self.ALERT_THRESHOLDS['daily_growth_gb']:
alerts.append({
'level': 'warning',
'message': f'日增长 {daily_growth:.1f} GB 超过阈值 '
f'{self.ALERT_THRESHOLDS["daily_growth_gb"]} GB',
'action': '检查临时文件是否堆积',
})
temp_ratio = (
prefix_stats.get('temp/', {}).get('size_gb', 0)
/ total_gb if total_gb > 0 else 0
)
if temp_ratio > self.ALERT_THRESHOLDS['temp_file_ratio']:
alerts.append({
'level': 'critical',
'message': f'临时文件占比 {temp_ratio * 100:.1f}% 超过阈值 '
f'{self.ALERT_THRESHOLDS["temp_file_ratio"] * 100}%',
'action': '立即清理临时文件',
})
return alerts
# 每小时执行一次监控检查
def run_monitoring():
monitor = StorageMonitor(db=get_db(), oss_client=get_oss())
dashboard = monitor.get_storage_dashboard()
# 输出监控面板
print(f"=== 存储监控面板 ===")
print(f"时间: {dashboard['timestamp']}")
print(f"总存储: {dashboard['total_storage_gb']} GB")
print(f"对象数: {dashboard['object_count']}")
print(f"日增长: {dashboard['daily_growth_gb']} GB")
print(f"月度预估费用: {dashboard['estimated_cost']['total']} 元")
if dashboard['alerts']:
print(f"\n⚠️ 告警 ({len(dashboard['alerts'])} 条):")
for alert in dashboard['alerts']:
print(f" [{alert['level']}] {alert['message']}")
print(f" → {alert['action']}")实施效果
生命周期管理上线后,我把配置写入代码仓库,用 Terraform 管理:
# 实施一个月后的效果
IMPLEMENTATION_RESULT = {
'before': {
'total_storage_tb': 2.1,
'monthly_storage_cost': 30.48, # 全部标准存储
'daily_growth_gb': 12.0,
},
'after': {
'total_storage_tb': 1.4, # 清理垃圾后减少 33%
'monthly_storage_cost': 16.63, # 分层 + 清理
'daily_growth_gb': 1.8, # 垃圾不再堆积
},
'actions_taken': [
'配置 OSS 生命周期规则(30天低频,90天归档,730天删除)',
'临时文件 7 天自动清理',
'废弃缩略图批量清理(释放 350 GB)',
'分片上传 3 天自动清理',
'存储监控告警上线',
],
'monthly_saving': 13.85, # 元
'biggest_win': '垃圾清理(释放 700 GB)',
}本节小结
✅ 我学到了什么:
- 图片系统 87% 的存储增长可能来自垃圾文件,而非真正的数据增长
- OSS 原生生命周期规则可以处理简单的按时间降级和删除
- 复杂逻辑(级联删除、过期预警、用户续期)需要在应用层实现
- 监控是前提——没有存储监控,就无法发现问题
⚠️ 踩过的坑:
- 一开始配了 30 天删除原图,结果摄影师的作品被误删——需要加”收藏保护”逻辑
- OSS 生命周期规则删除是不可逆的,删除前必须有预警机制
- 临时文件清理不能只看 OSS,还要清理数据库中的记录
🎯 下一步:分层和生命周期都做好了,现在来算一笔完整的账——从 5000 元到 800 元,成本优化实战。
我的思考
思考 1
OSS 生命周期规则的”30 天后转低频存储”是基于什么时间计算的?是文件创建时间还是最后修改时间?
参考答案
OSS 生命周期规则的时间基准是最后修改时间(Last Modified Time),不是创建时间。
这意味着:
- 文件上传后 30 天 → 转低频
- 如果文件被覆盖(重新上传),Last Modified Time 重置
- 如果文件只是被读取(下载),Last Modified Time 不变
特别注意:
- OSS 不支持"基于访问时间"的自动分层
- 即使一张图片每天都在被访问,30 天后仍然会被降级
解决方案:
1. 应用层自己维护"访问时间",必要时覆盖文件以重置时间
2. 使用上一节的 StorageTierManager,基于访问日志做智能分层
3. OSS 生命周期规则只作为兜底策略(纯按时间)这就是为什么我们在上一节写了自定义的 StorageTierManager——OSS 原生规则不够智能,只能按时间,不能按访问频率。
思考 2
如果用户删除了一张图片,但 CDN 缓存中还有,用户仍然可以通过 URL 访问到,如何处理这种不一致?
参考答案
这是缓存与源站的一致性经典问题。处理方案:
方案 1:主动刷新 CDN 缓存(推荐)
删除图片时,立即调用 CDN API 刷新对应 URL
- 优点:实时生效
- 缺点:有 API 调用成本
方案 2:URL 版本化
每次图片变更时,改变 URL 中的版本号
- /thumbs/2024/06/abc.webp?v=1
- 图片被删除后,数据库中记录标记为 deleted
- 前端不会生成 v=1 的 URL → 缓存自然失效
- 优点:不需要刷新 CDN
- 缺点:如果有人收藏了旧 URL 仍然能访问
方案 3:API 网关拦截
在 CDN 回源路径上设置 API 网关
每次请求检查数据库:图片是否已删除?
- 已删除 → 返回 404(CDN 会缓存 404)
- 未删除 → 正常回源
- 优点:最安全
- 缺点:增加回源延迟
"光影"的方案:方案 1 + 方案 2 组合
1. 删除图片时主动刷新 CDN
2. 同时在 API 层做软删除检查(兜底)
3. CDN 刷新通常 5 分钟全网生效,API 层检查确保刷新完成前的安全思考 3
临时文件的生命周期为什么设为 7 天而不是更短(比如 24 小时)?
参考答案
7 天是权衡后的选择:
设太短(如 1 小时)的风险:
- 用户上传大文件(如 20MB RAW 照片),分片上传中网络断了
- 1 小时内网络没恢复 → 临时分片被清理 → 用户需要从头重新上传
- 用户体验差,特别是弱网环境
设太长(如 30 天)的风险:
- 垃圾堆积,浪费存储空间
- 占用存储成本
7 天的理由:
- 覆盖绝大多数断点续传场景(用户可能隔天再试)
- 上传分片 3 天清理(分片一般几小时内会上传完成)
- 完整的临时文件 7 天清理(给用户足够的下载/确认时间)
实际上:
- 上传分片(multipart parts)→ 3 天清理
- 临时文件(temp/)→ 7 天清理
- 处理中间文件 → 处理完成后立即清理
按文件类型设置不同的过期时间,是更精细的做法。