Claude API + DeepSeek API, a Dual-Track Practical Guide: Streaming Responses, Error Handling, and Multi-Model Orchestration
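In 2026, integrating an AI API is no longer as simple as calling one model. You need to handle streaming responses, cope with rate limits, fall back between models, and keep token consumption under control. These are what separate an integration that merely works from one that works well. This article uses the Claude API (Anthropic) and the DeepSeek API as its two main examples and covers the full path from a single call to production-grade multi-model orchestration.
Contents:
- Claude API vs DeepSeek API: an overview
- Streaming responses
- Error handling and retry strategies
- Handling rate limits
- Dual-model fallback architecture
- Tool Use and Function Calling
- Token budget management
- Production-grade SDK wrapper
- Cost optimization in practice
- Troubleshooting common pitfalls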
1. Claude API vs DeepSeek API: an overview
1.1 Pricing and capabilities
| Dimension | Claude (Anthropic) | DeepSeek | OpenAI |
|---|---|---|---|
| Latest model | Fable 5 (Opus 4.8 / Sonnet 4.6 / Haiku 4.5) | DeepSeek-V4-Pro | GPT-5 |
| Input price (per 1M tokens) | $15 (Opus) / $3 (Sonnet) / $0.80 (Haiku) | ¥2 (approx. $0.27) | $15 (GPT-5) |
| Output price (per 1M tokens) | $75 (Opus) / $15 (Sonnet) / $4 (Haiku) | ¥8 (approx. $1.10) | $60 (GPT-5) |
| Max context | 200K tokens | 128K tokens | 128K tokens |
| Max output | 8K-32K tokens | 8K tokens | 16K tokens |
| Tool Use | ✅ Native | ✅ OpenAI-compatible format | ✅ Native |
| Streaming | SSE (Server-Sent Events) | SSE (OpenAI-compatible) | SSE |
| Image understanding | ✅ Multimodal | ❌ (text only) | ✅ Multimodal |
| Code execution | ✅ Computer Use | ❌ | ✅ Code Interpreter |
| Thinking mode | ✅ Extended Thinking | ❌ | ❌ |
| API format | Anthropic Messages API | OpenAI-compatible | OpenAI API |
| SDK languages | Python, TypeScript | Python, compatible with the OpenAI SDK | Python, TypeScript |
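1.2 When to use which model
Need deep reasoning or multi-step tasks?
├── YES → Claude Opus (200K context, Extended Thinking)
└── NO → Need multimodal input (image understanding)?
    ├── YES → Claude Sonnet / GPT-5
    └── NO → Budget-sensitive?
        ├── YES → DeepSeek (roughly 1/10 of Claude Sonnet's price, strong in Chinese)
        └── NO → Need very long output? → Claude Opus
                 Routine tasks → Claude Sonnet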
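Where DeepSeek fits best:
- Chinese-language tasks, where quality is close to or even better than GPT-5 at about 1/55 of the price
- High-volume text processing (summarization, translation, classification)
- A fallback option (cheap enough that heavy traffic is not a concern)
Where Claude fits best:
- Code generation and comprehension (especially complex architectures)
- Agent scenarios that need Tool Use
- Deep reasoning that needs Extended Thinking
- Multimodal input (mixed images and text)
1.3 A unified interface design
In real projects you will rarely rely on a single model. Below is a unified calling interface: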
// types.ts
export type AIProvider = 'claude' | 'deepseek' | 'openai';
export interface ChatMessage {
role: 'user' | 'assistant' | 'system';
content: string | ContentBlock[];
}
export interface ContentBlock {
type: 'text' | 'image';
text?: string;
source?: {
type: 'base64';
media_type: string;
data: string;
};
}
export interface ChatOptions {
model?: string;
maxTokens?: number;
temperature?: number;
stream?: boolean;
tools?: ToolDefinition[];
systemPrompt?: string;
}
export interface ChatResponse {
id: string;
content: string;
model: string;
usage: {
inputTokens: number;
outputTokens: number;
};
finishReason: 'stop' | 'length' | 'tool_use';
toolCalls?: ToolCall[];
}
export interface ToolDefinition {
name: string;
description: string;
parameters: Record<string, any>; // JSON Schema
}
export interface ToolCall {
id: string;
name: string;
arguments: Record<string, any>;
}
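One place where a unified message shape has to be adapted per provider is the system prompt: Anthropic's Messages API takes it as a top-level `system` parameter, while OpenAI-compatible APIs such as DeepSeek accept a `system` role inside `messages`. The following is a minimal sketch of that adaptation, assuming string-only content; `SimpleMessage`, `AnthropicStylePayload`, and `splitSystemPrompt` are hypothetical names introduced for illustration and are not part of the original code.

```typescript
// Simplified message shape for this sketch (string content only).
interface SimpleMessage {
  role: 'user' | 'assistant' | 'system';
  content: string;
}

// Hypothetical target shape: system prompt separated from the turns.
interface AnthropicStylePayload {
  system?: string;
  messages: Array<{ role: 'user' | 'assistant'; content: string }>;
}

// Pull every system message out of the list and join them into one string.
function splitSystemPrompt(messages: SimpleMessage[]): AnthropicStylePayload {
  const systemParts: string[] = [];
  const rest: Array<{ role: 'user' | 'assistant'; content: string }> = [];
  for (const m of messages) {
    if (m.role === 'system') {
      systemParts.push(m.content);
    } else {
      rest.push({ role: m.role, content: m.content });
    }
  }
  return {
    system: systemParts.length > 0 ? systemParts.join('\n\n') : undefined,
    messages: rest,
  };
}

const payload = splitSystemPrompt([
  { role: 'system', content: 'Be brief.' },
  { role: 'user', content: 'Hi' },
]);
console.log(payload);
```

For an OpenAI-compatible provider the original list can be passed through unchanged, so only the Claude path would need this step.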
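2. Streaming responses
2.1 Why streaming is a must
The problem with a non-streaming request (stream: false):
User → API → [waits 5-15 seconds...] → the full result arrives at once
                    ↑
        the user has already lost patience
A streaming request:
User → API → "Let" → "me" → "help" → "you" → ...
             ↑ 0.5s  ↑ 0.3s  ↑ 0.2s
The user sees the typing effect immediately, and the experience is dramatically better.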
2.2 Claude streaming
// claude-stream.service.ts
import Anthropic from '@anthropic-ai/sdk';
import { Observable } from 'rxjs';
import type { ToolCall } from './types'; // assumes the types.ts file from section 1.3
interface StreamEvent {
type: 'text' | 'thinking' | 'tool_use' | 'error' | 'done';
content?: string;
toolCall?: Partial<ToolCall>;
usage?: { inputTokens: number; outputTokens: number };
}
export class ClaudeStreamService {
private client: Anthropic;
constructor() {
this.client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY!,
});
}
  /**
   * Stream a Claude response and expose it as an Observable.
   */
streamChat(options: {
messages: Anthropic.MessageParam[];
model?: string;
system?: string;
maxTokens?: number;
tools?: Anthropic.Tool[];
}): Observable<StreamEvent> {
return new Observable((subscriber) => {
// 标记是否已取消
let aborted = false;
(async () => {
try {
const stream = this.client.messages.stream({
model: options.model || 'claude-sonnet-4-6',
system: options.system,
messages: options.messages,
max_tokens: options.maxTokens || 4096,
tools: options.tools as any,
});
          // Subscribe to the SDK's stream events.
          stream.on('text', (textDelta) => {
            if (!aborted) {
              subscriber.next({ type: 'text', content: textDelta });
            }
          });
          // The 'thinking' handler receives the delta as a plain string.
          stream.on('thinking', (thinkingDelta) => {
            if (!aborted) {
              subscriber.next({ type: 'thinking', content: thinkingDelta });
            }
          });
          // Block-level events are delivered through the raw 'streamEvent' channel.
          stream.on('streamEvent', (event) => {
            if (aborted) return;
            if (
              event.type === 'content_block_start' &&
              event.content_block.type === 'tool_use'
            ) {
              subscriber.next({
                type: 'tool_use',
                toolCall: {
                  id: event.content_block.id,
                  name: event.content_block.name,
                },
              });
            } else if (
              event.type === 'content_block_delta' &&
              event.delta.type === 'input_json_delta'
            ) {
              // Tool arguments arrive as partial JSON text; accumulate the
              // fragments and parse them only once the block is complete.
              subscriber.next({
                type: 'tool_use',
                content: event.delta.partial_json,
              });
            }
          });
// 等待流结束
const finalMessage = await stream.finalMessage();
if (!aborted) {
subscriber.next({
type: 'done',
usage: {
inputTokens: finalMessage.usage.input_tokens,
outputTokens: finalMessage.usage.output_tokens,
},
});
subscriber.complete();
}
} catch (error) {
if (!aborted) {
subscriber.next({
type: 'error',
content: this.formatError(error),
});
subscriber.error(error);
}
}
})();
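      // Teardown: stops forwarding events to the subscriber. This flag alone
      // does not cancel the HTTP request; call `stream.abort()` as well if the
      // upstream generation should stop when the consumer unsubscribes.
      return () => {
        aborted = true;
      };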
});
}
private formatError(error: any): string {
if (error instanceof Anthropic.APIError) {
      return `Claude API error [${error.status}]: ${error.message}`;
    }
    return `Unknown error: ${error.message || error}`;
}
}
2.3 DeepSeek streaming (OpenAI-compatible)
// deepseek-stream.service.ts
import OpenAI from 'openai';
export class DeepSeekStreamService {
private client: OpenAI;
constructor() {
this.client = new OpenAI({
apiKey: process.env.DEEPSEEK_API_KEY!,
baseURL: 'https://api.deepseek.com/v1',
});
}
async *streamChat(options: {
messages: OpenAI.ChatCompletionMessageParam[];
model?: string;
maxTokens?: number;
temperature?: number;
}): AsyncGenerator<StreamEvent> {
    try {
      const stream = await this.client.chat.completions.create({
        model: options.model || 'deepseek-chat',
        messages: options.messages,
        max_tokens: options.maxTokens || 4096,
        temperature: options.temperature ?? 0.7,
        stream: true,
        // Ask for token usage; it arrives in a final chunk whose `choices` is empty.
        stream_options: { include_usage: true },
      });
      let usage = { inputTokens: 0, outputTokens: 0 };
      for await (const chunk of stream) {
        const delta = chunk.choices[0]?.delta;
        if (delta?.content) {
          yield { type: 'text', content: delta.content };
        }
        // Tool calls are streamed too. `function.arguments` arrives as partial
        // JSON fragments, so forward the raw text and parse it only after the
        // stream ends (calling JSON.parse on a fragment would throw).
        if (delta?.tool_calls) {
          for (const tc of delta.tool_calls) {
            yield {
              type: 'tool_use',
              toolCall: { id: tc.id, name: tc.function?.name },
              content: tc.function?.arguments,
            };
          }
        }
        if (chunk.usage) {
          usage = {
            inputTokens: chunk.usage.prompt_tokens,
            outputTokens: chunk.usage.completion_tokens,
          };
        }
      }
      // Emit `done` once, after the usage chunk has been seen.
      yield { type: 'done', usage };
    } catch (error) {
      yield {
        type: 'error',
        content: error instanceof OpenAI.APIError
          ? `DeepSeek API error [${error.status}]: ${error.message}`
          : `DeepSeek error: ${error instanceof Error ? error.message : String(error)}`,
      };
    }
}
  /**
   * Non-streaming call (suited to batch processing).
   */
async chat(options: {
messages: OpenAI.ChatCompletionMessageParam[];
model?: string;
maxTokens?: number;
temperature?: number;
responseFormat?: { type: 'json_object' };
}): Promise<ChatResponse> {
const response = await this.client.chat.completions.create({
model: options.model || 'deepseek-chat',
messages: options.messages,
max_tokens: options.maxTokens || 4096,
temperature: options.temperature ?? 0.7,
response_format: options.responseFormat,
});
return {
id: response.id,
content: response.choices[0]?.message?.content || '',
model: response.model,
usage: {
inputTokens: response.usage?.prompt_tokens || 0,
outputTokens: response.usage?.completion_tokens || 0,
},
finishReason: response.choices[0]?.finish_reason as any || 'stop',
};
}
}
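2.4 Pushing SSE to the front end
// stream.controller.ts (NestJS)
import { Controller, Post, Body, Res, Req } from '@nestjs/common';
import { Response, Request } from 'express';
import { UnifiedAIService } from './unified-ai.service';
import type { AIProvider, ChatMessage } from './types';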
@Controller('api/chat')
export class ChatController {
constructor(
private unifiedService: UnifiedAIService,
) {}
@Post('stream')
async streamChat(
@Body() body: { messages: ChatMessage[]; provider?: AIProvider },
@Req() req: Request,
@Res() res: Response,
) {
    // Set the SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no'); // disable Nginx buffering
    // End the response when the client disconnects
    req.on('close', () => {
      res.end();
    });
    try {
      // UnifiedAIService.streamChat takes a single options object
      const stream = this.unifiedService.streamChat({
        messages: body.messages,
        preferredProvider: body.provider,
      });
      for await (const event of stream) {
        if (res.destroyed) break; // stop once the client has disconnected
        // SSE framing: "data: <JSON>" followed by a blank line
        res.write(`data: ${JSON.stringify(event)}\n\n`);
if (event.type === 'done' || event.type === 'error') {
break;
}
}
} catch (error) {
res.write(`data: ${JSON.stringify({ type: 'error', content: error.message })}\n\n`);
} finally {
res.end();
}
}
}
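Consuming SSE on the front end:
// useStreamChat.ts
import { useRef, useState } from 'react';
import type { ChatMessage } from './types'; // assumes the types.ts file from section 1.3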
export function useStreamChat() {
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);
const sendMessage = async (messages: ChatMessage[], provider?: string) => {
setIsStreaming(true);
setContent('');
const controller = new AbortController();
abortRef.current = controller;
try {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages, provider }),
signal: controller.signal,
});
const reader = response.body?.getReader();
if (!reader) throw new Error('No reader');
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.type === 'text') {
setContent(prev => prev + data.content);
} else if (data.type === 'done') {
setIsStreaming(false);
} else if (data.type === 'error') {
throw new Error(data.content);
}
}
}
}
} catch (error: any) {
if (error.name !== 'AbortError') {
        setContent(prev => prev + `\n\n❌ Error: ${error.message}`);
}
setIsStreaming(false);
}
};
const abort = () => {
abortRef.current?.abort();
setIsStreaming(false);
};
return { content, isStreaming, sendMessage, abort };
}
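The buffering step in the hook matters because a network chunk can end in the middle of a `data:` line. The sketch below isolates that logic as a pure function so it can be checked without a browser; `parseSseChunk` and `ParsedChunk` are hypothetical names introduced for illustration, and the sketch assumes the same framing the controller writes (one JSON payload per `data: ` line).

```typescript
interface ParsedChunk {
  events: unknown[]; // JSON payloads of every complete `data:` line
  rest: string;      // incomplete tail to carry into the next call
}

// Append the new chunk to the carried-over buffer, parse all complete lines,
// and return whatever is left after the last newline.
function parseSseChunk(buffer: string, chunk: string): ParsedChunk {
  const lines = (buffer + chunk).split('\n');
  const rest = lines.pop() ?? '';
  const events: unknown[] = [];
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      events.push(JSON.parse(line.slice(6)));
    }
  }
  return { events, rest };
}

// A payload split across two network chunks is only parsed once it is complete.
const first = parseSseChunk('', 'data: {"type":"te');
const second = parseSseChunk(
  first.rest,
  'xt","content":"hi"}\n\ndata: {"type":"done"}\n\n',
);
console.log(first.events.length, second.events.length);
```

Keeping the parser free of React state also makes it straightforward to reuse in a non-React client.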
3. Error handling and retry strategies
3.1 Common error types
// error-handler.ts
export enum AIErrorType {
  RATE_LIMIT = 'rate_limit', // 429: too many requests
  AUTH_ERROR = 'auth_error', // 401: invalid API key
  SERVER_ERROR = 'server_error', // 5xx: transient server-side failure
  TIMEOUT = 'timeout', // request timed out
  CONTEXT_OVERFLOW = 'context_overflow', // input exceeds the context limit
  CONTENT_FILTER = 'content_filter', // content blocked by a safety filter
  NETWORK_ERROR = 'network_error', // network connection dropped
  UNKNOWN = 'unknown',
}
export class AIError extends Error {
constructor(
public type: AIErrorType,
public statusCode?: number,
public originalError?: any,
) {
super(originalError?.message || 'Unknown AI error');
this.name = 'AIError';
}
get isRetryable(): boolean {
    // Errors for which a retry may succeed
return [
AIErrorType.RATE_LIMIT,
AIErrorType.SERVER_ERROR,
AIErrorType.TIMEOUT,
AIErrorType.NETWORK_ERROR,
].includes(this.type);
}
}
export function classifyError(error: any): AIError {
  // HTTP status checks (shared by the Anthropic and OpenAI-style SDK errors)
  if (error?.status === 429) {
    return new AIError(AIErrorType.RATE_LIMIT, 429, error);
  }
  if (error?.status === 401 || error?.status === 403) {
    return new AIError(AIErrorType.AUTH_ERROR, error.status, error);
  }
  if (error?.status && error.status >= 500) {
    return new AIError(AIErrorType.SERVER_ERROR, error.status, error);
  }
  // OpenAI / DeepSeek error codes
  if (error?.code === 'rate_limit_exceeded') {
    return new AIError(AIErrorType.RATE_LIMIT, 429, error);
  }
  if (error?.code === 'context_length_exceeded') {
    return new AIError(AIErrorType.CONTEXT_OVERFLOW, 400, error);
  }
  if (error?.code === 'content_filter') {
    return new AIError(AIErrorType.CONTENT_FILTER, 400, error);
  }
  // Timeouts get their own type so the fallback policy can treat them differently
  if (error?.code === 'ETIMEDOUT') {
    return new AIError(AIErrorType.TIMEOUT, undefined, error);
  }
  // Other network failures
  if (error?.code === 'ECONNRESET' || error?.name === 'AbortError') {
    return new AIError(AIErrorType.NETWORK_ERROR, undefined, error);
  }
  return new AIError(AIErrorType.UNKNOWN, undefined, error);
}
3.2 Exponential backoff retrier
// retry.service.ts
import { Logger } from '@nestjs/common';
import { classifyError, AIError, AIErrorType } from './error-handler';
interface RetryConfig {
maxRetries: number;
baseDelayMs: number;
maxDelayMs: number;
jitter: boolean;
retryableErrors: AIErrorType[];
}
const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxRetries: 3,
baseDelayMs: 1000,
maxDelayMs: 30000,
jitter: true,
retryableErrors: [
AIErrorType.RATE_LIMIT,
AIErrorType.SERVER_ERROR,
AIErrorType.TIMEOUT,
AIErrorType.NETWORK_ERROR,
],
};
export class RetryService {
private readonly logger = new Logger(RetryService.name);
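  /**
   * Run `fn` with exponential backoff.
   *
   * Delay formula: min(baseDelay * 2^attempt, maxDelay) * jitter, jitter in [0.5, 1.0)
   * With the defaults (baseDelay = 1000 ms):
   *   1st retry: ~0.5-1 s
   *   2nd retry: ~1-2 s
   *   3rd retry: ~2-4 s
   */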
async withRetry<T>(
fn: () => Promise<T>,
config: Partial<RetryConfig> = {},
): Promise<T> {
const cfg = { ...DEFAULT_RETRY_CONFIG, ...config };
let lastError: AIError | null = null;
for (let attempt = 0; attempt <= cfg.maxRetries; attempt++) {
try {
        if (attempt > 0) {
          this.logger.log(`Retry ${attempt}/${cfg.maxRetries}...`);
        }
        return await fn();
      } catch (error) {
        const aiError = classifyError(error);
        lastError = aiError;
        // Out of attempts: stop retrying
        if (attempt >= cfg.maxRetries) {
          break;
        }
        // Non-retryable errors are rethrown immediately
        if (!cfg.retryableErrors.includes(aiError.type)) {
          throw aiError;
        }
        // Compute the backoff delay; if the server sent Retry-After, wait at least that long
        const delay = this.calculateDelay(attempt, cfg);
        const retryAfter = this.getRetryAfter(error);
        const actualDelay = retryAfter
          ? Math.max(delay, retryAfter * 1000)
          : delay;
        // Log the delay that is actually used
        this.logger.warn(
          `${aiError.type} error, retrying in ${actualDelay}ms (retry ${attempt + 1})`,
        );
        await this.sleep(actualDelay);
}
}
throw lastError || new Error('Unknown retry error');
}
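  private calculateDelay(attempt: number, config: RetryConfig): number {
    // Exponential backoff
    let delay = config.baseDelayMs * Math.pow(2, attempt);
    // Cap
    delay = Math.min(delay, config.maxDelayMs);
    // Jitter (avoids the thundering-herd effect)
    if (config.jitter) {
      delay = delay * (0.5 + Math.random() * 0.5);
    }
    return Math.round(delay);
  }
  /** Returns the server-requested wait in seconds, or null if none was sent. */
  private getRetryAfter(error: any): number | null {
    // Depending on the SDK version, `headers` is a plain object or a Fetch `Headers` instance.
    const read = (name: string): string | undefined =>
      typeof error?.headers?.get === 'function'
        ? error.headers.get(name) ?? undefined
        : error?.headers?.[name];
    // OpenAI-style `retry-after-ms` header (milliseconds), preferred when present
    const ms = Number.parseFloat(read('retry-after-ms') ?? '');
    if (Number.isFinite(ms)) return ms / 1000;
    // Standard `retry-after` header (seconds), as sent by Anthropic
    const seconds = Number.parseFloat(read('retry-after') ?? '');
    return Number.isFinite(seconds) ? seconds : null;
  }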
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
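To see what delays the `calculateDelay` formula actually produces, it helps to work the arithmetic for the default config (`baseDelayMs = 1000`, `maxDelayMs = 30000`, jitter factor in [0.5, 1.0)). The sketch below reproduces the same formula as a standalone function; `backoffDelay` is a hypothetical name, and the injectable `random` parameter is an addition made here so the bounds can be computed deterministically.

```typescript
// Same formula as calculateDelay: exponential growth, cap, then scale by a
// jitter factor of 0.5 + random() * 0.5.
function backoffDelay(
  attempt: number,
  baseDelayMs: number,
  maxDelayMs: number,
  random: () => number = Math.random, // injectable for deterministic checks
): number {
  const capped = Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs);
  return Math.round(capped * (0.5 + random() * 0.5));
}

// Bounds for the first three retries (attempt = 0, 1, 2).
const lowerBounds = [0, 1, 2].map((a) => backoffDelay(a, 1000, 30000, () => 0));
const upperBounds = [0, 1, 2].map((a) => backoffDelay(a, 1000, 30000, () => 1));
console.log(lowerBounds); // [500, 1000, 2000]
console.log(upperBounds); // [1000, 2000, 4000]
```

So with the defaults the first retry waits between 0.5 s and 1 s, the second between 1 s and 2 s, and the third between 2 s and 4 s; the 30 s cap only starts to bind from the sixth retry onward (1000 × 2^5 = 32000 ms).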
3.3 Fallback strategy on errors
// fallback-strategy.ts
/**
 * Fallback decision tree
 *
 * Rate Limit → retry with another model (Claude → DeepSeek)
 * Server Error → retry with another model
 * Context Overflow → truncate the message history and retry
 * Content Filter → return a friendly message, do not retry
 * Auth Error → raise an alert, do not retry
 * Timeout → retry with a faster model
 */
export function getFallbackAction(error: AIError): FallbackAction {
  switch (error.type) {
    case AIErrorType.RATE_LIMIT:
      return { action: 'switch_provider', reason: 'Current model is rate limited' };
    case AIErrorType.SERVER_ERROR:
      return { action: 'switch_provider', reason: 'Server-side failure' };
    case AIErrorType.CONTEXT_OVERFLOW:
      return { action: 'truncate', reason: 'Context too long, trying truncation' };
    case AIErrorType.TIMEOUT:
      return { action: 'switch_to_faster', reason: 'Timed out, switching to a faster model' };
    case AIErrorType.CONTENT_FILTER:
      return { action: 'reject', reason: 'Content violates the safety policy' };
    case AIErrorType.AUTH_ERROR:
      return { action: 'alert', reason: 'API key is misconfigured' };
    default:
      return { action: 'switch_provider', reason: 'Unknown error, switching model' };
  }
}
interface FallbackAction {
action: 'switch_provider' | 'switch_to_faster' | 'truncate' | 'reject' | 'alert';
reason: string;
}
4. Handling rate limits
4.1 Rate-limit rules per platform
| Platform | Free / trial | Paid Tier 1 | Tier 2+ |
|---|---|---|---|
| Anthropic | 5 RPM / 20K TPM | 50 RPM / 50K TPM | 1,000+ RPM |
| DeepSeek | 50 RPM / 50K TPM | 500 RPM / 1M TPM | Contact support |
| OpenAI | 3 RPM / 40K TPM | 500 RPM / 450K TPM | 5,000+ RPM |
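The limits above are quoted in requests per minute, whereas a client-side limiter such as a token bucket is usually configured with a per-second refill rate and a burst capacity. A minimal sketch of that conversion follows; `bucketParamsFromRpm`, `BucketParams`, and the `safetyFactor` parameter are hypothetical and introduced only for illustration.

```typescript
interface BucketParams {
  rate: number;     // tokens added per second
  capacity: number; // maximum burst size
}

// Convert a published RPM limit into per-second bucket parameters.
// A safetyFactor below 1 leaves headroom under the published limit.
function bucketParamsFromRpm(
  rpm: number,
  burst: number,
  safetyFactor = 1,
): BucketParams {
  if (rpm <= 0 || burst < 1) {
    throw new RangeError('rpm must be > 0 and burst must be >= 1');
  }
  return { rate: (rpm / 60) * safetyFactor, capacity: burst };
}

// Anthropic paid Tier 1 from the table: 50 RPM is about 0.83 requests per second.
const tier1 = bucketParamsFromRpm(50, 5);
console.log(tier1.rate.toFixed(2), tier1.capacity);
```

Note that this only covers the RPM limit; the TPM limit would need a second bucket sized in tokens, which is left out of this sketch.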
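4.2 Token-bucket rate limiter
// token-bucket.ts
/**
 * Token-bucket algorithm
 *
 * How it works:
 * - The bucket is refilled with tokens at a fixed rate
 * - Each request consumes one token
 * - When the bucket is empty, the request waits or is rejected
 */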
export class TokenBucket {
private tokens: number;
private lastRefill: number;
constructor(
    private rate: number, // tokens added per second
    private capacity: number, // bucket capacity (maximum number of tokens)
) {
this.tokens = capacity;
this.lastRefill = Date.now();
}
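  /**
   * Try to take one token.
   * @returns `allowed` is true if a token was taken; otherwise `waitMs` is the
   *          time in milliseconds until one becomes available
   */
  tryConsume(): { allowed: boolean; waitMs: number } {
    this.refill();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return { allowed: true, waitMs: 0 };
    }
    // How long until the bucket holds one full token
    const waitMs = Math.ceil((1 - this.tokens) / this.rate * 1000);
    return { allowed: false, waitMs };
  }
  /**
   * Wait until a token is available, then take it.
   */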
async consume(): Promise<void> {
const { allowed, waitMs } = this.tryConsume();
if (!allowed) {
await new Promise(resolve => setTimeout(resolve, waitMs));
return this.consume();
}
}
private refill(): void {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000; // 秒
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.rate,
);
this.lastRefill = now;
}
}
4.3 Request queue manager
// request-queue.ts
import { Logger } from '@nestjs/common';
import { TokenBucket } from './token-bucket';
interface QueuedRequest<T> {
  id: string;
  priority: number; // 0 = highest, 9 = lowest
execute: () => Promise<T>;
resolve: (value: T) => void;
reject: (error: any) => void;
createdAt: number;
}
export class AIRequestQueue {
private readonly logger = new Logger(AIRequestQueue.name);
private queues: Map<string, QueuedRequest<any>[]> = new Map();
private processing = new Set<string>();
private maxConcurrent = 5; // 默认最大并发数
constructor(
private bucket: TokenBucket,
maxConcurrent?: number,
) {
if (maxConcurrent) this.maxConcurrent = maxConcurrent;
}
async enqueue<T>(
provider: string,
execute: () => Promise<T>,
priority: number = 5,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const request: QueuedRequest<T> = {
id: `${provider}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
priority,
execute,
resolve,
reject,
createdAt: Date.now(),
};
// 加入队列
if (!this.queues.has(provider)) {
this.queues.set(provider, []);
}
const queue = this.queues.get(provider)!;
// 按优先级插入(低数字 = 高优先级)
const insertIndex = queue.findIndex(r => r.priority > priority);
if (insertIndex === -1) {
queue.push(request);
} else {
queue.splice(insertIndex, 0, request);
}
this.logger.debug(
`${provider} 队列: ${queue.length} 个待处理, 优先级 ${priority}`,
);
// 触发处理
this.processQueue(provider);
});
}
  // Number of requests currently executing, per provider. The earlier check
  // against `processing.size` counted providers, not in-flight requests, so
  // `maxConcurrent` never took effect and requests ran one at a time.
  private inFlight = new Map<string, number>();

  private processQueue(provider: string): void {
    const queue = this.queues.get(provider);
    if (!queue) return;
    // Start requests until the queue is empty or the concurrency cap is reached
    while (
      queue.length > 0 &&
      (this.inFlight.get(provider) ?? 0) < this.maxConcurrent
    ) {
      const request = queue.shift()!;
      // Reject requests that have been queued for more than 60 seconds
      if (Date.now() - request.createdAt > 60000) {
        request.reject(new Error('Request timed out while queued'));
        continue;
      }
      this.inFlight.set(provider, (this.inFlight.get(provider) ?? 0) + 1);
      // Wait for a rate-limit token, then run the request without blocking the loop
      this.bucket
        .consume()
        .then(() => request.execute())
        .then(request.resolve, request.reject)
        .finally(() => {
          this.inFlight.set(provider, (this.inFlight.get(provider) ?? 1) - 1);
          this.processQueue(provider); // a slot freed up, pull the next request
        });
    }
  }
}
5. Dual-model fallback architecture
5.1 Unified AI service
// unified-ai.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { RetryService } from './retry.service';
import { AIRequestQueue } from './request-queue';
import { TokenBucket } from './token-bucket';
import { classifyError, AIError } from './error-handler';
interface ProviderConfig {
name: string;
priority: number; // 越小越优先
models: string[];
bucket: TokenBucket;
maxContextLength: number;
supportsVision: boolean;
supportsToolUse: boolean;
}
@Injectable()
export class UnifiedAIService {
private readonly logger = new Logger(UnifiedAIService.name);
private providers: Map<string, ProviderConfig>;
private queues: Map<string, AIRequestQueue>;
constructor(private retry: RetryService) {
// 初始化提供商
this.providers = new Map([
['claude-opus', {
name: 'claude-opus',
priority: 0,
models: ['claude-opus-4-8'],
        bucket: new TokenBucket(0.8, 5), // refills 0.8 requests/s, allows bursts of up to 5
maxContextLength: 200000,
supportsVision: true,
supportsToolUse: true,
}],
['claude-sonnet', {
name: 'claude-sonnet',
priority: 1,
models: ['claude-sonnet-4-6'],
bucket: new TokenBucket(2, 10),
maxContextLength: 200000,
supportsVision: true,
supportsToolUse: true,
}],
['deepseek', {
name: 'deepseek',
priority: 2,
models: ['deepseek-chat'],
        bucket: new TokenBucket(5, 20), // DeepSeek's rate limits are more generous
maxContextLength: 128000,
supportsVision: false,
supportsToolUse: true,
}],
]);
// 为每个提供商创建请求队列
this.queues = new Map();
for (const [name, config] of this.providers) {
this.queues.set(name, new AIRequestQueue(config.bucket));
}
}
  /**
   * Smart call: picks the best model automatically and falls back on failure.
   */
async chat(options: {
messages: any[];
preferredProvider?: string;
hasImages?: boolean;
needsToolUse?: boolean;
maxTokens?: number;
}): Promise<{ content: string; provider: string; model: string; usage: any }> {
const errors: Array<{ provider: string; error: string }> = [];
// 构建候选列表(按优先级排序)
const candidates = this.getCandidates(options);
for (const candidate of candidates) {
try {
this.logger.log(`尝试 ${candidate.name} (${candidate.model})...`);
const result = await this.retry.withRetry(
() => this.callProvider(candidate, options.messages, options.maxTokens),
{ maxRetries: 2 },
);
this.logger.log(`✅ ${candidate.name} 成功`);
return {
content: result.content,
provider: candidate.name,
model: candidate.model,
usage: result.usage,
};
} catch (error) {
const aiError = classifyError(error);
errors.push({
provider: candidate.name,
error: `${aiError.type}: ${aiError.message}`,
});
this.logger.warn(`❌ ${candidate.name} 失败: ${aiError.type}`);
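        // Do not retry a content-filter rejection on another model (it would be blocked there too)
        if (aiError.type === 'content_filter') {
          throw new Error('The content violates the safety policy. Please revise it and try again.');
        }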
}
}
    // Every provider failed
    this.logger.error('All AI providers failed', errors);
    throw new Error(
      `All AI providers failed:\n${errors.map(e => ` - ${e.provider}: ${e.error}`).join('\n')}`,
    );
}
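  /**
   * Streaming version (with fallback).
   * Fallback only happens before the first event has been emitted; switching
   * providers mid-response would splice two different answers together.
   */
  async *streamChat(options: {
    messages: any[];
    preferredProvider?: string;
    hasImages?: boolean;
    needsToolUse?: boolean;
  }): AsyncGenerator<any> {
    const candidates = this.getCandidates(options);
    for (const candidate of candidates) {
      let emitted = false;
      try {
        this.logger.log(`Streaming from ${candidate.name}...`);
        const stream = this.callProviderStream(candidate, options.messages);
        for await (const event of stream) {
          emitted = true;
          yield event;
        }
        return; // finished successfully
      } catch (error) {
        if (emitted) {
          // Partial output already reached the caller: report instead of restarting
          yield { type: 'error', content: `${candidate.name} failed mid-stream` };
          return;
        }
        this.logger.warn(`Streaming from ${candidate.name} failed, trying the next provider...`);
      }
    }
    yield {
      type: 'error',
      content: 'All AI providers failed. Please try again later.',
    };
  }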
private getCandidates(options: {
preferredProvider?: string;
hasImages?: boolean;
needsToolUse?: boolean;
}): Array<{ name: string; model: string }> {
// 按优先级排序所有提供商
const sorted = Array.from(this.providers.values())
.sort((a, b) => a.priority - b.priority);
// 如果指定了首选,把它排到最前
if (options.preferredProvider) {
const preferred = this.providers.get(options.preferredProvider);
if (preferred) {
const others = sorted.filter(p => p.name !== preferred.name);
sorted.splice(0, sorted.length, preferred, ...others);
}
}
return sorted
.filter((p) => {
// 有图片只能用 Claude
if (options.hasImages && !p.supportsVision) return false;
// 需要 Tool Use 的排除不支持的服务
if (options.needsToolUse && !p.supportsToolUse) return false;
return true;
})
.map(p => ({ name: p.name, model: p.models[0] }));
}
private async callProvider(
candidate: { name: string; model: string },
messages: any[],
maxTokens?: number,
): Promise<{ content: string; usage: any }> {
// 通过队列执行(自动限流)
const queue = this.queues.get(candidate.name)!;
return queue.enqueue(candidate.name, async () => {
// 实际调用逻辑(根据 candidate.name 分派到不同 SDK)
return this.dispatchCall(candidate, messages, maxTokens);
});
}
private async dispatchCall(
candidate: { name: string; model: string },
messages: any[],
maxTokens?: number,
): Promise<{ content: string; usage: any }> {
// 根据 provider 名称分派
switch (candidate.name) {
case 'claude-opus':
case 'claude-sonnet':
return this.callClaude(candidate.model, messages, maxTokens);
case 'deepseek':
return this.callDeepSeek(candidate.model, messages, maxTokens);
default:
throw new Error(`Unknown provider: ${candidate.name}`);
}
}
// ... callClaude / callDeepSeek 实现
}
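5.2 Load balancing and cost-aware routing
// cost-aware-router.ts
/**
 * Cost-aware router
 *
 * Picks the most cost-effective model based on task complexity:
 * - Simple tasks (classification, translation, summarization) → DeepSeek
 * - Medium tasks (Q&A, generation) → Claude Sonnet
 * - Complex tasks (architecture design, deep reasoning) → Claude Opus
 */
export class CostAwareRouter {
  /**
   * Estimate task complexity
   */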
estimateComplexity(messages: any[]): 'simple' | 'medium' | 'complex' {
const userMessage = messages.filter(m => m.role === 'user').pop();
const content = typeof userMessage?.content === 'string'
? userMessage.content
: JSON.stringify(userMessage?.content);
const length = content?.length || 0;
// 启发式规则
if (length < 100) return 'simple';
if (length < 500) return 'medium';
// 检测复杂度关键词
const complexKeywords = [
'架构', '设计', '系统', '方案', '分析', '评估', '审查',
'architecture', 'design', 'system', 'analyze', 'review',
];
const hasComplexKeyword = complexKeywords.some(kw =>
content.toLowerCase().includes(kw),
);
if (hasComplexKeyword) return 'complex';
return 'medium';
}
  /**
   * Select a provider based on complexity
   */
  selectProvider(complexity: 'simple' | 'medium' | 'complex'): string {
    switch (complexity) {
      case 'simple':
        return 'deepseek'; // about 10x cheaper, good enough for the job
      case 'medium':
        return 'claude-sonnet'; // best price/performance
      case 'complex':
        return 'claude-opus'; // strongest reasoning
    }
  }
}
6. Tool Use and Function Calling
6.1 Claude Tool Use
// claude-tool-use.service.ts
import Anthropic from '@anthropic-ai/sdk';
interface ToolResult {
toolUseId: string;
content: string;
isError?: boolean;
}
export class ClaudeToolUseService {
private client: Anthropic;
constructor() {
this.client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY!,
});
}
  /**
   * Conversation loop with tools.
   * Claude decides on its own when to call a tool and which one to call.
   */
async toolLoop(
userMessage: string,
tools: Anthropic.Tool[],
toolHandlers: Record<string, (args: any) => Promise<string>>,
systemPrompt?: string,
): Promise<string> {
const messages: Anthropic.MessageParam[] = [
{ role: 'user', content: userMessage },
];
const maxRounds = 10;
for (let round = 0; round < maxRounds; round++) {
const response = await this.client.messages.create({
model: 'claude-sonnet-4-6',
system: systemPrompt,
messages,
tools: tools as any,
max_tokens: 4096,
});
// 检查是否有 tool_use
const toolUses = response.content.filter(
(block): block is Anthropic.ToolUseBlock => block.type === 'tool_use',
);
if (toolUses.length === 0) {
// 没有工具调用,直接返回文本
const textBlock = response.content.find(b => b.type === 'text');
return (textBlock as Anthropic.TextBlock)?.text || '';
}
// 执行工具调用
const toolResults: ToolResult[] = [];
for (const toolUse of toolUses) {
const handler = toolHandlers[toolUse.name];
if (!handler) {
toolResults.push({
toolUseId: toolUse.id,
content: `Unknown tool: ${toolUse.name}`,
isError: true,
});
continue;
}
try {
const result = await handler(toolUse.input);
toolResults.push({
toolUseId: toolUse.id,
content: result,
});
} catch (error) {
toolResults.push({
toolUseId: toolUse.id,
content: `Tool error: ${error.message}`,
isError: true,
});
}
}
// 将 assistant 消息和 tool 结果加入对话
messages.push({
role: 'assistant',
content: response.content,
});
messages.push({
role: 'user',
content: toolResults.map(tr => ({
type: 'tool_result' as const,
tool_use_id: tr.toolUseId,
content: tr.content,
is_error: tr.isError || false,
})),
});
// 继续下一轮
}
    return 'Reached the maximum number of tool-call rounds; the task was not completed.';
}
}
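The `toolLoop` method above expects a list of tool definitions and a map of handlers keyed by tool name. A minimal sketch of what a caller might pass in is shown below. The `get_weather` tool, its schema, and the stubbed return values are hypothetical; the overall shape (a `name`, a `description`, and a JSON Schema under `input_schema`) follows Anthropic's Messages API tool format.

```typescript
// Tool definition: name, description, and a JSON Schema for the input.
const getWeatherTool = {
  name: 'get_weather',
  description: 'Get the current weather for a city.',
  input_schema: {
    type: 'object' as const,
    properties: {
      city: { type: 'string', description: 'City name, e.g. "Hangzhou"' },
    },
    required: ['city'],
  },
};

// Handlers are keyed by tool name and return a string, which the loop places
// into the `tool_result` block sent back to the model.
const toolHandlers: Record<string, (args: any) => Promise<string>> = {
  get_weather: async (args: { city: string }) => {
    // Hypothetical stub: a real handler would call a weather service here.
    return JSON.stringify({ city: args.city, tempC: 21, condition: 'sunny' });
  },
};

toolHandlers['get_weather']!({ city: 'Hangzhou' }).then((out) => console.log(out));
```

Returning JSON as a string keeps the handler contract simple; the model reads the serialized result as plain text in the next round.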
6.2 DeepSeek Function Calling
// deepseek-function-call.service.ts
import OpenAI from 'openai';
export class DeepSeekFunctionCallService {
private client: OpenAI;
constructor() {
this.client = new OpenAI({
apiKey: process.env.DEEPSEEK_API_KEY!,
baseURL: 'https://api.deepseek.com/v1',
});
}
  async functionLoop(
    userMessage: string,
    tools: OpenAI.ChatCompletionTool[],
    handlers: Record<string, (args: any) => Promise<string>>,
  ): Promise<string> {
    const messages: OpenAI.ChatCompletionMessageParam[] = [
      { role: 'user', content: userMessage },
    ];
    for (let round = 0; round < 10; round++) {
      // Use `tools` / `tool_choice`; the older `functions` / `function_call`
      // parameters are deprecated in the OpenAI API.
      const response = await this.client.chat.completions.create({
        model: 'deepseek-chat',
        messages,
        tools,
        tool_choice: 'auto',
      });
      const message = response.choices[0].message;
      const toolCalls = message.tool_calls ?? [];
      // No tool call: return the text answer
      if (toolCalls.length === 0) {
        return message.content || '';
      }
      // Record the assistant turn, then answer every tool call it contains
      messages.push(message);
      for (const call of toolCalls) {
        if (call.type !== 'function') continue;
        const handler = handlers[call.function.name];
        let result: string;
        if (!handler) {
          result = `Error: Unknown function "${call.function.name}"`;
        } else {
          try {
            result = await handler(JSON.parse(call.function.arguments));
          } catch (error) {
            result = `Error: ${error instanceof Error ? error.message : String(error)}`;
          }
        }
        messages.push({ role: 'tool', tool_call_id: call.id, content: result });
      }
    }
    return 'Reached the maximum number of function-calling rounds.';
  }
}
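7. Token budget management
7.1 Token counting
// token-counter.service.ts
/**
 * Token counting service
 *
 * Exact counts require each platform's tokenizer; this class offers both an
 * estimate and exact counting.
 */
export class TokenCounterService {
  /**
   * Estimate the token count (no API call needed)
   *
   * Rules of thumb (for mixed Chinese/English text):
   * - English: 1 token ≈ 0.75 words ≈ 4 characters
   * - Chinese: 1 token ≈ 1.5 Chinese characters
   * - Code: 1 token ≈ 1 identifier/operator (not modeled separately below)
   */
  estimateTokens(text: string): number {
    // Count Chinese characters (CJK Unified Ideographs, U+4E00 to U+9FFF) separately
    const chineseChars = (text.match(/[\u4e00-\u9fff]/g) || []).length;
    const otherChars = text.length - chineseChars;
    // Chinese characters: ~1.5 characters per token
    // Everything else: ~4 characters per token
    return Math.ceil(chineseChars / 1.5 + otherChars / 4);
  }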
  /**
   * Exact count with tiktoken (OpenAI models)
   */
  async countWithTiktoken(text: string, model: string = 'gpt-4'): Promise<number> {
    // Requires the tiktoken package to be installed
const { encoding_for_model } = await import('tiktoken');
const enc = encoding_for_model(model as any);
const tokens = enc.encode(text);
enc.free();
return tokens.length;
}
  /**
   * Count tokens with Anthropic's token-counting endpoint
   */
async countWithAnthropic(
messages: any[],
systemPrompt?: string,
): Promise<number> {
const anthropic = new (await import('@anthropic-ai/sdk')).default({
apiKey: process.env.ANTHROPIC_API_KEY!,
});
const result = await anthropic.messages.countTokens({
model: 'claude-sonnet-4-6',
messages,
system: systemPrompt,
});
return result.input_tokens;
}
}
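As a quick sanity check of the estimation formula, the sketch below copies the `estimateTokens` logic into a standalone function and works two inputs by hand. The sample strings are arbitrary, and the results are estimates under the stated rules of thumb, not real tokenizer counts.

```typescript
// Standalone copy of the estimate: Chinese characters at ~1.5 per token,
// all other characters at ~4 per token, rounded up.
function estimateTokens(text: string): number {
  const chineseChars = (text.match(/[\u4e00-\u9fff]/g) || []).length;
  const otherChars = text.length - chineseChars;
  return Math.ceil(chineseChars / 1.5 + otherChars / 4);
}

// 2 Chinese characters + 5 other characters: 2/1.5 + 5/4 ≈ 2.58, rounded up to 3
console.log(estimateTokens('你好world'));
// 11 non-Chinese characters: 11/4 = 2.75, rounded up to 3
console.log(estimateTokens('hello world'));
```

Because the estimate rounds up per call, summing it over many short messages slightly overestimates the total, which errs on the safe side for budget checks.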
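7.2 Compressing conversation history
// context-compressor.ts
/**
 * Conversation history compressor
 *
 * Compresses older messages automatically when the history exceeds the
 * model's context limit.
 *
 * Strategy:
 * 1. Keep the system prompt (never dropped)
 * 2. Keep the most recent N messages
 * 3. For earlier messages:
 *    a. Generate a summary with an LLM
 *    b. Return the summary so the caller can add it to the system prompt
 */
export class ContextCompressor {
  private readonly MAX_CONTEXT_RATIO = 0.8; // use at most 80% of the context window
  private readonly RECENT_MESSAGES_KEEP = 10; // keep the 10 most recent messages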
/**
* 压缩消息列表以适应上下文窗口
*/
async compress(
messages: any[],
systemPrompt: string | undefined,
maxContextTokens: number,
tokenCounter: (text: string) => number,
): Promise<{ messages: any[]; summary?: string }> {
const budget = Math.floor(maxContextTokens * this.MAX_CONTEXT_RATIO);
// 计算 system prompt 的 token 数
const systemTokens = systemPrompt ? tokenCounter(systemPrompt) : 0;
const availableTokens = budget - systemTokens;
// 计算当前消息的总 token
let totalTokens = 0;
for (const msg of messages) {
totalTokens += tokenCounter(
typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content),
);
}
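    // Within budget: nothing to compress
    if (totalTokens <= availableTokens) {
      return { messages };
    }
    // Split into older messages (to summarize) and recent ones (kept verbatim)
    const recent = messages.slice(-this.RECENT_MESSAGES_KEEP);
    const old = messages.slice(0, -this.RECENT_MESSAGES_KEEP);
    // With RECENT_MESSAGES_KEEP or fewer messages there is nothing older to summarize
    if (old.length === 0) {
      return { messages: recent };
    }
    // Summarize the older messages into a single block of text
    const oldText = old
      .map(m => `[${m.role}]: ${typeof m.content === 'string' ? m.content : '(multimodal content)'}`)
      .join('\n');
    const summary = await this.summarizeHistory(oldText);
    return {
      messages: recent,
      summary: `Summary of earlier conversation:\n${summary}`,
    };
  }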
  /**
   * Summarize the conversation history with a lightweight model
   */
  private async summarizeHistory(history: string): Promise<string> {
    // Use Haiku or DeepSeek for the summary (cheap)
const openai = new (await import('openai')).default({
apiKey: process.env.DEEPSEEK_API_KEY!,
baseURL: 'https://api.deepseek.com/v1',
});
const response = await openai.chat.completions.create({
model: 'deepseek-chat',
messages: [
{
role: 'system',
content: '请用 200 字以内总结以下对话的关键信息和决议。只保留对后续对话有帮助的内容。',
},
{ role: 'user', content: history },
],
max_tokens: 500,
});
return response.choices[0]?.message?.content || '';
}
}
8. Production-grade SDK wrapper
8.1 The complete service wrapper
// ai.service.ts
// Single entry point for all of the modules above
import { Injectable, Logger } from '@nestjs/common';
import { UnifiedAIService } from './unified-ai.service';
import { RetryService } from './retry.service';
import { TokenCounterService } from './token-counter.service';
import { ContextCompressor } from './context-compressor';
import { CostAwareRouter } from './cost-aware-router';
@Injectable()
export class AIService {
private readonly logger = new Logger(AIService.name);
constructor(
private unified: UnifiedAIService,
private retry: RetryService,
private tokenCounter: TokenCounterService,
private compressor: ContextCompressor,
private router: CostAwareRouter,
) {}
/**
* High-level API exposed to callers
*/
async chat(options: {
messages: Array<{ role: string; content: string }>;
systemPrompt?: string;
preferredProvider?: string;
autoSelect?: boolean; // automatically pick the most cost-effective model
maxTokens?: number;
}) {
// Pick a model automatically when the caller did not name one
let provider = options.preferredProvider;
if (options.autoSelect && !provider) {
provider = this.router.selectProvider(
this.router.estimateComplexity(options.messages),
);
this.logger.log(`Auto-selected provider: ${provider}`);
}
// Compress the context when needed (these thresholds assume a 200K-token context window)
let messages = options.messages;
let systemPrompt = options.systemPrompt;
const estimatedTokens = messages.reduce(
(sum, m) => sum + this.tokenCounter.estimateTokens(m.content),
0,
);
if (estimatedTokens > 150000) {
this.logger.warn(
`Large context (${estimatedTokens} tokens), compressing...`,
);
const compressed = await this.compressor.compress(
messages,
systemPrompt,
200000,
(text) => this.tokenCounter.estimateTokens(text),
);
messages = compressed.messages;
if (compressed.summary) {
systemPrompt = systemPrompt
? `${systemPrompt}\n\n${compressed.summary}`
: compressed.summary;
}
}
// Send the system prompt (with any history summary appended) as a leading system message
// so it reaches the model. Assumes UnifiedAIService maps a 'system' message onto each
// provider's system prompt.
const outgoing = systemPrompt
? [{ role: 'system', content: systemPrompt }, ...messages]
: messages;
return this.unified.chat({
messages: outgoing,
preferredProvider: provider,
maxTokens: options.maxTokens,
});
}
/**
* Streaming chat
*/
streamChat(messages: Array<{ role: string; content: string }>) {
return this.unified.streamChat({ messages });
}
/**
* Structured JSON output
*/
async structuredOutput<T>(
prompt: string,
schema: Record<string, any>,
): Promise<T> {
const response = await this.unified.chat({
messages: [
{
role: 'user',
content: `Output JSON that strictly follows this JSON Schema:\n\`\`\`json\n${JSON.stringify(schema, null, 2)}\n\`\`\`\n\n${prompt}`,
},
],
preferredProvider: 'deepseek', // DeepSeek supports JSON mode; this call relies on the prompt alone
maxTokens: 2000,
});
// Extract the JSON, whether or not the model wrapped it in a code fence
const jsonMatch = response.content.match(/```(?:json)?\n?([\s\S]*?)\n?```/);
const jsonStr = jsonMatch ? jsonMatch[1] : response.content;
// JSON.parse throws on malformed output; callers should be ready to catch and retry
return JSON.parse(jsonStr.trim());
}
}
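The fence-stripping step in structuredOutput is easier to test when it lives in its own helper. The sketch below is one possible version, not part of the original service: `extractJson` is a hypothetical name, and keeping the outermost `{...}` or `[...]` span is just one heuristic for replies that wrap JSON in prose.

```typescript
// Hypothetical helper: pull a JSON object or array out of a model reply.
const FENCE = '`'.repeat(3); // three backticks, built this way to keep the listing readable

function extractJson<T = unknown>(reply: string): T {
  // 1. Prefer the contents of a fenced block (with or without a "json" tag).
  const fencePattern = new RegExp(FENCE + '(?:json)?\\s*([\\s\\S]*?)\\s*' + FENCE);
  const fenced = reply.match(fencePattern);
  const text = (fenced ? fenced[1] : reply).trim();

  // 2. Keep only the outermost {...} or [...] span, dropping surrounding prose.
  const start = text.search(/[{[]/);
  const end = Math.max(text.lastIndexOf('}'), text.lastIndexOf(']'));
  if (start === -1 || end <= start) {
    throw new Error('No JSON object or array found in model reply');
  }

  // 3. Parse, and surface a descriptive error so the caller can retry.
  try {
    return JSON.parse(text.slice(start, end + 1)) as T;
  } catch (err) {
    throw new Error(`Model reply is not valid JSON: ${(err as Error).message}`);
  }
}
```

A caller could wrap this in a single retry that feeds the error message back to the model, which tends to be cheaper than switching to a larger model.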
9. Cost Optimization in Practice
9.1 Cost Comparison
Suppose your application handles 1,000 conversations per day, averaging 2,000 input and 500 output tokens each:
Claude Opus = 1000 × (2000×$15/M + 500×$75/M) = 1000 × ($0.03 + $0.0375) = $67.5/day
Claude Sonnet = 1000 × (2000×$3/M + 500×$15/M) = 1000 × ($0.006 + $0.0075) = $13.5/day
DeepSeek = 1000 × (2000×$0.27/M + 500×$1.1/M) = 1000 × ($0.00054 + $0.00055) ≈ $1.09/day
→ DeepSeek costs about 1/62 as much as Claude Opus 😱
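The same arithmetic is easy to reproduce in code, which helps when your traffic assumptions change. This is a minimal sketch using the per-1M-token prices from the table in section 1.1; `PRICES` and `dailyCost` are illustrative names, not part of the services in this article.

```typescript
// USD per 1M tokens, copied from the pricing table in section 1.1.
const PRICES: Record<string, { input: number; output: number }> = {
  'claude-opus': { input: 15, output: 75 },
  'claude-sonnet': { input: 3, output: 15 },
  deepseek: { input: 0.27, output: 1.1 },
};

// Daily cost in USD for a given traffic profile.
function dailyCost(
  model: string,
  callsPerDay: number,
  inputTokens: number,
  outputTokens: number,
): number {
  const price = PRICES[model];
  if (!price) throw new Error(`Unknown model: ${model}`);
  const perCall =
    (inputTokens / 1_000_000) * price.input +
    (outputTokens / 1_000_000) * price.output;
  return callsPerDay * perCall;
}

// Same scenario as above: 1,000 calls/day, 2,000 input + 500 output tokens each.
for (const model of Object.keys(PRICES)) {
  console.log(model, dailyCost(model, 1000, 2000, 500).toFixed(2));
}
```

Changing the last three arguments is enough to see how the gap between models moves with output-heavy or input-heavy workloads.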
9.2 Optimization Strategies
// cost-optimizer.ts
export class CostOptimizer {
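/**
* Cache repeated queries
*/
// See RagCacheService above (a Redis-based semantic cache)
/**
* Triage with a small model
* Ask Haiku whether the request needs a large model; only complex tasks go to Opus.
* haikuChat, deepseekChat, sonnetChat and opusChat are per-model helpers not shown here.
*/
async withTriaging(userMessage: string): Promise<string> {
// Step 1: classify complexity with Haiku
const triage = await this.haikuChat({
system: `Classify the complexity of the following user request. Answer only "simple", "medium", or "complex".
simple: basic Q&A, translation, summarization, classification
medium: routine problem solving, code snippets, moderate reasoning
complex: architecture design, in-depth analysis, multi-step reasoning, long-form code`,
messages: [{ role: 'user', content: userMessage }],
maxTokens: 10,
});
const complexity = triage.content.trim().toLowerCase();
// Step 2: route to a model by complexity (anything unrecognized falls back to Sonnet)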
switch (complexity) {
case 'simple':
return this.deepseekChat(userMessage); // $0.001
case 'medium':
return this.sonnetChat(userMessage); // $0.01
case 'complex':
return this.opusChat(userMessage); // $0.07
default:
return this.sonnetChat(userMessage);
}
}
/**
* Stop streaming early
* When the user has left the page, cancel the streaming request to save tokens
*/
}
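The early-stop strategy is described above but left unimplemented. One way to sketch it, assuming the stream is exposed as an AsyncIterable of text chunks: consume it under an AbortSignal and stop pulling as soon as the signal fires. `consumeUntilAborted` is a hypothetical helper. Note that this only stops local work; to stop generation upstream, the same signal would also need to be passed to the provider SDK call, so check how your SDK accepts an AbortSignal.

```typescript
// Hypothetical helper: forward chunks until the stream ends or the signal aborts.
async function consumeUntilAborted(
  stream: AsyncIterable<string>,
  signal: AbortSignal,
  onChunk: (chunk: string) => void,
): Promise<'completed' | 'aborted'> {
  for await (const chunk of stream) {
    if (signal.aborted) {
      // Returning from for-await calls the iterator's return(), so the source can clean up.
      return 'aborted';
    }
    onChunk(chunk);
  }
  return 'completed';
}

// Usage sketch (an Express-style response object is assumed):
//   const controller = new AbortController();
//   res.on('close', () => controller.abort());
//   await consumeUntilAborted(stream, controller.signal, (c) => res.write(c));
```

The abort check runs once per chunk, so a disconnect is noticed when the next chunk arrives rather than instantly; for token savings that delay is usually negligible.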
9.3 Cost Monitoring
// cost-tracker.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram } from 'prom-client';
@Injectable()
export class CostTracker {
// Token consumption per model
tokenUsage = new Counter({
name: 'ai_token_usage_total',
help: 'Total tokens consumed by AI API calls',
labelNames: ['provider', 'model', 'type'], // type: input/output
});
// API call count per model
apiCalls = new Counter({
name: 'ai_api_calls_total',
help: 'Total AI API calls',
labelNames: ['provider', 'model', 'status'], // status: success/error
});
// API latency
apiLatency = new Histogram({
name: 'ai_api_latency_seconds',
help: 'AI API call latency',
labelNames: ['provider', 'model'],
buckets: [0.5, 1, 2, 5, 10, 30, 60],
});
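// Price per model (USD per 1M tokens); typed as a Record so it can be indexed by a plain string
private pricing: Record<string, { input: number; output: number }> = {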
'claude-opus-4-8': { input: 15, output: 75 },
'claude-sonnet-4-6': { input: 3, output: 15 },
'claude-haiku-4-5': { input: 0.80, output: 4 },
'deepseek-chat': { input: 0.27, output: 1.1 },
};
/**
* Record a single API call
*/
recordCall(
provider: string,
model: string,
inputTokens: number,
outputTokens: number,
success: boolean,
latencyMs: number,
) {
this.tokenUsage.inc({ provider, model, type: 'input' }, inputTokens);
this.tokenUsage.inc({ provider, model, type: 'output' }, outputTokens);
this.apiCalls.inc({ provider, model, status: success ? 'success' : 'error' });
this.apiLatency.observe({ provider, model }, latencyMs / 1000);
}
/**
* Compute the cost of a single call (returns 0 for models missing from the price table)
*/
calculateCost(
model: string,
inputTokens: number,
outputTokens: number,
): number {
const price = this.pricing[model];
if (!price) return 0;
return (
(inputTokens / 1_000_000) * price.input +
(outputTokens / 1_000_000) * price.output
);
}
}
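recordCall needs the latency and outcome of every request, which is easy to forget at individual call sites. A possible wrapper, sketched under assumptions: `CallRecorder` mirrors the recordCall signature above so the example compiles on its own, while `trackCall` and the `usage` result shape are hypothetical names.

```typescript
// Minimal shape of the tracker above, so this sketch is self-contained.
interface CallRecorder {
  recordCall(
    provider: string,
    model: string,
    inputTokens: number,
    outputTokens: number,
    success: boolean,
    latencyMs: number,
  ): void;
}

// Assumed result shape: the wrapped call reports its own token usage.
interface TrackedResult {
  usage: { inputTokens: number; outputTokens: number };
}

// Run an API call, time it, and record the outcome whether it succeeds or throws.
async function trackCall<T extends TrackedResult>(
  recorder: CallRecorder,
  provider: string,
  model: string,
  call: () => Promise<T>,
): Promise<T> {
  const startedAt = Date.now();
  try {
    const result = await call();
    recorder.recordCall(
      provider,
      model,
      result.usage.inputTokens,
      result.usage.outputTokens,
      true,
      Date.now() - startedAt,
    );
    return result;
  } catch (err) {
    // Failed calls still count toward call volume and latency; token usage is unknown.
    recorder.recordCall(provider, model, 0, 0, false, Date.now() - startedAt);
    throw err;
  }
}
```

Recording failures with zero tokens keeps the error rate visible in the same metrics without inflating the token counters.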
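10. Troubleshooting Common Pitfalls
10.1 "Claude API returns 400: invalid_request_error"
The most common cause: the first message in the messages array does not have role 'user'.
❌ Wrong:
messages: [
{ role: 'assistant', content: 'Hello!' }, // the first message cannot come from the assistant
{ role: 'user', content: 'Help me...' },
]
✅ Correct:
messages: [
{ role: 'user', content: 'Help me...' },
]
⸻
Another common cause: a new user message is sent before the tool_result for the previous turn.
Make sure every tool_use block is answered by a matching tool_result in the next user message.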
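Both causes can be caught locally before the request is sent. The sketch below checks only these two rules; `ApiMessage`, `ApiBlock` and `validateMessages` are hypothetical names, and the block types are a simplified subset of the Messages API content blocks.

```typescript
// Simplified subset of content blocks, enough for the two checks below.
type ApiBlock =
  | { type: 'text'; text: string }
  | { type: 'tool_use'; id: string; name: string; input: unknown }
  | { type: 'tool_result'; tool_use_id: string; content: string };

interface ApiMessage {
  role: 'user' | 'assistant';
  content: string | ApiBlock[];
}

// Returns a list of problems; an empty list means both checks passed.
function validateMessages(messages: ApiMessage[]): string[] {
  const problems: string[] = [];

  // Rule 1: the conversation must start with a user message.
  if (messages.length === 0 || messages[0].role !== 'user') {
    problems.push("first message must have role 'user'");
  }

  // Rule 2: every tool_use needs a tool_result in the very next user message.
  messages.forEach((msg, i) => {
    if (msg.role !== 'assistant' || typeof msg.content === 'string') return;
    const toolUseIds: string[] = [];
    for (const block of msg.content) {
      if (block.type === 'tool_use') toolUseIds.push(block.id);
    }
    if (toolUseIds.length === 0) return;

    const next = messages[i + 1];
    const resultIds = new Set<string>();
    if (next && next.role === 'user' && typeof next.content !== 'string') {
      for (const block of next.content) {
        if (block.type === 'tool_result') resultIds.add(block.tool_use_id);
      }
    }
    for (const id of toolUseIds) {
      if (!resultIds.has(id)) {
        problems.push(`tool_use ${id} in message ${i} has no tool_result in the next message`);
      }
    }
  });

  return problems;
}
```

Running this in development and logging the returned problems is usually faster than decoding the API's 400 response after the fact.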
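10.2 "DeepSeek returns an empty response"
Cause: DeepSeek applies safety truncation to certain outputs.
Troubleshooting:
1. Check whether the request touches a sensitive topic
2. Lower temperature to 0.2
3. Add a system prompt to steer the response
4. Fall back to Claude, whose content policy may be more permissive for the request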
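Before changing prompts, it helps to confirm why the response came back empty. OpenAI-compatible responses carry a finish_reason on each choice, and the sketch below classifies a response from that field. `Completion` is a minimal assumed shape covering only the fields used, `diagnoseCompletion` is a hypothetical name, and the exact set of finish_reason values should be checked against DeepSeek's current API reference.

```typescript
// Minimal assumed shape of an OpenAI-compatible chat completion response.
interface Completion {
  choices: Array<{
    finish_reason: string | null;
    message: { content: string | null };
  }>;
}

type Diagnosis = 'ok' | 'filtered' | 'truncated' | 'empty';

// Classify a response so the caller can decide between retry, fallback, or a larger max_tokens.
function diagnoseCompletion(response: Completion): Diagnosis {
  const choice = response.choices[0];
  if (!choice) return 'empty';
  // The provider stopped generation for content-policy reasons.
  if (choice.finish_reason === 'content_filter') return 'filtered';
  const text = (choice.message.content ?? '').trim();
  if (text.length === 0) return 'empty';
  // Output hit the max_tokens limit before the model finished.
  if (choice.finish_reason === 'length') return 'truncated';
  return 'ok';
}
```

A 'filtered' or 'empty' result is a natural trigger for the fallback path from section 5, while 'truncated' usually just means max_tokens is too low.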
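10.3 "The streaming response cuts off unexpectedly"
// Checklist of likely causes
// 1. Nginx buffering causes a timeout
// Make sure the config includes:
// proxy_buffering off;
// proxy_read_timeout 300s;
// 2. The client's network connection dropped
// The server needs to check res.destroyed
// 3. The API server closed the connection
// Check for a 60-second timeout limit; very long generations can get cut off
// 4. Malformed SSE
// Make sure every event has the form "data: <JSON>\n\n"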
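Item 4 is easiest to verify with a small parser. Network chunks do not line up with event boundaries, so a parser has to buffer text until it sees the blank line that ends an event. The sketch below assumes JSON payloads and the OpenAI-style `[DONE]` sentinel; `createSseParser` is a hypothetical name, and fields other than `data:` are ignored.

```typescript
// Hypothetical incremental parser: feed it raw text chunks, receive parsed JSON payloads.
function createSseParser(onEvent: (payload: unknown) => void) {
  let buffer = '';
  return {
    feed(chunk: string): void {
      // Normalize CRLF on the whole buffer so a "\r\n" split across chunks is still handled.
      buffer = (buffer + chunk).replace(/\r\n/g, '\n');
      let boundary = buffer.indexOf('\n\n');
      while (boundary !== -1) {
        const rawEvent = buffer.slice(0, boundary);
        buffer = buffer.slice(boundary + 2);
        // Join all data lines of this event; skip "event:", "id:" and comment lines.
        const data = rawEvent
          .split('\n')
          .filter((line) => line.startsWith('data:'))
          .map((line) => line.slice(5).trimStart())
          .join('\n');
        if (data !== '' && data !== '[DONE]') {
          onEvent(JSON.parse(data));
        }
        boundary = buffer.indexOf('\n\n');
      }
    },
  };
}
```

If a JSON.parse error shows up here, the upstream proxy is often splitting or rewriting events, which points back to items 1 and 3 in the checklist.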
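10.4 "Token usage is far higher than expected"
// Common causes:
// 1. History is never trimmed, so the entire conversation is sent to the API on every call
// Fix: use a sliding window plus a summary of older messages
// 2. Tool call inputs and outputs also consume tokens
// Fix: slim down tool results (return only the fields you need)
// 3. The system prompt is too long
// Fix: keep it under 500 tokens
// 4. The same content is sent repeatedly
// Fix: semantic caching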
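Cause 1 tends to dominate in practice. A sliding window can be sketched as follows: walk the history from newest to oldest and keep messages until the token budget is used up. `trimToBudget` and `ChatTurn` are hypothetical names, and the characters-divided-by-4 estimator is a rough stand-in for a real tokenizer (it undercounts Chinese text in particular).

```typescript
interface ChatTurn {
  role: string;
  content: string;
}

// Rough stand-in for a tokenizer: about 4 characters per token for English text.
const roughTokens = (text: string): number => Math.ceil(text.length / 4);

// Keep the newest messages that fit within maxTokens, preserving their order.
function trimToBudget(
  messages: ChatTurn[],
  maxTokens: number,
  countTokens: (text: string) => number = roughTokens,
): ChatTurn[] {
  const kept: ChatTurn[] = [];
  let used = 0;
  for (let i = messages.length - 1; i >= 0; i--) {
    const cost = countTokens(messages[i].content);
    if (used + cost > maxTokens) break;
    kept.unshift(messages[i]);
    used += cost;
  }
  // Claude requires the first message to come from the user, so drop leading non-user turns.
  while (kept.length > 0 && kept[0].role !== 'user') {
    kept.shift();
  }
  return kept;
}
```

The messages that fall outside the window are the ones to hand to a summarizer such as the ContextCompressor from section 7, so the model keeps the gist without paying for the full text.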
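Summary
Multi-model API integration is not as simple as "calling an endpoint"; it is a systems engineering problem spanning stream handling, error recovery, rate limiting, and cost management. The key takeaways:
- Claude suits complex reasoning and code; DeepSeek suits cost-sensitive Chinese-language tasks
- Streaming is table stakes: users' patience runs out within about 5 seconds
- Exponential backoff plus error classification is the foundation of a retry strategy
- A token bucket plus a request queue is the standard defense against rate limits
- Multi-model fallback keeps the service available when a single model fails
- Every cent saved through token budget management is pure profit
- Cost-aware routing: small models for simple tasks, automatic escalation for complex ones
The code in this article runs in production (the AI features of the Zhiqu forum, zhiqu.ac, are built on this architecture). Integrated into your own project, it should cover roughly 90% of AI API call scenarios. Questions are welcome in the community.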