深入拆解 Claude Code 源码(三):1300 行代码揭秘 AI 对话引擎的"心脏"是如何跳动的

系列:深入拆解 Claude Code 源码 | 第 3 篇 / 共 8 篇
关键词:Claude Code, QueryEngine, 流式 API, 消息循环, 工具调用, Token 预算, 上下文管理, 成本追踪, 错误恢复


开篇:一个看似简单的问题

当你在 Claude Code 里问一句 “帮我修复这个 bug”,表面上看就是一问一答。但实际上,背后有一个精密的"对话引擎"在协调一切:

  • 用户的输入要和之前 50 轮对话拼在一起发给 API
  • API 返回的流式数据要实时渲染到终端
  • 如果 AI 决定用 Bash 跑个命令,引擎要暂停流式处理、执行命令、把结果追加回去、再调 API
  • Token 快用完了?自动压缩历史对话
  • API 返回 429?自动重试,指数退避

这个引擎就是 QueryEngine.ts – 整个 Claude Code 的"心脏",约 1300 行代码。


一、对话引擎的职责

+-------------------------------------------+
|            QueryEngine.ts                 |
|                                           |
|  +---------+  +----------+  +--------+   |
|  | 消息历史 |  | Token    |  | 工具   |   |
|  | 管理    |  | 预算计算 |  | 调度   |   |
|  +----+----+  +----+-----+  +---+----+   |
|       |            |            |         |
|       +------------+------------+         |
|                    |                      |
|              +-----+-----+               |
|              |  query()  |               |
|              |  API 调用  |               |
|              +-----+-----+               |
|                    |                      |
|              +-----+-----+               |
|              | 流式处理   |               |
|              | 重试逻辑   |               |
|              +-----------+               |
+-------------------------------------------+

二、消息类型系统

Claude Code 定义了丰富的消息类型:

// src/types/message.ts
type UserMessage = {
  role: 'user'
  content: ContentBlock[]   // 文本 + 图片 + 工具结果
  uuid: string; timestamp: number
}
type AssistantMessage = {
  role: 'assistant'
  content: ContentBlock[]   // 文本 + tool_use
  uuid: string; model: string; costUSD: number; durationMs: number
}
type SystemMessage = { role: 'system'; content: string }
type ProgressMessage = {
  type: 'progress'; toolName: string
  status: 'running' | 'completed' | 'error'; data: any
}

关键设计:ContentBlock 是联合类型,可以是 TextBlockImageBlockToolUseBlockToolResultBlock。这意味着一次 AI 回复可以混合文本和工具调用

AI 回复: "让我先看看这个文件" + [ToolUse: Read file.ts] + "然后我来修改" + [ToolUse: Edit file.ts]

三、query():API 调用的核心

src/query.ts 是与 Anthropic Messages API 通信的最底层,约 1730 行。它不是简单的 fetch 包装器,而是一个完整的多轮对话循环,支持流式处理、自动压缩、token 预算管理、错误恢复。

3.1 入口与循环结构

query() 本身是 AsyncGenerator,委托给内部的 queryLoop()

// src/query.ts
export async function* query(params: QueryParams): AsyncGenerator<StreamEvent | Message, Terminal> {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  for (const uuid of consumedCommandUuids) notifyCommandLifecycle(uuid, 'completed')
  return terminal
}

queryLoop 内部维护可变状态,跨迭代传递:

type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number    // max_tokens 恢复尝试次数
  hasAttemptedReactiveCompact: boolean    // 是否已尝试响应式压缩
  maxOutputTokensOverride: number | undefined
  turnCount: number                       // 当前轮次计数
  transition: Continue | undefined        // 上一次迭代为什么继续
}
const budgetTracker = feature('TOKEN_BUDGET') ? createBudgetTracker() : null

3.2 请求构造与 API 调用

每一迭代的核心是调用 deps.callModel()

// src/services/api/claude.ts - 请求参数构建
const requestParams: BetaMessageStreamParams = {
  model: normalizeModelStringForAPI(options.model),
  max_tokens: getMaxOutputTokensForModel(options.model, options.maxOutputTokensOverride),
  system: systemPromptParts,        // 系统提示词(含 cache_control 标记)
  messages: normalizedMessages,      // 规范化后的消息历史
  tools: toolSchemas,                // 工具 JSON Schema 列表
  stream: true,                      // 始终流式
  tool_choice: options.toolChoice,
  metadata: { user_id: getOrCreateUserID() },
  ...(thinkingConfig && { thinking: thinkingConfig }),
  ...(taskBudget && { output_config: { task_budget: taskBudget } }),
}

3.3 流式事件处理

API 返回 Server-Sent Events 流,每种事件都有对应处理:

switch (message.type) {
  case 'content_block_start':
    if (block.type === 'tool_use') { toolUseBlocks.push(block); needsFollowUp = true }
    break
  case 'content_block_delta':
    if (delta.type === 'text_delta') yield { type: 'text_delta', text: delta.text }
    if (delta.type === 'input_json_delta') streamingToolExecutor?.updateInput(blockIndex, delta.partial_json)
    break
  case 'content_block_stop':
    if (streamingToolExecutor && currentBlock?.type === 'tool_use')
      streamingToolExecutor.onBlockComplete(blockIndex)
    break
  case 'message_delta':
    lastStopReason = message.delta.stop_reason  // 'end_turn' | 'tool_use' | 'max_tokens'
    currentMessageUsage = updateUsage(currentMessageUsage, message.usage)
    break
}

3.4 错误分类与重试

getAssistantMessageFromError() 是一个约 500 行的巨型函数,处理十几种错误场景:

// src/services/api/errors.ts - 核心错误分类(简化展示关键分支)
export function getAssistantMessageFromError(error: unknown, model: string): AssistantMessage {
  if (error instanceof APIConnectionTimeoutError)
    return createAssistantAPIErrorMessage({ content: API_TIMEOUT_ERROR_MESSAGE, error: 'unknown' })
  if (error instanceof ImageSizeError)
    return createAssistantAPIErrorMessage({ content: getImageTooLargeErrorMessage() })
  if (error instanceof APIError && error.status === 429) {
    // 解析新版限流头 (anthropic-ratelimit-unified-*)
    const rateLimitType = error.headers?.get('anthropic-ratelimit-unified-representative-claim')
    if (rateLimitType) return createAssistantAPIErrorMessage({ content: getRateLimitErrorMessage(limits, model), error: 'rate_limit' })
  }
  if (error.message.toLowerCase().includes('prompt is too long'))
    return createAssistantAPIErrorMessage({ content: PROMPT_TOO_LONG_ERROR_MESSAGE, error: 'invalid_request' })
  if (error.message.includes('`tool_use` ids were found without `tool_result`'))
    return createAssistantAPIErrorMessage({ content: 'API Error: 400 due to tool use concurrency issues.', error: 'invalid_request' })
  if (error instanceof APIError && (error.status === 401 || error.status === 403))
    return createAssistantAPIErrorMessage({ error: 'authentication_failed', content: 'Please run /login' })
  if (error instanceof APIError && error.status === 404)
    return createAssistantAPIErrorMessage({ content: `The model ${model} is not available.`, error: 'invalid_request' })
  return createAssistantAPIErrorMessage({ content: `${API_ERROR_MESSAGE_PREFIX}: ${error.message}`, error: 'unknown' })
}

withRetry() 是精心设计的重试包装器:

// src/services/api/withRetry.ts
const DEFAULT_MAX_RETRIES = 10; const MAX_529_RETRIES = 3; const BASE_DELAY_MS = 500

export async function* withRetry<T>(getClient, operation, options): AsyncGenerator<SystemAPIErrorMessage, T> {
  let consecutive529Errors = 0
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    if (options.signal?.aborted) throw new APIUserAbortError()
    try {
      // 认证错误时刷新客户端(401/OAuth 撤销/Bedrock 凭证过期)
      if (client === null || (lastError instanceof APIError && lastError.status === 401)) {
        client = await getClient()
      }
      return await operation(client, attempt, retryContext)
    } catch (error) {
      if (is529Error(error)) {
        consecutive529Errors++
        if (consecutive529Errors >= MAX_529_RETRIES && options.fallbackModel)
          throw new FallbackTriggeredError(options.model, options.fallbackModel)  // Opus -> Sonnet 降级
      }
      // max_tokens 上下文溢出 -> 调整 max_tokens 重试
      const overflowData = parseMaxTokensContextOverflowError(error)
      if (overflowData) {
        retryContext.maxTokensOverride = Math.max(FLOOR_OUTPUT_TOKENS, overflowData.contextLimit - overflowData.inputTokens - 1000)
        continue
      }
      // 指数退避:delay = min(BASE_DELAY * 2^attempt, maxDelay) + jitter
      const delayMs = getRetryDelay(attempt, getRetryAfter(error))
      await sleep(delayMs, options.signal)
      if (attempt > maxRetries) throw new CannotRetryError(error, retryContext)
    }
  }
}

错误分类还有专门的 classifyAPIError() 用于 Datadog 指标上报,将每种错误映射为简洁的标签(如 'rate_limit''prompt_too_long''tool_use_mismatch'),方便监控告警。

重试策略分层设计:

错误类型 策略 最大重试
429 Rate Limit 指数退避 + Retry-After 10 次
529 Overloaded 退避 + 模型降级 3 次后降级
401 Auth Error 刷新 token + 重试 3 次
Prompt Too Long 压缩后重试 1 次
Max Output Tokens 扩大 token 限制重试 3 次
连接超时 指数退避 10 次
400/403 不重试 -

四、QueryEngine:对话循环的主控

src/QueryEngine.ts 约 1300 行,管理一次对话会话的完整生命周期。

4.1 核心依赖

// src/QueryEngine.ts - 关键依赖
import { accumulateUsage, updateUsage } from 'src/services/api/claude.js'
import { getModelUsage, getTotalAPIDuration, getTotalCost } from './cost-tracker.js'
import { query } from './query.js'
import { categorizeRetryableAPIError } from './services/api/errors.js'
import { createUserMessage, createUserInterruptionMessage, normalizeMessagesForAPI,
         createToolUseSummaryMessage, stripSignatureBlocks } from './utils/messages.js'
import { tokenCountWithEstimation, doesMostRecentAssistantMessageExceed200k } from './utils/tokens.js'
import { StreamingToolExecutor } from './services/tools/StreamingToolExecutor.js'
import { runTools } from './services/tools/toolOrchestration.js'
import { applyToolResultBudget } from './utils/toolResultStorage.js'
import { executePostSamplingHooks } from './utils/hooks/postSamplingHooks.js'

4.2 配置与初始化

export type QueryEngineConfig = {
  cwd: string; tools: Tools; commands: Command[]
  mcpClients: MCPServerConnection[]; agents: AgentDefinition[]
  canUseTool: CanUseToolFn           // 权限检查函数
  getAppState: () => AppState; setAppState: (f: (prev: AppState) => AppState) => void
  fallbackModel?: string; thinkingConfig?: ThinkingConfig
  maxTurns?: number; maxBudgetUsd?: number; taskBudget?: { total: number }
  // ... 更多选项
}

export class QueryEngine {
  private mutableMessages: Message[]       // 可变消息历史
  private abortController: AbortController
  private totalUsage: NonNullableUsage     // 累积 token 用量
  private readFileState: FileStateCache    // 文件状态缓存
  constructor(config: QueryEngineConfig) {
    this.mutableMessages = config.initialMessages ?? []
    this.abortController = config.abortController ?? createAbortController()
    this.totalUsage = EMPTY_USAGE
  }
}

4.3 submitMessage:一次完整的对话轮次

submitMessage() 是核心方法,每次用户发消息都会调用,通过 yield 实时输出:

async *submitMessage(prompt, options): AsyncGenerator<SDKMessage> {
  // 阶段 1: 构建系统提示词
  const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
    tools, mainLoopModel, additionalWorkingDirectories, mcpClients, customSystemPrompt,
  })
  const systemPrompt = asSystemPrompt([...defaultSystemPrompt, ...(memoryPrompt ? [memoryPrompt] : []), ...(appendSystemPrompt ? [appendSystemPrompt] : [])])

  // 阶段 2: 处理用户输入
  const { messages: messagesFromUserInput, shouldQuery } = await processUserInput({
    input: prompt, mode: 'prompt', context: processUserInputContext, messages: this.mutableMessages,
  })
  this.mutableMessages.push(...messagesFromUserInput)

  // 阶段 3: 调用 query() 进入对话循环
  for await (const message of query({ messages, systemPrompt, userContext, systemContext,
    canUseTool: wrappedCanUseTool, toolUseContext, fallbackModel, maxTurns, taskBudget,
  })) {
    switch (message.type) {
      case 'assistant': case 'user': case 'progress':
        this.mutableMessages.push(message); yield* normalizeMessage(message); break
      case 'stream_event':
        if (message.event.type === 'message_start') currentMessageUsage = updateUsage(EMPTY_USAGE, message.event.message.usage)
        if (message.event.type === 'message_delta') currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
        yield message; break
    }
  }

  // 阶段 4: 输出最终结果
  yield { type: 'result', subtype: 'success', duration_ms: Date.now() - startTime,
    total_cost_usd: getTotalCost(), usage: this.totalUsage, modelUsage: getModelUsage() }
}

4.4 Token 预算 – 六层防御体系

Token 预算是对话引擎的关键机制。src/utils/context.ts 定义了基础参数:

// src/utils/context.ts
export const MODEL_CONTEXT_WINDOW_DEFAULT = 200_000
export const CAPPED_DEFAULT_MAX_TOKENS = 8_000    // 默认输出上限
export const ESCALATED_MAX_TOKENS = 64_000        // 重试时升级上限

export function getContextWindowForModel(model: string, betas?: string[]): number {
  if (process.env.CLAUDE_CODE_MAX_CONTEXT_TOKENS) return parseInt(process.env.CLAUDE_CODE_MAX_CONTEXT_TOKENS, 10)
  if (has1mContext(model)) return 1_000_000       // [1m] 后缀 -> 100 万 token
  const cap = getModelCapability(model)
  if (cap?.max_input_tokens >= 100_000) return cap.max_input_tokens
  return MODEL_CONTEXT_WINDOW_DEFAULT              // 默认 200k
}

query.ts 中的 token 预算检查是六层防御:

// 层 1: 工具结果大小预算(防止单个工具输出撑爆上下文)
messagesForQuery = await applyToolResultBudget(messagesForQuery, contentReplacementState, ...)

// 层 2: Snip 压缩(历史消息裁剪)
if (feature('HISTORY_SNIP')) messagesForQuery = snipModule.snipCompactIfNeeded(messagesForQuery).messages

// 层 3: Microcompact(工具结果压缩)
messagesForQuery = (await deps.microcompact(messagesForQuery, toolUseContext, querySource)).messages

// 层 4: Context Collapse(上下文折叠)
if (feature('CONTEXT_COLLAPSE')) messagesForQuery = (await contextCollapse.applyCollapsesIfNeeded(...)).messages

// 层 5: Auto-compact(自动压缩 - 最终防线)
const { compactionResult } = await deps.autocompact(messagesForQuery, toolUseContext, { systemPrompt, userContext, systemContext }, ...)

// 层 6: 阻塞限制(auto-compact 关闭时的安全网)
if (!compactionResult) {
  const { isAtBlockingLimit } = calculateTokenWarningState(tokenCountWithEstimation(messagesForQuery), model)
  if (isAtBlockingLimit) return { reason: 'blocking_limit' }
}

这个 6 层防御的设计思路是渐进式压缩:从最小侵入的工具结果裁剪(层 1),到历史消息裁剪(层 2),到工具结果压缩(层 3),到上下文折叠(层 4),再到全量自动压缩(层 5),最后是硬性阻塞(层 6)。每一层都尽可能保留更多信息,只在必要时才升级到更激进的策略。

4.5 自动压缩与 MAX_TURNS 保护

自动压缩触发后的处理:

if (compactionResult) {
  logEvent('tengu_auto_compact_succeeded', {
    preCompactTokenCount, postCompactTokenCount, truePostCompactTokenCount,
    compactionInputTokens: compactionUsage?.input_tokens,
  })
  tracking = { compacted: true, turnId: deps.uuid(), turnCounter: 0, consecutiveFailures: 0 }
  const postCompactMessages = buildPostCompactMessages(compactionResult)
  for (const message of postCompactMessages) yield message
  messagesForQuery = postCompactMessages
}

无限循环保护:

// 轮次限制
if (maxTurns !== undefined && state.turnCount > maxTurns) return { reason: 'max_turns_reached' }
// max_output_tokens 恢复次数限制
const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT = 3
if (state.maxOutputTokensRecoveryCount > MAX_OUTPUT_TOKENS_RECOVERY_LIMIT)
  return { reason: 'max_output_tokens_recovery_exhausted' }

五、工具执行流水线

5.1 传统 7 步流水线

async executeTool(toolUse: ToolUseBlock): Promise<ToolResult> {
  const tool = this.findTool(toolUse.name)                     // 1. 查找工具
  const validation = await tool.validateInput(toolUse.input)    // 2. 输入验证
  if (!validation.valid) return { error: validation.error }
  const permission = await this.checkPermissions(tool, input)   // 3. 权限检查
  if (permission === 'ask') { if (!await this.askUser(tool, input)) return { error: 'User rejected' } }
  const preHook = await runPreToolUseHooks(toolUse.name, input) // 4. PreToolUse Hook
  if (preHook.blockingError) return { error: preHook.blockingError }
  const result = await tool.call(toolUse.input, context)        // 5. 执行工具
  await runPostToolUseHooks(toolUse.name, input, result)        // 6. PostToolUse Hook
  return result                                                 // 7. 返回结果
}

5.2 流式工具预执行(StreamingToolExecutor)

优化创新:在 API 还在流式返回工具输入 JSON 时,就开始验证和准备:

// 当 content_block_start 到达时,创建执行器
streamingToolExecutor = new StreamingToolExecutor(tools, canUseTool, toolUseContext)
// 当 input_json_delta 到达时,增量更新输入
streamingToolExecutor?.updateInput(blockIndex, delta.partial_json)
// 当 content_block_stop 到达时,标记块完成并可开始预执行
streamingToolExecutor.onBlockComplete(blockIndex)
// 流式结束后,获取所有工具结果
const toolResults = await streamingToolExecutor.getResults()

5.3 工具结果消息构造

工具执行完成后,结果封装成标准 user 消息:

const toolResultMessage = createUserMessage({
  content: [{
    type: 'tool_result',
    tool_use_id: toolUse.id,
    content: result.output,      // 工具输出文本
    is_error: result.isError,    // 是否为错误
  }],
  toolUseResult: result.output,
  sourceToolAssistantUUID: assistantMessage.uuid,
})

5.4 七种权限模式

权限检查的核心在 useCanUseTool hook 中,它返回 'allow''deny''ask' 三种决策:

type CanUseToolFn = async (tool: Tool, input: Record<string, unknown>,
  toolUseContext: ToolUseContext, assistantMessage: AssistantMessage, toolUseID: string,
  forceDecision?: 'allow' | 'deny',
) => Promise<{ behavior: 'allow' | 'deny' | 'ask' }>

七种权限模式的行为差异:

模式 行为 典型场景
default 危险操作需要用户确认 日常交互
plan 只允许只读操作,写操作需审批 代码审查
acceptEdits 文件编辑自动允许,Bash 仍需确认 快速开发
bypassPermissions 跳过所有权限检查 CI/CD 自动化
dontAsk 自动拒绝需要确认的操作 只读分析
auto AI 分类器自动判断(yoloClassifier.ts) 智能模式
bubble 权限请求冒泡到父 Agent 多 Agent 协作

5.5 PreToolUse / PostToolUse Hook 系统

Hook 系统允许在工具执行前后注入自定义逻辑,如审计日志、安全检查、结果后处理等。PreToolUse Hook 可以阻止工具执行(返回 blockingError),PostToolUse Hook 则用于记录和通知:

// PreToolUse: 可阻止执行
const preHook = await runPreToolUseHooks(toolUse.name, toolUse.input, context)
if (preHook.blockingError) return { error: preHook.blockingError }

// PostToolUse: 审计与通知
await runPostToolUseHooks(toolUse.name, toolUse.input, result, context)

六、上下文管理深度解析

src/context.ts(190 行)负责在会话生命周期内收集和缓存上下文信息。三个核心函数都使用 lodash/memoize 缓存,整个会话只执行一次。

6.1 getSystemContext() – 系统上下文

export const getSystemContext = memoize(async (): Promise<{ [k: string]: string }> => {
  const gitStatus = isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || !shouldIncludeGitInstructions()
    ? null : await getGitStatus()
  const injection = feature('BREAK_CACHE_COMMAND') ? getSystemPromptInjection() : null
  return {
    ...(gitStatus && { gitStatus }),
    ...(injection ? { cacheBreaker: `[CACHE_BREAKER: ${injection}]` } : {}),
  }
})

6.2 getUserContext() – 用户上下文

export const getUserContext = memoize(async (): Promise<{ [k: string]: string }> => {
  const shouldDisableClaudeMd = isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_CLAUDE_MDS) ||
    (isBareMode() && getAdditionalDirectoriesForClaudeMd().length === 0)
  const claudeMd = shouldDisableClaudeMd ? null
    : getClaudeMds(filterInjectedMemoryFiles(await getMemoryFiles()))
  setCachedClaudeMdContent(claudeMd || null)  // 缓存供 auto-mode 分类器使用
  return { ...(claudeMd && { claudeMd }), currentDate: `Today's date is ${getLocalISODate()}.` }
})

6.3 getGitStatus() – 并行 Git 信息收集

性能优化的典范 – 5 个 git 命令并行执行:

const MAX_STATUS_CHARS = 2000

export const getGitStatus = memoize(async (): Promise<string | null> => {
  if (process.env.NODE_ENV === 'test') return null
  const isGit = await getIsGit()
  if (!isGit) return null
  try {
    // 5 个 git 命令并行!
    const [branch, mainBranch, status, log, userName] = await Promise.all([
      getBranch(), getDefaultBranch(),
      execFileNoThrow(gitExe(), ['--no-optional-locks', 'status', '--short']).then(r => r.stdout.trim()),
      execFileNoThrow(gitExe(), ['--no-optional-locks', 'log', '--oneline', '-n', '5']).then(r => r.stdout.trim()),
      execFileNoThrow(gitExe(), ['config', 'user.name']).then(r => r.stdout.trim()),
    ])
    // 状态超过 2000 字符时截断
    const truncatedStatus = status.length > MAX_STATUS_CHARS
      ? status.substring(0, MAX_STATUS_CHARS) + '\n... (truncated)'
      : status
    return [
      `This is the git status at the start of the conversation.`, `Current branch: ${branch}`,
      `Main branch: ${mainBranch}`, ...(userName ? [`Git user: ${userName}`] : []),
      `Status:\n${truncatedStatus || '(clean)'}`, `Recent commits:\n${log}`,
    ].join('\n\n')
  } catch (error) { logError(error); return null }
})

6.4 系统提示词注入机制

系统提示词通过 fetchSystemPromptParts() 组装,收集多个来源的上下文:

const { defaultSystemPrompt, userContext, systemContext } = await fetchSystemPromptParts({
  tools,                        // 可用工具列表 -> 生成工具描述
  mainLoopModel: model,         // 模型名称 -> 调整提示词策略
  additionalWorkingDirectories, // 额外工作目录 -> CLAUDE.md 搜索路径
  mcpClients,                   // MCP 服务器 -> 动态工具描述
  customSystemPrompt,           // 用户自定义系统提示词
})
const fullSystemPrompt = asSystemPrompt(appendSystemContext(systemPrompt, systemContext))
messagesForQuery = prependUserContext(messagesForQuery, userContext)  // 注入用户上下文到消息序列

七、成本追踪系统

src/cost-tracker.ts(324 行)是 Claude Code 的"记账本",精确追踪每次 API 调用的成本。

7.1 核心数据结构

type StoredCostState = {
  totalCostUSD: number                      // 总花费(美元)
  totalAPIDuration: number                  // API 调用总耗时
  totalAPIDurationWithoutRetries: number    // 不含重试的 API 耗时
  totalToolDuration: number                 // 工具执行总耗时
  totalLinesAdded: number                   // 代码新增行数
  totalLinesRemoved: number                 // 代码删除行数
  modelUsage: { [modelName: string]: ModelUsage } | undefined  // 按模型用量
}

7.2 accumulateUsage() – Token 用量累积

每次 API 调用返回后,4 种 token 类型被精确累加:

// src/services/api/claude.ts
export function accumulateUsage(total: Usage, delta: Usage): Usage {
  return {
    input_tokens: total.input_tokens + delta.input_tokens,
    output_tokens: total.output_tokens + delta.output_tokens,
    cache_creation_input_tokens: (total.cache_creation_input_tokens ?? 0) + (delta.cache_creation_input_tokens ?? 0),
    cache_read_input_tokens: (total.cache_read_input_tokens ?? 0) + (delta.cache_read_input_tokens ?? 0),
  }
}

四种 token 类型说明:

  • input_tokens – 输入 token(用户消息 + 系统提示词 + 工具描述)
  • output_tokens – 输出 token(AI 回复)
  • cache_creation_input_tokens – 缓存创建 token(首次写入 prompt cache)
  • cache_read_input_tokens – 缓存读取 token(命中 prompt cache,成本仅为创建的 10%)

这 4 种 token 的区分对于成本优化至关重要。Claude Code 的系统提示词和工具描述通常占几万个 token,通过 prompt cache 机制,后续请求只需支付 cache_read 的低成本(约为正常 input 的 10%),大幅降低了多轮对话的成本。

7.3 addToTotalSessionCost() – 会话成本累加

export function addToTotalSessionCost(cost: number, usage: Usage, model: string): number {
  const modelUsage = addToTotalModelUsage(cost, usage, model)
  addToTotalCostState(cost, modelUsage, model)
  // OpenTelemetry 指标上报
  getCostCounter()?.add(cost, { model })
  getTokenCounter()?.add(usage.input_tokens, { model, type: 'input' })
  getTokenCounter()?.add(usage.output_tokens, { model, type: 'output' })
  getTokenCounter()?.add(usage.cache_read_input_tokens ?? 0, { model, type: 'cacheRead' })
  getTokenCounter()?.add(usage.cache_creation_input_tokens ?? 0, { model, type: 'cacheCreation' })
  // 递归计算 advisor 子查询的成本
  let totalCost = cost
  for (const advisorUsage of getAdvisorUsage(usage)) {
    const advisorCost = calculateUSDCost(advisorUsage.model, advisorUsage)
    totalCost += addToTotalSessionCost(advisorCost, advisorUsage, advisorUsage.model)
  }
  return totalCost
}

7.4 会话成本持久化与 /cost 命令

成本数据在切换会话前保存到项目配置:

export function saveCurrentSessionCosts(fpsMetrics?: FpsMetrics): void {
  saveCurrentProjectConfig(current => ({
    ...current,
    lastCost: getTotalCostUSD(),
    lastAPIDuration: getTotalAPIDuration(),
    lastTotalInputTokens: getTotalInputTokens(),
    lastTotalOutputTokens: getTotalOutputTokens(),
    lastTotalCacheCreationInputTokens: getTotalCacheCreationInputTokens(),
    lastTotalCacheReadInputTokens: getTotalCacheReadInputTokens(),
    lastModelUsage: Object.fromEntries(
      Object.entries(getModelUsage()).map(([model, usage]) => [model, {
        inputTokens: usage.inputTokens, outputTokens: usage.outputTokens,
        cacheReadInputTokens: usage.cacheReadInputTokens, costUSD: usage.costUSD,
      }])
    ),
    lastSessionId: getSessionId(),
  }))
}

/cost 命令输出格式:

export function formatTotalCost(): string {
  return chalk.dim(
    `Total cost:            ${formatCost(getTotalCostUSD())}\n` +
    `Total duration (API):  ${formatDuration(getTotalAPIDuration())}\n` +
    `Total duration (wall): ${formatDuration(getTotalDuration())}\n` +
    `Total code changes:    ${getTotalLinesAdded()} lines added, ${getTotalLinesRemoved()} lines removed\n` +
    formatModelUsage(),  // 按模型短名称聚合:claude-sonnet-4: 1,234,567 input, 45,678 output ($0.45)
  )
}

八、错误处理与恢复

8.1 API 错误恢复路径

API 调用失败
  |
  +-> 超时/连接错误 -> withRetry 指数退避 -> 重试最多 10 次
  |
  +-> 429 Rate Limit -> 解析 Retry-After 头 -> 等待后重试
  |
  +-> 529 Overloaded -> 前台: 退避重试 (最多 3 次) | 后台: 立即放弃
  |                    -> 连续 3 次: 触发模型降级 (Opus -> Sonnet)
  |
  +-> Prompt Too Long -> reactiveCompact 压缩后重试
  |
  +-> Max Output Tokens -> 扩大 max_tokens 重试 (从 8k 升级到 64k,最多 3 次)
  |
  +-> 401 Auth Error -> 刷新 OAuth token -> 重建客户端 -> 重试
  |
  +-> 400/403 -> 不重试,显示具体错误信息

8.2 工具执行错误处理

工具执行失败时,错误信息封装成 tool_result 消息,让 AI 有机会自我修正:

if (result.isError) {
  const errorMessage = createUserMessage({
    content: [{ type: 'tool_result', tool_use_id: toolUse.id, content: `Error: ${result.error}`, is_error: true }],
  })
  messages.push(errorMessage)  // AI 在下一轮看到错误,可能尝试不同方法
}

8.3 中断恢复

用户按 Ctrl+C 时,yieldMissingToolResultBlocks() 为未完成的工具调用生成错误结果,防止 tool_use/tool_result 不匹配导致 API 400:

function* yieldMissingToolResultBlocks(assistantMessages: AssistantMessage[], errorMessage: string) {
  for (const assistantMessage of assistantMessages) {
    const toolUseBlocks = assistantMessage.message.content.filter(c => c.type === 'tool_use')
    for (const toolUse of toolUseBlocks) {
      yield createUserMessage({
        content: [{ type: 'tool_result', content: errorMessage, is_error: true, tool_use_id: toolUse.id }],
        sourceToolAssistantUUID: assistantMessage.uuid,
      })
    }
  }
}

九、设计亮点总结

设计决策 原因
AsyncGenerator 循环 流式输出,实时渲染,不阻塞
6 层 token 预算防御 snip -> microcompact -> collapse -> autocompact -> blocking -> recovery
流式工具预执行 在 API 流式返回时就开始验证工具,减少延迟
7 种权限模式 从只读到全自动,覆盖所有使用场景
指数退避 + 529 降级 优雅处理 API 限流,3 次 529 后自动降级模型
5 个 git 命令并行 Promise.all 并行收集 git 状态,避免串行阻塞
memoize 上下文缓存 系统/用户上下文整个会话只计算一次
4 种 token 分类追踪 input/output/cache_read/cache_creation,精确到分
tool_use/tool_result 配对 中断时自动生成错误结果,防止 API 400
500 行错误分类器 覆盖 15+ 种 API 错误场景,每种都有定制提示

下篇预告

第四篇:77 个斜杠命令的设计艺术

Claude Code 有 77 个斜杠命令,从 /commit/review,从 /mcp/vim。但你知道吗?这些命令其实分三种完全不同的类型:一种生成 AI 提示词,一种直接执行本地逻辑,还有一种渲染 React 组件。

更有趣的是,有些命令正在从内置迁移到插件系统。下一篇,我们来看看命令系统的设计模式和演变路径。


标签: Claude Code QueryEngine 流式API 消息循环 工具调用 Token预算 成本追踪 错误恢复 上下文管理 源码分析 TypeScript

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐