深入拆解 Claude Code 源码(三):1300 行代码揭秘 AI 对话引擎的“心脏“是如何跳动的
深入拆解 Claude Code 源码(三):1300 行代码揭秘 AI 对话引擎的"心脏"是如何跳动的
系列:深入拆解 Claude Code 源码 | 第 3 篇 / 共 8 篇
关键词:Claude Code, QueryEngine, 流式 API, 消息循环, 工具调用, Token 预算, 上下文管理, 成本追踪, 错误恢复
开篇:一个看似简单的问题
当你在 Claude Code 里问一句 “帮我修复这个 bug”,表面上看就是一问一答。但实际上,背后有一个精密的"对话引擎"在协调一切:
- 用户的输入要和之前 50 轮对话拼在一起发给 API
- API 返回的流式数据要实时渲染到终端
- 如果 AI 决定用 Bash 跑个命令,引擎要暂停流式处理、执行命令、把结果追加回去、再调 API
- Token 快用完了?自动压缩历史对话
- API 返回 429?自动重试,指数退避
这个引擎就是 QueryEngine.ts – 整个 Claude Code 的"心脏",约 1300 行代码。
一、对话引擎的职责
+-------------------------------------------+
| QueryEngine.ts |
| |
| +---------+ +----------+ +--------+ |
| | 消息历史 | | Token | | 工具 | |
| | 管理 | | 预算计算 | | 调度 | |
| +----+----+ +----+-----+ +---+----+ |
| | | | |
| +------------+------------+ |
| | |
| +-----+-----+ |
| | query() | |
| | API 调用 | |
| +-----+-----+ |
| | |
| +-----+-----+ |
| | 流式处理 | |
| | 重试逻辑 | |
| +-----------+ |
+-------------------------------------------+
二、消息类型系统
Claude Code 定义了丰富的消息类型:
// src/types/message.ts
type UserMessage = {
role: 'user'
content: ContentBlock[] // 文本 + 图片 + 工具结果
uuid: string; timestamp: number
}
type AssistantMessage = {
role: 'assistant'
content: ContentBlock[] // 文本 + tool_use
uuid: string; model: string; costUSD: number; durationMs: number
}
type SystemMessage = { role: 'system'; content: string }
type ProgressMessage = {
type: 'progress'; toolName: string
status: 'running' | 'completed' | 'error'; data: any
}
关键设计:ContentBlock 是联合类型,可以是 TextBlock、ImageBlock、ToolUseBlock、ToolResultBlock。这意味着一次 AI 回复可以混合文本和工具调用:
AI 回复: "让我先看看这个文件" + [ToolUse: Read file.ts] + "然后我来修改" + [ToolUse: Edit file.ts]
三、query():API 调用的核心
src/query.ts 是与 Anthropic Messages API 通信的最底层,约 1730 行。它不是简单的 fetch 包装器,而是一个完整的多轮对话循环,支持流式处理、自动压缩、token 预算管理、错误恢复。
3.1 入口与循环结构
query() 本身是 AsyncGenerator,委托给内部的 queryLoop():
// src/query.ts
export async function* query(params: QueryParams): AsyncGenerator<StreamEvent | Message, Terminal> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
for (const uuid of consumedCommandUuids) notifyCommandLifecycle(uuid, 'completed')
return terminal
}
queryLoop 内部维护可变状态,跨迭代传递:
type State = {
messages: Message[]
toolUseContext: ToolUseContext
autoCompactTracking: AutoCompactTrackingState | undefined
maxOutputTokensRecoveryCount: number // max_tokens 恢复尝试次数
hasAttemptedReactiveCompact: boolean // 是否已尝试响应式压缩
maxOutputTokensOverride: number | undefined
turnCount: number // 当前轮次计数
transition: Continue | undefined // 上一次迭代为什么继续
}
const budgetTracker = feature('TOKEN_BUDGET') ? createBudgetTracker() : null
3.2 请求构造与 API 调用
每一迭代的核心是调用 deps.callModel():
// src/services/api/claude.ts - 请求参数构建
const requestParams: BetaMessageStreamParams = {
model: normalizeModelStringForAPI(options.model),
max_tokens: getMaxOutputTokensForModel(options.model, options.maxOutputTokensOverride),
system: systemPromptParts, // 系统提示词(含 cache_control 标记)
messages: normalizedMessages, // 规范化后的消息历史
tools: toolSchemas, // 工具 JSON Schema 列表
stream: true, // 始终流式
tool_choice: options.toolChoice,
metadata: { user_id: getOrCreateUserID() },
...(thinkingConfig && { thinking: thinkingConfig }),
...(taskBudget && { output_config: { task_budget: taskBudget } }),
}
3.3 流式事件处理
API 返回 Server-Sent Events 流,每种事件都有对应处理:
switch (message.type) {
case 'content_block_start':
if (block.type === 'tool_use') { toolUseBlocks.push(block); needsFollowUp = true }
break
case 'content_block_delta':
if (delta.type === 'text_delta') yield { type: 'text_delta', text: delta.text }
if (delta.type === 'input_json_delta') streamingToolExecutor?.updateInput(blockIndex, delta.partial_json)
break
case 'content_block_stop':
if (streamingToolExecutor && currentBlock?.type === 'tool_use')
streamingToolExecutor.onBlockComplete(blockIndex)
break
case 'message_delta':
lastStopReason = message.delta.stop_reason // 'end_turn' | 'tool_use' | 'max_tokens'
currentMessageUsage = updateUsage(currentMessageUsage, message.usage)
break
}
3.4 错误分类与重试
getAssistantMessageFromError() 是一个约 500 行的巨型函数,处理十几种错误场景:
// src/services/api/errors.ts - 核心错误分类(简化展示关键分支)
export function getAssistantMessageFromError(error: unknown, model: string): AssistantMessage {
if (error instanceof APIConnectionTimeoutError)
return createAssistantAPIErrorMessage({ content: API_TIMEOUT_ERROR_MESSAGE, error: 'unknown' })
if (error instanceof ImageSizeError)
return createAssistantAPIErrorMessage({ content: getImageTooLargeErrorMessage() })
if (error instanceof APIError && error.status === 429) {
// 解析新版限流头 (anthropic-ratelimit-unified-*)
const rateLimitType = error.headers?.get('anthropic-ratelimit-unified-representative-claim')
if (rateLimitType) return createAssistantAPIErrorMessage({ content: getRateLimitErrorMessage(limits, model), error: 'rate_limit' })
}
if (error.message.toLowerCase().includes('prompt is too long'))
return createAssistantAPIErrorMessage({ content: PROMPT_TOO_LONG_ERROR_MESSAGE, error: 'invalid_request' })
if (error.message.includes('`tool_use` ids were found without `tool_result`'))
return createAssistantAPIErrorMessage({ content: 'API Error: 400 due to tool use concurrency issues.', error: 'invalid_request' })
if (error instanceof APIError && (error.status === 401 || error.status === 403))
return createAssistantAPIErrorMessage({ error: 'authentication_failed', content: 'Please run /login' })
if (error instanceof APIError && error.status === 404)
return createAssistantAPIErrorMessage({ content: `The model ${model} is not available.`, error: 'invalid_request' })
return createAssistantAPIErrorMessage({ content: `${API_ERROR_MESSAGE_PREFIX}: ${error.message}`, error: 'unknown' })
}
withRetry() 是精心设计的重试包装器:
// src/services/api/withRetry.ts
const DEFAULT_MAX_RETRIES = 10; const MAX_529_RETRIES = 3; const BASE_DELAY_MS = 500
export async function* withRetry<T>(getClient, operation, options): AsyncGenerator<SystemAPIErrorMessage, T> {
let consecutive529Errors = 0
for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
if (options.signal?.aborted) throw new APIUserAbortError()
try {
// 认证错误时刷新客户端(401/OAuth 撤销/Bedrock 凭证过期)
if (client === null || (lastError instanceof APIError && lastError.status === 401)) {
client = await getClient()
}
return await operation(client, attempt, retryContext)
} catch (error) {
if (is529Error(error)) {
consecutive529Errors++
if (consecutive529Errors >= MAX_529_RETRIES && options.fallbackModel)
throw new FallbackTriggeredError(options.model, options.fallbackModel) // Opus -> Sonnet 降级
}
// max_tokens 上下文溢出 -> 调整 max_tokens 重试
const overflowData = parseMaxTokensContextOverflowError(error)
if (overflowData) {
retryContext.maxTokensOverride = Math.max(FLOOR_OUTPUT_TOKENS, overflowData.contextLimit - overflowData.inputTokens - 1000)
continue
}
// 指数退避:delay = min(BASE_DELAY * 2^attempt, maxDelay) + jitter
const delayMs = getRetryDelay(attempt, getRetryAfter(error))
await sleep(delayMs, options.signal)
if (attempt > maxRetries) throw new CannotRetryError(error, retryContext)
}
}
}
错误分类还有专门的 classifyAPIError() 用于 Datadog 指标上报,将每种错误映射为简洁的标签(如 'rate_limit'、'prompt_too_long'、'tool_use_mismatch'),方便监控告警。
重试策略分层设计:
| 错误类型 | 策略 | 最大重试 |
|---|---|---|
| 429 Rate Limit | 指数退避 + Retry-After | 10 次 |
| 529 Overloaded | 退避 + 模型降级 | 3 次后降级 |
| 401 Auth Error | 刷新 token + 重试 | 3 次 |
| Prompt Too Long | 压缩后重试 | 1 次 |
| Max Output Tokens | 扩大 token 限制重试 | 3 次 |
| 连接超时 | 指数退避 | 10 次 |
| 400/403 | 不重试 | - |
四、QueryEngine:对话循环的主控
src/QueryEngine.ts 约 1300 行,管理一次对话会话的完整生命周期。
4.1 核心依赖
// src/QueryEngine.ts - 关键依赖
import { accumulateUsage, updateUsage } from 'src/services/api/claude.js'
import { getModelUsage, getTotalAPIDuration, getTotalCost } from './cost-tracker.js'
import { query } from './query.js'
import { categorizeRetryableAPIError } from './services/api/errors.js'
import { createUserMessage, createUserInterruptionMessage, normalizeMessagesForAPI,
createToolUseSummaryMessage, stripSignatureBlocks } from './utils/messages.js'
import { tokenCountWithEstimation, doesMostRecentAssistantMessageExceed200k } from './utils/tokens.js'
import { StreamingToolExecutor } from './services/tools/StreamingToolExecutor.js'
import { runTools } from './services/tools/toolOrchestration.js'
import { applyToolResultBudget } from './utils/toolResultStorage.js'
import { executePostSamplingHooks } from './utils/hooks/postSamplingHooks.js'
4.2 配置与初始化
export type QueryEngineConfig = {
cwd: string; tools: Tools; commands: Command[]
mcpClients: MCPServerConnection[]; agents: AgentDefinition[]
canUseTool: CanUseToolFn // 权限检查函数
getAppState: () => AppState; setAppState: (f: (prev: AppState) => AppState) => void
fallbackModel?: string; thinkingConfig?: ThinkingConfig
maxTurns?: number; maxBudgetUsd?: number; taskBudget?: { total: number }
// ... 更多选项
}
export class QueryEngine {
private mutableMessages: Message[] // 可变消息历史
private abortController: AbortController
private totalUsage: NonNullableUsage // 累积 token 用量
private readFileState: FileStateCache // 文件状态缓存
constructor(config: QueryEngineConfig) {
this.mutableMessages = config.initialMessages ?? []
this.abortController = config.abortController ?? createAbortController()
this.totalUsage = EMPTY_USAGE
}
}
4.3 submitMessage:一次完整的对话轮次
submitMessage() 是核心方法,每次用户发消息都会调用,通过 yield 实时输出:
async *submitMessage(prompt, options): AsyncGenerator<SDKMessage> {
// 阶段 1: 构建系统提示词
const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
tools, mainLoopModel, additionalWorkingDirectories, mcpClients, customSystemPrompt,
})
const systemPrompt = asSystemPrompt([...defaultSystemPrompt, ...(memoryPrompt ? [memoryPrompt] : []), ...(appendSystemPrompt ? [appendSystemPrompt] : [])])
// 阶段 2: 处理用户输入
const { messages: messagesFromUserInput, shouldQuery } = await processUserInput({
input: prompt, mode: 'prompt', context: processUserInputContext, messages: this.mutableMessages,
})
this.mutableMessages.push(...messagesFromUserInput)
// 阶段 3: 调用 query() 进入对话循环
for await (const message of query({ messages, systemPrompt, userContext, systemContext,
canUseTool: wrappedCanUseTool, toolUseContext, fallbackModel, maxTurns, taskBudget,
})) {
switch (message.type) {
case 'assistant': case 'user': case 'progress':
this.mutableMessages.push(message); yield* normalizeMessage(message); break
case 'stream_event':
if (message.event.type === 'message_start') currentMessageUsage = updateUsage(EMPTY_USAGE, message.event.message.usage)
if (message.event.type === 'message_delta') currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
yield message; break
}
}
// 阶段 4: 输出最终结果
yield { type: 'result', subtype: 'success', duration_ms: Date.now() - startTime,
total_cost_usd: getTotalCost(), usage: this.totalUsage, modelUsage: getModelUsage() }
}
4.4 Token 预算 – 六层防御体系
Token 预算是对话引擎的关键机制。src/utils/context.ts 定义了基础参数:
// src/utils/context.ts
export const MODEL_CONTEXT_WINDOW_DEFAULT = 200_000
export const CAPPED_DEFAULT_MAX_TOKENS = 8_000 // 默认输出上限
export const ESCALATED_MAX_TOKENS = 64_000 // 重试时升级上限
export function getContextWindowForModel(model: string, betas?: string[]): number {
if (process.env.CLAUDE_CODE_MAX_CONTEXT_TOKENS) return parseInt(process.env.CLAUDE_CODE_MAX_CONTEXT_TOKENS, 10)
if (has1mContext(model)) return 1_000_000 // [1m] 后缀 -> 100 万 token
const cap = getModelCapability(model)
if (cap?.max_input_tokens >= 100_000) return cap.max_input_tokens
return MODEL_CONTEXT_WINDOW_DEFAULT // 默认 200k
}
query.ts 中的 token 预算检查是六层防御:
// 层 1: 工具结果大小预算(防止单个工具输出撑爆上下文)
messagesForQuery = await applyToolResultBudget(messagesForQuery, contentReplacementState, ...)
// 层 2: Snip 压缩(历史消息裁剪)
if (feature('HISTORY_SNIP')) messagesForQuery = snipModule.snipCompactIfNeeded(messagesForQuery).messages
// 层 3: Microcompact(工具结果压缩)
messagesForQuery = (await deps.microcompact(messagesForQuery, toolUseContext, querySource)).messages
// 层 4: Context Collapse(上下文折叠)
if (feature('CONTEXT_COLLAPSE')) messagesForQuery = (await contextCollapse.applyCollapsesIfNeeded(...)).messages
// 层 5: Auto-compact(自动压缩 - 最终防线)
const { compactionResult } = await deps.autocompact(messagesForQuery, toolUseContext, { systemPrompt, userContext, systemContext }, ...)
// 层 6: 阻塞限制(auto-compact 关闭时的安全网)
if (!compactionResult) {
const { isAtBlockingLimit } = calculateTokenWarningState(tokenCountWithEstimation(messagesForQuery), model)
if (isAtBlockingLimit) return { reason: 'blocking_limit' }
}
这个 6 层防御的设计思路是渐进式压缩:从最小侵入的工具结果裁剪(层 1),到历史消息裁剪(层 2),到工具结果压缩(层 3),到上下文折叠(层 4),再到全量自动压缩(层 5),最后是硬性阻塞(层 6)。每一层都尽可能保留更多信息,只在必要时才升级到更激进的策略。
4.5 自动压缩与 MAX_TURNS 保护
自动压缩触发后的处理:
if (compactionResult) {
logEvent('tengu_auto_compact_succeeded', {
preCompactTokenCount, postCompactTokenCount, truePostCompactTokenCount,
compactionInputTokens: compactionUsage?.input_tokens,
})
tracking = { compacted: true, turnId: deps.uuid(), turnCounter: 0, consecutiveFailures: 0 }
const postCompactMessages = buildPostCompactMessages(compactionResult)
for (const message of postCompactMessages) yield message
messagesForQuery = postCompactMessages
}
无限循环保护:
// 轮次限制
if (maxTurns !== undefined && state.turnCount > maxTurns) return { reason: 'max_turns_reached' }
// max_output_tokens 恢复次数限制
const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT = 3
if (state.maxOutputTokensRecoveryCount > MAX_OUTPUT_TOKENS_RECOVERY_LIMIT)
return { reason: 'max_output_tokens_recovery_exhausted' }
五、工具执行流水线
5.1 传统 7 步流水线
async executeTool(toolUse: ToolUseBlock): Promise<ToolResult> {
const tool = this.findTool(toolUse.name) // 1. 查找工具
const validation = await tool.validateInput(toolUse.input) // 2. 输入验证
if (!validation.valid) return { error: validation.error }
const permission = await this.checkPermissions(tool, input) // 3. 权限检查
if (permission === 'ask') { if (!await this.askUser(tool, input)) return { error: 'User rejected' } }
const preHook = await runPreToolUseHooks(toolUse.name, input) // 4. PreToolUse Hook
if (preHook.blockingError) return { error: preHook.blockingError }
const result = await tool.call(toolUse.input, context) // 5. 执行工具
await runPostToolUseHooks(toolUse.name, input, result) // 6. PostToolUse Hook
return result // 7. 返回结果
}
5.2 流式工具预执行(StreamingToolExecutor)
优化创新:在 API 还在流式返回工具输入 JSON 时,就开始验证和准备:
// 当 content_block_start 到达时,创建执行器
streamingToolExecutor = new StreamingToolExecutor(tools, canUseTool, toolUseContext)
// 当 input_json_delta 到达时,增量更新输入
streamingToolExecutor?.updateInput(blockIndex, delta.partial_json)
// 当 content_block_stop 到达时,标记块完成并可开始预执行
streamingToolExecutor.onBlockComplete(blockIndex)
// 流式结束后,获取所有工具结果
const toolResults = await streamingToolExecutor.getResults()
5.3 工具结果消息构造
工具执行完成后,结果封装成标准 user 消息:
const toolResultMessage = createUserMessage({
content: [{
type: 'tool_result',
tool_use_id: toolUse.id,
content: result.output, // 工具输出文本
is_error: result.isError, // 是否为错误
}],
toolUseResult: result.output,
sourceToolAssistantUUID: assistantMessage.uuid,
})
5.4 七种权限模式
权限检查的核心在 useCanUseTool hook 中,它返回 'allow'、'deny' 或 'ask' 三种决策:
type CanUseToolFn = async (tool: Tool, input: Record<string, unknown>,
toolUseContext: ToolUseContext, assistantMessage: AssistantMessage, toolUseID: string,
forceDecision?: 'allow' | 'deny',
) => Promise<{ behavior: 'allow' | 'deny' | 'ask' }>
七种权限模式的行为差异:
| 模式 | 行为 | 典型场景 |
|---|---|---|
default |
危险操作需要用户确认 | 日常交互 |
plan |
只允许只读操作,写操作需审批 | 代码审查 |
acceptEdits |
文件编辑自动允许,Bash 仍需确认 | 快速开发 |
bypassPermissions |
跳过所有权限检查 | CI/CD 自动化 |
dontAsk |
自动拒绝需要确认的操作 | 只读分析 |
auto |
AI 分类器自动判断(yoloClassifier.ts) | 智能模式 |
bubble |
权限请求冒泡到父 Agent | 多 Agent 协作 |
5.5 PreToolUse / PostToolUse Hook 系统
Hook 系统允许在工具执行前后注入自定义逻辑,如审计日志、安全检查、结果后处理等。PreToolUse Hook 可以阻止工具执行(返回 blockingError),PostToolUse Hook 则用于记录和通知:
// PreToolUse: 可阻止执行
const preHook = await runPreToolUseHooks(toolUse.name, toolUse.input, context)
if (preHook.blockingError) return { error: preHook.blockingError }
// PostToolUse: 审计与通知
await runPostToolUseHooks(toolUse.name, toolUse.input, result, context)
六、上下文管理深度解析
src/context.ts(190 行)负责在会话生命周期内收集和缓存上下文信息。三个核心函数都使用 lodash/memoize 缓存,整个会话只执行一次。
6.1 getSystemContext() – 系统上下文
export const getSystemContext = memoize(async (): Promise<{ [k: string]: string }> => {
const gitStatus = isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || !shouldIncludeGitInstructions()
? null : await getGitStatus()
const injection = feature('BREAK_CACHE_COMMAND') ? getSystemPromptInjection() : null
return {
...(gitStatus && { gitStatus }),
...(injection ? { cacheBreaker: `[CACHE_BREAKER: ${injection}]` } : {}),
}
})
6.2 getUserContext() – 用户上下文
export const getUserContext = memoize(async (): Promise<{ [k: string]: string }> => {
const shouldDisableClaudeMd = isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_CLAUDE_MDS) ||
(isBareMode() && getAdditionalDirectoriesForClaudeMd().length === 0)
const claudeMd = shouldDisableClaudeMd ? null
: getClaudeMds(filterInjectedMemoryFiles(await getMemoryFiles()))
setCachedClaudeMdContent(claudeMd || null) // 缓存供 auto-mode 分类器使用
return { ...(claudeMd && { claudeMd }), currentDate: `Today's date is ${getLocalISODate()}.` }
})
6.3 getGitStatus() – 并行 Git 信息收集
性能优化的典范 – 5 个 git 命令并行执行:
const MAX_STATUS_CHARS = 2000
export const getGitStatus = memoize(async (): Promise<string | null> => {
if (process.env.NODE_ENV === 'test') return null
const isGit = await getIsGit()
if (!isGit) return null
try {
// 5 个 git 命令并行!
const [branch, mainBranch, status, log, userName] = await Promise.all([
getBranch(), getDefaultBranch(),
execFileNoThrow(gitExe(), ['--no-optional-locks', 'status', '--short']).then(r => r.stdout.trim()),
execFileNoThrow(gitExe(), ['--no-optional-locks', 'log', '--oneline', '-n', '5']).then(r => r.stdout.trim()),
execFileNoThrow(gitExe(), ['config', 'user.name']).then(r => r.stdout.trim()),
])
// 状态超过 2000 字符时截断
const truncatedStatus = status.length > MAX_STATUS_CHARS
? status.substring(0, MAX_STATUS_CHARS) + '\n... (truncated)'
: status
return [
`This is the git status at the start of the conversation.`, `Current branch: ${branch}`,
`Main branch: ${mainBranch}`, ...(userName ? [`Git user: ${userName}`] : []),
`Status:\n${truncatedStatus || '(clean)'}`, `Recent commits:\n${log}`,
].join('\n\n')
} catch (error) { logError(error); return null }
})
6.4 系统提示词注入机制
系统提示词通过 fetchSystemPromptParts() 组装,收集多个来源的上下文:
const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
tools, // 可用工具列表 -> 生成工具描述
mainLoopModel: model, // 模型名称 -> 调整提示词策略
additionalWorkingDirectories, // 额外工作目录 -> CLAUDE.md 搜索路径
mcpClients, // MCP 服务器 -> 动态工具描述
customSystemPrompt, // 用户自定义系统提示词
})
const fullSystemPrompt = asSystemPrompt(appendSystemContext(systemPrompt, systemContext))
messagesForQuery = prependUserContext(messagesForQuery, userContext) // 注入用户上下文到消息序列
七、成本追踪系统
src/cost-tracker.ts(324 行)是 Claude Code 的"记账本",精确追踪每次 API 调用的成本。
7.1 核心数据结构
type StoredCostState = {
totalCostUSD: number // 总花费(美元)
totalAPIDuration: number // API 调用总耗时
totalAPIDurationWithoutRetries: number // 不含重试的 API 耗时
totalToolDuration: number // 工具执行总耗时
totalLinesAdded: number // 代码新增行数
totalLinesRemoved: number // 代码删除行数
modelUsage: { [modelName: string]: ModelUsage } | undefined // 按模型用量
}
7.2 accumulateUsage() – Token 用量累积
每次 API 调用返回后,4 种 token 类型被精确累加:
// src/services/api/claude.ts
export function accumulateUsage(total: Usage, delta: Usage): Usage {
return {
input_tokens: total.input_tokens + delta.input_tokens,
output_tokens: total.output_tokens + delta.output_tokens,
cache_creation_input_tokens: (total.cache_creation_input_tokens ?? 0) + (delta.cache_creation_input_tokens ?? 0),
cache_read_input_tokens: (total.cache_read_input_tokens ?? 0) + (delta.cache_read_input_tokens ?? 0),
}
}
四种 token 类型说明:
input_tokens– 输入 token(用户消息 + 系统提示词 + 工具描述)output_tokens– 输出 token(AI 回复)cache_creation_input_tokens– 缓存创建 token(首次写入 prompt cache)cache_read_input_tokens– 缓存读取 token(命中 prompt cache,成本仅为创建的 10%)
这 4 种 token 的区分对于成本优化至关重要。Claude Code 的系统提示词和工具描述通常占几万个 token,通过 prompt cache 机制,后续请求只需支付 cache_read 的低成本(约为正常 input 的 10%),大幅降低了多轮对话的成本。
7.3 addToTotalSessionCost() – 会话成本累加
export function addToTotalSessionCost(cost: number, usage: Usage, model: string): number {
const modelUsage = addToTotalModelUsage(cost, usage, model)
addToTotalCostState(cost, modelUsage, model)
// OpenTelemetry 指标上报
getCostCounter()?.add(cost, { model })
getTokenCounter()?.add(usage.input_tokens, { model, type: 'input' })
getTokenCounter()?.add(usage.output_tokens, { model, type: 'output' })
getTokenCounter()?.add(usage.cache_read_input_tokens ?? 0, { model, type: 'cacheRead' })
getTokenCounter()?.add(usage.cache_creation_input_tokens ?? 0, { model, type: 'cacheCreation' })
// 递归计算 advisor 子查询的成本
let totalCost = cost
for (const advisorUsage of getAdvisorUsage(usage)) {
const advisorCost = calculateUSDCost(advisorUsage.model, advisorUsage)
totalCost += addToTotalSessionCost(advisorCost, advisorUsage, advisorUsage.model)
}
return totalCost
}
7.4 会话成本持久化与 /cost 命令
成本数据在切换会话前保存到项目配置:
export function saveCurrentSessionCosts(fpsMetrics?: FpsMetrics): void {
saveCurrentProjectConfig(current => ({
...current,
lastCost: getTotalCostUSD(),
lastAPIDuration: getTotalAPIDuration(),
lastTotalInputTokens: getTotalInputTokens(),
lastTotalOutputTokens: getTotalOutputTokens(),
lastTotalCacheCreationInputTokens: getTotalCacheCreationInputTokens(),
lastTotalCacheReadInputTokens: getTotalCacheReadInputTokens(),
lastModelUsage: Object.fromEntries(
Object.entries(getModelUsage()).map(([model, usage]) => [model, {
inputTokens: usage.inputTokens, outputTokens: usage.outputTokens,
cacheReadInputTokens: usage.cacheReadInputTokens, costUSD: usage.costUSD,
}])
),
lastSessionId: getSessionId(),
}))
}
/cost 命令输出格式:
export function formatTotalCost(): string {
return chalk.dim(
`Total cost: ${formatCost(getTotalCostUSD())}\n` +
`Total duration (API): ${formatDuration(getTotalAPIDuration())}\n` +
`Total duration (wall): ${formatDuration(getTotalDuration())}\n` +
`Total code changes: ${getTotalLinesAdded()} lines added, ${getTotalLinesRemoved()} lines removed\n` +
formatModelUsage(), // 按模型短名称聚合:claude-sonnet-4: 1,234,567 input, 45,678 output ($0.45)
)
}
八、错误处理与恢复
8.1 API 错误恢复路径
API 调用失败
|
+-> 超时/连接错误 -> withRetry 指数退避 -> 重试最多 10 次
|
+-> 429 Rate Limit -> 解析 Retry-After 头 -> 等待后重试
|
+-> 529 Overloaded -> 前台: 退避重试 (最多 3 次) | 后台: 立即放弃
| -> 连续 3 次: 触发模型降级 (Opus -> Sonnet)
|
+-> Prompt Too Long -> reactiveCompact 压缩后重试
|
+-> Max Output Tokens -> 扩大 max_tokens 重试 (从 8k 升级到 64k,最多 3 次)
|
+-> 401 Auth Error -> 刷新 OAuth token -> 重建客户端 -> 重试
|
+-> 400/403 -> 不重试,显示具体错误信息
8.2 工具执行错误处理
工具执行失败时,错误信息封装成 tool_result 消息,让 AI 有机会自我修正:
if (result.isError) {
const errorMessage = createUserMessage({
content: [{ type: 'tool_result', tool_use_id: toolUse.id, content: `Error: ${result.error}`, is_error: true }],
})
messages.push(errorMessage) // AI 在下一轮看到错误,可能尝试不同方法
}
8.3 中断恢复
用户按 Ctrl+C 时,yieldMissingToolResultBlocks() 为未完成的工具调用生成错误结果,防止 tool_use/tool_result 不匹配导致 API 400:
function* yieldMissingToolResultBlocks(assistantMessages: AssistantMessage[], errorMessage: string) {
for (const assistantMessage of assistantMessages) {
const toolUseBlocks = assistantMessage.message.content.filter(c => c.type === 'tool_use')
for (const toolUse of toolUseBlocks) {
yield createUserMessage({
content: [{ type: 'tool_result', content: errorMessage, is_error: true, tool_use_id: toolUse.id }],
sourceToolAssistantUUID: assistantMessage.uuid,
})
}
}
}
九、设计亮点总结
| 设计决策 | 原因 |
|---|---|
| AsyncGenerator 循环 | 流式输出,实时渲染,不阻塞 |
| 6 层 token 预算防御 | snip -> microcompact -> collapse -> autocompact -> blocking -> recovery |
| 流式工具预执行 | 在 API 流式返回时就开始验证工具,减少延迟 |
| 7 种权限模式 | 从只读到全自动,覆盖所有使用场景 |
| 指数退避 + 529 降级 | 优雅处理 API 限流,3 次 529 后自动降级模型 |
| 5 个 git 命令并行 | Promise.all 并行收集 git 状态,避免串行阻塞 |
| memoize 上下文缓存 | 系统/用户上下文整个会话只计算一次 |
| 4 种 token 分类追踪 | input/output/cache_read/cache_creation,精确到分 |
| tool_use/tool_result 配对 | 中断时自动生成错误结果,防止 API 400 |
| 500 行错误分类器 | 覆盖 15+ 种 API 错误场景,每种都有定制提示 |
下篇预告
第四篇:77 个斜杠命令的设计艺术
Claude Code 有 77 个斜杠命令,从
/commit到/review,从/mcp到/vim。但你知道吗?这些命令其实分三种完全不同的类型:一种生成 AI 提示词,一种直接执行本地逻辑,还有一种渲染 React 组件。更有趣的是,有些命令正在从内置迁移到插件系统。下一篇,我们来看看命令系统的设计模式和演变路径。
标签:
Claude CodeQueryEngine流式API消息循环工具调用Token预算成本追踪错误恢复上下文管理源码分析TypeScript
更多推荐


所有评论(0)