用 Python + Claude API 批量给图片打标签写描述
用 Python + Claude API 批量给图片打标签写描述
电商运营手里压着 800 张商品图要补全标签,摄影师攒了 2000 张素材等着分类归档,算法工程师要给训练集批量打注释——这类需求有个共同的痛点:手动做根本来不及,但网上的教程几乎清一色是「单张图片演示」,没人告诉你批量跑起来会踩哪些坑。
本文以电商商品图批量标注为主线案例,从 Prompt 设计到并发控制、从断点续传到成本估算,完整走一遍可以直接落地的批量处理流水线。
一、自由文本描述为什么不够用?先把输出结构化
很多教程到「Claude 返回了一段图片描述」就收尾了。但实际业务里你需要的是能直接写进数据库或导入电商后台的结构化字段,而不是一大段自然语言——没法用、没法筛、没法批量导入。
1.1 System Prompt 模板
在系统提示词里明确约定返回格式,是让 Claude 稳定输出 JSON 最可靠的办法:
SYSTEM_PROMPT = """你是一个电商商品图片标注助手。
分析用户提供的图片,严格按以下 JSON 格式返回,不要输出任何其他内容:
{
"tags": ["标签1", "标签2", "标签3"],
"category": "一级分类",
"description": "不超过50字的商品描述",
"color": "主色调",
"style": "风格关键词"
}
tags 最多 5 个,description 必须是中文。"""
1.2 解析失败的 fallback 处理
Claude 偶尔会在 JSON 前后带上多余的文字,直接 json.loads() 就会抛异常。生产环境里必须准备好 fallback,否则一条数据出问题就能卡死整个流程:
import json, re
def parse_response(text: str) -> dict:
# 先尝试直接解析
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 退而求其次,用正则抠出第一个 JSON 对象
match = re.search(r'\{.*\}', text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
# 彻底没救了就返回空结构,至少不影响主流程继续跑
return {"tags": [], "category": "", "description": text[:100], "error": "parse_failed"}
二、准备工作
2.1 模型选型建议
| 模型 | 适用场景 | 说明 |
|---|---|---|
| claude-haiku-4 | 大批量简单标注、预算有限 | 速度快、价格低,标签数少的场景用它很合适 |
| claude-sonnet-4-5 | 平衡性价比的首选 | 识别精度明显高于 Haiku,需要准确描述的业务优先考虑 |
| claude-opus-4 | 复杂场景深度分析 | 常规批量标注一般用不上 |
具体价格以 Anthropic 官方最新定价为准,下文成本估算基于公开资料中的近似值,实际费用请以账单为准。
2.2 安装依赖
pip install anthropic Pillow tenacity tqdm
2.3 API 接入说明
如果网络环境无法直接访问 Anthropic 官方接口,可以用 ClaudeAPI 等兼容接入平台。这类平台提供与官方 SDK 兼容的接口格式,支持企业充值开票,接入时只需改一下 base_url 参数,其余代码完全不用动。具体额度和线路情况以其官网最新说明为准。
三、单图调用封装:图片预处理与基础调用
3.1 图片压缩预处理
Claude API 对图片的计费和图片尺寸直接挂钩。把图片压缩到长边 1024px 以内,在识别精度基本不受影响的前提下,token 消耗能砍掉相当一部分。
from PIL import Image
import base64, io
from pathlib import Path
def compress_and_encode(image_path: str, max_size: int = 1024) -> tuple[str, str]:
"""压缩图片到指定最大边长,返回 (base64字符串, MIME类型)"""
img = Image.open(image_path)
# 转 RGB,避免 PNG 透明通道引起问题
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# 等比缩放
ratio = min(max_size / img.width, max_size / img.height, 1.0)
if ratio < 1.0:
new_size = (int(img.width * ratio), int(img.height * ratio))
img = img.resize(new_size, Image.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=85)
encoded = base64.standard_b64encode(buffer.getvalue()).decode("utf-8")
return encoded, "image/jpeg"
压缩效果参考:一张 3000×2000 的 JPG(约 3MB)压缩到 1024×682 之后,base64 体积从约 4MB 掉到约 200KB,token 消耗大概能降 60–70%。
3.2 单图调用函数
import anthropic
def tag_single_image(client: anthropic.Anthropic, image_path: str) -> dict:
"""对单张图片调用 Claude API,返回结构化标签"""
img_data, media_type = compress_and_encode(image_path)
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=512,
system=SYSTEM_PROMPT,
messages=[{
"role": "user",
"content": [{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": img_data
}
}, {
"type": "text",
"text": "请分析这张商品图片。"
}]
}]
)
return parse_response(message.content[0].text)
四、批量处理工程实现
4.1 目录遍历与状态管理(断点续传)
批量处理最容易被忽视的一个问题:跑到一半崩了怎么办? 必须用状态文件记录已处理的图片,不然每次都从头来过,既浪费时间又浪费钱。
import json
from pathlib import Path
SUPPORTED_FORMATS = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
def load_processed(state_file: str) -> set:
if Path(state_file).exists():
with open(state_file, 'r') as f:
return set(json.load(f))
return set()
def save_processed(state_file: str, processed: set):
with open(state_file, 'w') as f:
json.dump(list(processed), f)
def get_image_list(image_dir: str) -> list[Path]:
return [p for p in Path(image_dir).rglob('*')
if p.suffix.lower() in SUPPORTED_FORMATS]
4.2 速率限制与重试机制
Claude API 有 RPM(每分钟请求数)和 TPM(每分钟 token 数)的限制,具体上限因账户等级而异。批量调用必须处理好 429 错误,否则一旦触发限流整个流程就卡住了。
用 tenacity 做指数退避,省心又可靠:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
@retry(
retry=retry_if_exception_type(anthropic.RateLimitError),
wait=wait_exponential(multiplier=2, min=4, max=60),
stop=stop_after_attempt(5)
)
def tag_with_retry(client, image_path):
return tag_single_image(client, image_path)
4.3 并发加速
顺序调用受网络延迟影响很明显,换成线程池之后吞吐量能提升不少。不过并发数不能拉太高,容易撞限流——建议 max_workers 从 3 开始,根据实际触发情况往上调,通常不用超过 10。
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import csv, time
from datetime import datetime
def batch_process(image_dir: str, output_csv: str,
state_file: str = "processed.json",
max_workers: int = 5):
client = anthropic.Anthropic(api_key="YOUR_API_KEY")
all_images = get_image_list(image_dir)
processed = load_processed(state_file)
pending = [p for p in all_images if str(p) not in processed]
print(f"共 {len(all_images)} 张图片,待处理 {len(pending)} 张")
results = []
failed = []
# 追加模式写 CSV,续跑时不会覆盖已有结果
csv_exists = Path(output_csv).exists()
csv_file = open(output_csv, 'a', newline='', encoding='utf-8')
fieldnames = ['path', 'tags', 'category', 'description', 'color', 'style',
'processed_at', 'error']
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
if not csv_exists:
writer.writeheader()
def process_one(image_path):
try:
result = tag_with_retry(client, str(image_path))
result['path'] = str(image_path)
result['processed_at'] = datetime.now().isoformat()
if 'tags' in result:
result['tags'] = '|'.join(result['tags']) # 转成 CSV 友好格式
return result, None
except Exception as e:
return None, {'path': str(image_path), 'error': str(e),
'processed_at': datetime.now().isoformat()}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(process_one, p): p for p in pending}
for future in tqdm(as_completed(futures), total=len(pending), desc="处理中"):
result, error = future.result()
image_path = futures[future]
if result:
writer.writerow(result)
processed.add(str(image_path))
save_processed(state_file, processed)
else:
writer.writerow({k: '' for k in fieldnames} | error)
failed.append(error['path'])
csv_file.flush()
csv_file.close()
print(f"完成:成功 {len(pending) - len(failed)} 张,失败 {len(failed)} 张")
if failed:
print("失败列表:", failed[:10])
五、Files API 还是 base64?看情况选
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 图片数量 < 50 张 | base64 直接编码 | 实现简单,不需要额外管理文件 ID |
| 图片数量 ≥ 50 张,且同一张图会被多次分析 | Files API | 上传一次,多次引用,省带宽也省处理开销 |
| 图片总体积超过 10MB/请求 | Files API | 避免单个请求体撑得太大 |
Files API 的具体用法参考 Anthropic 官方文档(文件上传),把 base64 source 换成 file_id source 即可,主体调用逻辑不用动。
六、成本估算与省钱方法
6.1 费用参考
下面的估算基于 claude-sonnet-4-5 的公开定价(约 $3/百万输入 token,$15/百万输出 token,以 Anthropic 官网实际定价为准),图片已压缩到 1024px 以内:
| 图片数量 | 估算 input token | 估算费用(美元) | 备注 |
|---|---|---|---|
| 100 张 | ~85 万 | ~$2.5 | 每张约 8500 input token |
| 500 张 | ~425 万 | ~$12.8 | |
| 1000 张 | ~850 万 | ~$25.5 |
图片的 token 消耗和实际分辨率、内容复杂度都有关系,这里只是粗略参考,实际账单以 Anthropic 后台数据为准。
6.2 三个切实可行的省钱方法
第一,压缩图片到 1024px 以内。 如第三节所演示的,图片 token 消耗可以降 60–70%,是单项优化里效果最显著的一个。
第二,Haiku 初筛 + Sonnet 精标搭配用。 先用更便宜的 Haiku 把重复图、模糊图这类不需要详细标注的图片过滤掉,剩下的再交给 Sonnet 处理,整体成本能压下来不少。
第三,控制输出长度。 在 System Prompt 里明确限定描述字数,比如「不超过 50 字」,output token 消耗自然也跟着降下来。
七、常见报错速查
| 错误信息 | 原因 | 解决方案 |
|---|---|---|
anthropic.RateLimitError |
触发速率限制 | tenacity 自动重试;同时考虑降低 max_workers |
Image.UnidentifiedImageError |
图片格式损坏或不支持 | try/except 跳过,记录到失败列表 |
json.JSONDecodeError |
Claude 返回了非 JSON 内容 | 用第一节的 fallback 解析兜底 |
anthropic.BadRequestError: image too large |
单图超过 32MB | 检查压缩逻辑,确保压缩后再写入 |
| 输出中文乱码 | CSV 编码问题 | 改用 open(output_csv, encoding='utf-8-sig') |
八、完整项目结构
image-tagger/
├── main.py # 入口,调用 batch_process()
├── config.py # API Key、模型名称、目录配置
├── tagger.py # tag_single_image、compress_and_encode、parse_response
├── batch.py # batch_process、状态管理、并发逻辑
├── images/ # 待处理图片目录
├── output/
│ └── tags.csv # 标注结果
└── processed.json # 断点续传状态文件
main.py 入口示例:
from config import API_KEY, IMAGE_DIR, OUTPUT_CSV
from batch import batch_process
if __name__ == "__main__":
batch_process(
image_dir=IMAGE_DIR,
output_csv=OUTPUT_CSV,
max_workers=5
)
config.py:
API_KEY = "sk-ant-..." # 或 ClaudeAPI 兼容平台的 Key
IMAGE_DIR = "./images"
OUTPUT_CSV = "./output/tags.csv"
MODEL = "claude-sonnet-4-5"
MAX_IMAGE_SIZE = 1024 # 压缩目标边长(px)
批量图片标注真正的工程难点不在于「怎么调 API」,而在于状态管理、限流处理、成本控制、结构化输出——恰恰是那些演示教程里永远不会涉及的环节。本文的代码结构在数百张图的任务里可以直接拿去用。如果规模再大,数万张以上的话,建议把 ThreadPoolExecutor 换成 Celery + Redis 这样的任务队列,结果也写进数据库而不是 CSV,会更稳。
更多推荐


所有评论(0)