敏感内容识别
审核不只是”鉴黄”
鉴黄检测上线后,我以为审核问题解决了。直到有一天,一个用户上传了一张看似正常的风景照——画面上是一条安静的街道,但街道旁的广告牌上写着一句违禁标语。
AI 鉴黄模型判定:安全。
因为鉴黄模型只看图像的视觉内容,完全看不到图片里的文字。
我又翻了翻后台数据,发现了更多被鉴黄模型放过的”漏网之鱼”:
已通过鉴黄但可能违规的图片:
1. 风景照 + 广告牌上的违禁标语 → 文字违规
2. 暴力场景截图(电影/新闻截图) → 暴力血腥
3. 政治人物合影 → 政治敏感
4. 假冒伪劣商品照片 → 广告违规
5. 含有二维码的图片 → 引流/诈骗鉴黄只解决了色情这一个维度。一个完整的内容审核系统,需要多维度、多模态的检测能力。
维度一:OCR 文字识别
图片中的文字是审核的重要维度。很多违规内容不以视觉形式呈现,而是以文字形式”藏”在图片里。
OCR 方案对比
# 方案一:开源 Tesseract
import pytesseract
from PIL import Image
def ocr_tesseract(image_path: str) -> str:
"""使用 Tesseract 做文字识别"""
img = Image.open(image_path)
# 优化识别准确率:预处理
# 1. 转灰度
img = img.convert('L')
# 2. 二值化
threshold = 128
img = img.point(lambda x: 255 if x > threshold else 0)
text = pytesseract.image_to_string(img, lang='chi_sim+eng')
return text.strip()
# 方案二:云 OCR API
class AliyunOCR:
"""阿里云 OCR"""
def __init__(self, access_key, secret_key):
self.access_key = access_key
self.secret_key = secret_key
def recognize(self, image_url: str) -> dict:
"""调用阿里云通用 OCR"""
body = {
'tasks': [{
'dataId': str(uuid.uuid4()),
'url': image_url,
}],
'scenes': ['antispam'], # 文字反垃圾
}
result = self._call_api('/green/image/scan', body)
return {
'text': result.get('results', [{}])[0].get('ocr', ''),
'words': result.get('results', [{}])[0].get('ocrDetail', []),
'risk_words': result.get('results', [{}])[0].get('hit', []),
}对比测试:
OCR 方案对比(100 张含文字的图片)
方案 识别准确率 速度 成本
──────────────────────────────────────────────
Tesseract 72% 800ms 免费
阿里云 OCR 96% 200ms 0.8元/千次
腾讯云 OCR 95% 250ms 0.7元/千次结论:Tesseract 对中文的识别准确率太低(72%),特别是手写体、艺术字体、小字。云 API 是更好的选择。
文字内容审核
识别出文字后,还需要判断文字内容是否违规:
import re
class TextContentAuditor:
"""文字内容审核"""
def __init__(self):
# 敏感词库(从云服务同步)
self.sensitive_words = self._load_sensitive_words()
self.sensitive_patterns = self._compile_patterns()
def audit_text(self, text: str) -> dict:
"""审核文字内容"""
risks = []
# 1. 敏感词匹配
for word in self.sensitive_words:
if word in text:
risks.append({
'type': 'sensitive_word',
'word': word,
'severity': 'high',
})
# 2. 正则匹配(手机号、微信号、URL)
patterns = {
'phone': r'1[3-9]\d{9}',
'wechat': r'[Ww][Cc][Hh]?[Aa][Tt]?\s*[::]\s*\w{5,20}',
'url': r'https?://\S+',
'qq': r'[Qq][Qq]\s*[::]?\s*\d{5,12}',
}
for ptype, pattern in patterns.items():
matches = re.findall(pattern, text)
if matches:
risks.append({
'type': ptype,
'matches': matches,
'severity': 'medium',
})
# 3. 调用 NLP 模型做语义理解
nlp_result = self._nlp_audit(text)
if nlp_result['risk']:
risks.append({
'type': 'semantic',
'label': nlp_result['label'],
'severity': nlp_result['severity'],
})
overall_severity = max(
(r['severity'] for r in risks),
key=lambda s: {'high': 3, 'medium': 2, 'low': 1}.get(s, 0),
default='none'
)
return {
'text': text,
'has_risk': len(risks) > 0,
'risks': risks,
'action': 'reject' if overall_severity == 'high' else
'review' if overall_severity == 'medium' else 'pass',
}
def _nlp_audit(self, text):
"""调用 NLP 服务做语义审核"""
# 使用云服务的文本审核 API
result = text_audit_api.scan(text)
return result
def _load_sensitive_words(self):
"""加载敏感词库"""
# 从数据库或配置文件加载
# 注意:敏感词库需要定期更新
words = set()
for source in ['default', 'politics', 'pornography', 'violence']:
batch = redis_client.smembers(f'sensitive_words:{source}')
words.update(w.decode() for w in batch)
return words
def _compile_patterns(self):
"""编译正则表达式模式"""
return {} # 在 __init__ 中已硬编码维度二:暴力血腥检测
class ViolenceDetector:
"""暴力血腥内容检测"""
def __init__(self):
self.api_client = AliyunGreenClient()
def detect(self, image_url: str) -> dict:
"""检测暴力血腥内容"""
result = self.api_client.scan_image(
url=image_url,
scenes=['terrorism', 'violence'], # 暴恐检测
)
return {
'terrorism': self._parse_scene(result, 'terrorism'),
'violence': self._parse_scene(result, 'violence'),
}
def _parse_scene(self, result, scene):
for r in result.get('results', []):
if r.get('scene') == scene:
return {
'label': r.get('label'), # normal/violence/terror
'confidence': r.get('rate', 0),
'suggestion': r.get('suggestion'), # pass/review/block
}
return {'label': 'unknown', 'confidence': 0, 'suggestion': 'pass'}
# 测试
detector = ViolenceDetector()
test_cases = [
('sports_match.jpg', '正常体育比赛'),
('horror_movie.jpg', '恐怖电影截图'),
('accident_news.jpg', '新闻事故现场'),
('cooking_raw_meat.jpg', '烹饪生肉'),
('surgery_photo.jpg', '手术照片(医学)'),
]
for url, desc in test_cases:
result = detector.detect(url)
print(f"{desc}: {result}")暴力检测的难点:
容易误判的场景:
- 烹饪节目(生肉、鲜血) → 误判为暴力
- 医学手术照片 → 误判为血腥
- 体育比赛(拳击、摔跤) → 误判为暴力
- 万圣节装扮 → 误判为恐怖
- 新闻报道中的事故现场 → 误判为血腥
这些场景的处理策略:
- 医学/新闻类:加标签但不拦截
- 体育比赛:加"体育"标签后降低权重
- 用户标注的"恐怖电影截图":允许但加内容警告维度三:政治敏感检测
这是最敏感也最必须的维度。国内的平台必须具备政治敏感内容的识别能力。
class PoliticalContentDetector:
"""政治敏感内容检测"""
def detect(self, image_url: str) -> dict:
"""检测政治敏感内容"""
result = self.api_client.scan_image(
url=image_url,
scenes=['ad'], # 某些政治内容会以广告形式出现
)
# 同时做 OCR + 文字审核
ocr_result = ocr_service.recognize(image_url)
text_audit = text_auditor.audit_text(ocr_result['text'])
return {
'image_risk': result,
'text_risk': text_audit,
'action': self._decide_action(result, text_audit),
}
def _decide_action(self, image_result, text_result):
"""综合决策"""
# 政治敏感内容:宁可误报不可漏报
if image_result.get('label') == 'politics':
return 'reject'
if text_result.get('has_risk') and text_result['action'] == 'reject':
return 'reject'
if text_result.get('action') == 'review':
return 'review'
return 'pass'维度四:广告和二维码检测
摄影社区里经常有人上传带二维码的图片做引流:
from pyzbar import pyzbar
from PIL import Image
class QRCodeDetector:
"""二维码检测"""
def detect(self, image_path: str) -> dict:
"""检测图片中的二维码"""
img = Image.open(image_path)
# 解码二维码
barcodes = pyzbar.decode(img)
qr_codes = []
for barcode in barcodes:
if barcode.type == 'QRCODE':
qr_codes.append({
'data': barcode.data.decode('utf-8'),
'rect': barcode.rect,
})
has_qr = len(qr_codes) > 0
# 检查二维码内容是否安全
unsafe_qr = False
for qr in qr_codes:
url_audit = self._audit_qr_url(qr['data'])
if url_audit['unsafe']:
unsafe_qr = True
return {
'has_qr_code': has_qr,
'count': len(qr_codes),
'is_unsafe': unsafe_qr,
'codes': qr_codes,
}
def _audit_qr_url(self, url: str) -> dict:
"""审核二维码中的 URL"""
# 检查是否是已知的恶意域名
# 检查是否是竞品引流
# 检查是否是违禁品交易
parsed = urlparse(url)
blocked_domains = ['xxx.com', 'gambling.xxx', ...]
if parsed.hostname in blocked_domains:
return {'unsafe': True, 'reason': 'blocked_domain'}
return {'unsafe': False}
class WatermarkDetector:
"""水印检测——检查是否是其他平台的图"""
SUSPICIOUS_WATERMARKS = [
'小红书', '图虫', '500px', 'Getty Images',
'Shutterstock', '微博', '抖音',
]
def detect(self, image_path: str) -> dict:
"""检测是否含有其他平台的水印"""
# 先 OCR 识别文字
text = ocr_service.recognize(image_path)
found_watermarks = []
for wm in self.SUSPICIOUS_WATERMARKS:
if wm in text:
found_watermarks.append(wm)
return {
'has_watermark': len(found_watermarks) > 0,
'watermarks': found_watermarks,
'action': 'review' if found_watermarks else 'pass',
}多维度审核整合
把所有审核维度整合到一个服务中:
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class AuditResult:
photo_id: str
action: str # pass / review / reject
scores: dict
risks: List[dict]
review_reason: Optional[str] = None
class ComprehensiveAuditor:
"""多维度综合审核"""
def __init__(self):
self.nsfw = NSFWDetector() # 鉴黄
self.ocr = AliyunOCR() # OCR
self.text_audit = TextContentAuditor() # 文字审核
self.violence = ViolenceDetector() # 暴力检测
self.qrcode = QRCodeDetector() # 二维码
self.watermark = WatermarkDetector() # 水印
def audit(self, photo_id: str, image_url: str, local_path: str) -> AuditResult:
"""综合审核:并行调用所有维度"""
import concurrent.futures
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(self.nsfw.detect, image_url): 'nsfw',
executor.submit(self.ocr.recognize, image_url): 'ocr',
executor.submit(self.violence.detect, image_url): 'violence',
executor.submit(self.qrcode.detect, local_path): 'qrcode',
executor.submit(self.watermark.detect, local_path): 'watermark',
}
for future in concurrent.futures.as_completed(futures):
dim = futures[future]
try:
results[dim] = future.result()
except Exception as e:
results[dim] = {'error': str(e)}
# 文字审核(依赖 OCR 结果)
if 'ocr' in results and not results['ocr'].get('error'):
text = results['ocr'].get('text', '')
if text:
results['text'] = self.text_audit.audit_text(text)
# 综合决策
return self._decide(photo_id, results)
def _decide(self, photo_id: str, results: dict) -> AuditResult:
"""综合所有维度的结果做最终决策"""
risks = []
action = 'pass'
review_reasons = []
# 维度 1:鉴黄
nsfw = results.get('nsfw', {})
if nsfw.get('label') == 'porn' and nsfw.get('confidence', 0) > 90:
action = 'reject'
risks.append({'dim': 'nsfw', 'level': 'high', 'detail': nsfw})
elif nsfw.get('label') in ('porn', 'sexy') and nsfw.get('confidence', 0) > 60:
if action != 'reject':
action = 'review'
review_reasons.append('色情风险')
risks.append({'dim': 'nsfw', 'level': 'medium', 'detail': nsfw})
# 维度 2:暴力
violence = results.get('violence', {})
if violence.get('terrorism', {}).get('suggestion') == 'block':
action = 'reject'
risks.append({'dim': 'violence', 'level': 'high', 'detail': violence})
elif violence.get('violence', {}).get('suggestion') == 'review':
if action != 'reject':
action = 'review'
review_reasons.append('暴力风险')
risks.append({'dim': 'violence', 'level': 'medium', 'detail': violence})
# 维度 3:文字
text = results.get('text', {})
if text.get('action') == 'reject':
action = 'reject'
risks.append({'dim': 'text', 'level': 'high', 'detail': text})
elif text.get('action') == 'review':
if action != 'reject':
action = 'review'
review_reasons.append('文字风险')
risks.append({'dim': 'text', 'level': 'medium', 'detail': text})
# 维度 4:二维码
qrcode = results.get('qrcode', {})
if qrcode.get('is_unsafe'):
action = 'reject'
risks.append({'dim': 'qrcode', 'level': 'high', 'detail': qrcode})
elif qrcode.get('has_qr_code'):
if action == 'pass':
action = 'review'
review_reasons.append('含二维码')
risks.append({'dim': 'qrcode', 'level': 'low', 'detail': qrcode})
# 维度 5:水印
watermark = results.get('watermark', {})
if watermark.get('has_watermark'):
if action == 'pass':
action = 'review'
review_reasons.append(f"含平台水印: {', '.join(watermark['watermarks'])}")
risks.append({'dim': 'watermark', 'level': 'low', 'detail': watermark})
return AuditResult(
photo_id=photo_id,
action=action,
scores=results,
risks=risks,
review_reason='; '.join(review_reasons) if review_reasons else None,
)
# 使用
auditor = ComprehensiveAuditor()
# 在 Worker 中调用
result = auditor.audit(
photo_id='12345',
image_url='https://cdn.guangying.com/originals/2024/06/a3/abc.jpg',
local_path='/tmp/process_12345_original',
)
print(f"图片 12345: {result.action}")
# reject: 直接拦截
# review: 转人工审核
# pass: 放行性能优化
多维度审核意味着多次 API 调用,如何控制延迟?
# 审核延迟分析
"""
单次审核的 API 调用:
1. 鉴黄: 200ms
2. OCR: 250ms
3. 暴力检测: 200ms
4. 二维码: 150ms(本地)
5. 水印: 100ms(本地)
串行总计: 900ms
并行总计: 250ms(取最长)
结论:必须并行调用,否则单张图片审核耗时接近 1 秒。
"""进一步的优化——分级审核:
class TieredAuditor:
"""分级审核——先快审,再精审"""
def audit(self, photo_id, image_url, local_path):
# 第一级:快速审核(只做鉴黄 + 二维码,耗时 < 200ms)
quick_result = self._quick_audit(photo_id, image_url, local_path)
if quick_result.action == 'reject':
# 快审直接拒绝,不需要精审
return quick_result
if quick_result.action == 'pass':
# 快审通过,但可能需要精审
# 异步发起精审
queue.publish('full_audit', {
'photo_id': photo_id,
'image_url': image_url,
'local_path': local_path,
})
# 先放行(如果后续精审发现问题会回调)
return quick_result
# review 需要等精审结果
return self._full_audit(photo_id, image_url, local_path)
def _quick_audit(self, photo_id, image_url, local_path):
"""快审:只做鉴黄和二维码"""
nsfw = self.nsfw.detect(image_url)
qrcode = self.qrcode.detect(local_path)
if nsfw.get('label') == 'porn' and nsfw.get('confidence', 0) > 90:
return AuditResult(photo_id, 'reject', {'nsfw': nsfw}, [])
if qrcode.get('is_unsafe'):
return AuditResult(photo_id, 'reject', {'qrcode': qrcode}, [])
return AuditResult(photo_id, 'pass', {}, [])本节小结
✅ 我学到了什么:
- 内容审核不只是鉴黄——OCR 文字、暴力血腥、二维码、水印都需要检测
- 多维度审核需要并行调用,否则延迟不可接受
- 分级审核(快审 + 精审)可以在 200ms 内完成初步判断
- 云 API 在 OCR 和暴力检测上远优于开源方案
⚠️ 踩过的坑:
- Tesseract 对中文的识别准确率只有 72%,手写体更差
- 暴力检测容易误判医学照片和烹饪内容
- OCR 审核需要先识别文字再审核文字,链路较长
🎯 下一步:审核能力都有了,但如何设计一套完整的审核流程?机审和人审怎么配合?
我的思考
思考 1
如果用户在图片中把敏感文字做了”变形处理”(比如”加 V:①③⑤②④⑥⑧⑨⑩”),OCR 和文字审核能识别吗?
参考答案
这种变形文字是内容审核的常见对抗手段。应对策略:
class AntiEvasionTextProcessor:
"""反变形文字处理"""
# 常见变形映射
CHAR_MAP = {
# 全角数字 → 半角
'①': '1', '②': '2', '③': '3', '④': '4', '⑤': '5',
'⑥': '6', '⑦': '7', '⑧': '8', '⑨': '9', '⑩': '0',
# 拼音替换
'微': 'V', '薇': 'V', '威': 'V',
# 同音字
'加V': '加微', '加VX': '加微信',
# 特殊字符拆字
'徴信': '微信', '威芯': '微信',
# 插入干扰符
'V.X': 'VX', 'V-X': 'VX',
}
def normalize(self, text: str) -> str:
"""文字归一化——还原变形"""
# 1. 全角转半角
result = []
for ch in text:
if ch in self.CHAR_MAP:
result.append(self.CHAR_MAP[ch])
else:
result.append(ch)
normalized = ''.join(result)
# 2. 去除干扰字符(空格、点、横线等)
import re
normalized = re.sub(r'[\s.\-·\-_]+', '', normalized)
# 3. 对归一化后的文字再做审核
return normalized
def audit(self, raw_text: str) -> dict:
normalized = self.normalize(raw_text)
return text_auditor.audit_text(normalized)更高级的对抗手段(如把文字做成图片拼接、用 emoji 替代关键字符等),需要结合上下文语义分析和用户行为分析来应对。
思考 2
如果一个认证摄影师上传了含二维码的图片(二维码指向他自己的摄影作品集),应该拦截吗?
参考答案
不应该一刀切拦截。需要根据上下文判断:
判断策略:
1. 二维码内容是什么?
- 个人网站/作品集 → 允许
- 竞品平台/违规网站 → 拦截
- 未知链接 → 人工审核
2. 上传者身份
- 认证摄影师 → 信任度更高
- 新注册用户 → 更严格
3. 图片上下文
- 个人主页头像/封面 → 允许
- 作品详情页 → 允许(标注"含联系方式")
- 评论/回复中的图片 → 更严格def handle_qrcode_case(qr_result, uploader_info, context):
if not qr_result['has_qr_code']:
return 'pass'
# 解析二维码内容
for code in qr_result['codes']:
url = code['data']
if is_personal_portfolio(url) and uploader_info['is_verified']:
return 'pass_with_tag' # 允许,但标记
elif is_known_safe_domain(url):
return 'pass_with_tag'
elif is_blocked_domain(url):
return 'reject'
else:
return 'review' # 未知链接,人工审核原则:对可信用户 + 可信内容给予信任,对未知情况转人工审核。
