Claude Code Source Code Analysis

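Chapter Overview

services/api/claude.ts (about 3,400 lines) is the "bus" between Claude Code and the Anthropic Messages API: it converts the in-app Message[] into BetaMessageStreamParams, drives streaming parsing through beta.messages.stream, and retries with backoff via withRetry on 529, 429, and connection errors. The goal of this chapter is that you can start from the moment the spinner turns in the REPL and trace back to how queryModelWithStreaming and updateUsage update the token statistics and the cost-tracker.

After this chapter you should be able to

  • Explain how queryModelWithStreaming and queryModel are layered as generators
  • Explain the prompt cache strategy behind getCacheControl / addCacheBreakpoints
  • Describe how withRetry distinguishes foreground and background sources on 529
  • Understand the semantics of updateUsage and accumulateUsage in streaming events
  • Locate executeNonStreamingRequest on the non-streaming fallback path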

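Core Concepts (read these first)

claude.ts is not a thin wrapper around the SDK

On top of the SDK the file layers: beta header merging (effort, context management, tool search), the Bedrock extra body, fast mode, the advisor model, quota header parsing, VCR recording, the GrowthBook-gated 1h cache TTL, and ensureToolResultPairing, which repairs messages before they are sent to the API. Much of the logic (stripExcessMediaItems, for example) exists only to satisfy the API_MAX_MEDIA_PER_REQUEST limit and has nothing to do with the UI.

Streaming usage is cumulative, not incremental

The comment on updateUsage is explicit: a message_delta event from the Anthropic streaming API may carry input_tokens=0, and that 0 must not overwrite the value already set by message_start. Treating each delta as an increment and summing them double-counts tokens and inflates the cost statistics. Across multiple assistant turns, accumulateUsage sums the per-message totals and takes service_tier from the most recent message.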
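
The difference between "cumulative snapshot" and "increment" is easiest to see on a concrete event sequence. The following is a minimal sketch, assuming a simplified two-field usage shape and made-up token counts; SimpleUsage, PartialUsage, mergeCumulative, and sumAsIncrements are illustrative names, not identifiers from claude.ts.

```typescript
// Hypothetical, simplified usage shape; the real NonNullableUsage has more fields.
interface SimpleUsage {
  input_tokens: number
  output_tokens: number
}

// Usage payload on a streaming event: fields may be missing or null.
type PartialUsage = {
  input_tokens?: number | null
  output_tokens?: number | null
}

// Merge one cumulative snapshot into the running usage of a single stream.
function mergeCumulative(current: SimpleUsage, part?: PartialUsage): SimpleUsage {
  if (!part) return { ...current }
  return {
    // Keep the message_start value when a later event reports 0 or null.
    input_tokens:
      part.input_tokens != null && part.input_tokens > 0
        ? part.input_tokens
        : current.input_tokens,
    // Output totals are cumulative, so the newest snapshot replaces the old one.
    output_tokens: part.output_tokens ?? current.output_tokens,
  }
}

// The buggy alternative: treating each snapshot as an increment.
function sumAsIncrements(current: SimpleUsage, part: PartialUsage): SimpleUsage {
  return {
    input_tokens: current.input_tokens + (part.input_tokens ?? 0),
    output_tokens: current.output_tokens + (part.output_tokens ?? 0),
  }
}

// Made-up event sequence for one assistant message.
const events: PartialUsage[] = [
  { input_tokens: 1200, output_tokens: 1 }, // message_start
  { input_tokens: 0, output_tokens: 40 }, // message_delta
  { input_tokens: 0, output_tokens: 95 }, // message_delta
]

const zero: SimpleUsage = { input_tokens: 0, output_tokens: 0 }
const correct = events.reduce(mergeCumulative, zero)
const inflated = events.reduce(sumAsIncrements, zero)

console.log(correct) // { input_tokens: 1200, output_tokens: 95 }
console.log(inflated) // { input_tokens: 1200, output_tokens: 136 }
```

With these numbers the merge reports 95 output tokens, while summing reports 136, which is the overcounting described above.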

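Retry is bound to querySource

withRetry.ts maintains the FOREGROUND_529_RETRY_SOURCES allowlist: only sources the user is blocked on, such as the main thread, compact, and agent, retry on 529. Background calls such as summaries, titles, and suggestions fail immediately, because during a capacity cascade each retry amplifies gateway load 3–10×. Security classifiers such as auto_mode are the exception: they are on the allowlist because auto mode depends on them completing. A newly added source does not retry by default and has to be added to the allowlist explicitly; only a call with no querySource at all (undefined) retries.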

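Suggested Study Steps

  1. Read source block A: getCacheControl and the 1h TTL allowlist
  2. Read source block B: the queryModelWithStreaming entry point
  3. Read source block C: executeNonStreamingRequest and withRetry
  4. Read source block D: updateUsage / accumulateUsage
  5. Read source block E: addCacheBreakpoints and tengu_api_cache_breakpoints
  6. Read source block F: the withRetry 529 policy
  7. Open services/api/claude.ts in the source tree and follow along by line number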

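Common Pitfalls

Note

Do not confuse toolToAPISchema in utils/api.ts with queryModel in claude.ts.

Note

The non-streaming fallback timeout defaults to 120s in remote environments and 300s locally; a set API_TIMEOUT_MS overrides both.

Note

The cache read by getFeatureValue_CACHED_MAY_BE_STALE may be stale across processes; for the 1h TTL, a latch in bootstrap state keeps the value from flipping mid-session.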
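
The latch mentioned in the last note is a compute-once pattern: the first read stores the answer in session state, and later reads ignore whatever the underlying source says. A minimal sketch, assuming a hypothetical SessionState object in place of the real bootstrap state getters and setters:

```typescript
// Hypothetical session state; stands in for the bootstrap-state getter/setter pair.
interface SessionState {
  cache1hEligible: boolean | null // null means "not decided yet"
}

// Decide once per session, then keep returning the stored decision.
function latchEligibility(state: SessionState, compute: () => boolean): boolean {
  if (state.cache1hEligible === null) {
    state.cache1hEligible = compute()
  }
  return state.cache1hEligible
}

const state: SessionState = { cache1hEligible: null }

// First call: the user is eligible, so the latch stores true.
const first = latchEligibility(state, () => true)

// Later call: the live signal has flipped, but the latch still answers true.
const second = latchEligibility(state, () => false)

console.log(first, second) // true true
```

The trade-off is deliberate: the session may act on a slightly outdated decision, but every request in the session uses the same TTL.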

Position in the Architecture

The data flow of Claude Code's main query loop can be summarized as:

query.ts assembles messages + systemPrompt + tools
  → queryModelWithStreaming (claude.ts)
  → inside queryModel: normalizeMessagesForAPI, addCacheBreakpoints
  → anthropic.beta.messages.stream
  → yields StreamEvent / AssistantMessage for each event
  → updateUsage updates the token counts; addToTotalSessionCost writes to cost-tracker
  → 529/429 → withRetry backoff → possible fallback to executeNonStreamingRequest

The services/api/ directory also contains client.ts (the Anthropic client singleton), withRetry.ts, usage.ts, errors.ts, and others; this chapter focuses on the main path through claude.ts. When changing beta headers or the max_tokens policy, read this file and utils/betas.ts first.

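Prompt Cache: getCacheControl and the 1h TTL

Prompt caching marks its write points with cache_control: { type: 'ephemeral' }. getCacheControl decides from querySource whether to attach ttl: '1h', and attaches scope only when scope is 'global'.

Key points of the should1hCacheTTL logic:

  1. Bedrock users can bypass GrowthBook with ENABLE_PROMPT_CACHING_1H_BEDROCK
  2. For 1P users, eligibility (ant, or a subscriber not in overage) is latched in bootstrap state, so a mid-session overage flip cannot bust the prompt cache (~20K tokens per flip)
  3. The allowlist comes from the GrowthBook config tengu_prompt_cache_1h_config and supports trailing-* prefix patterns such as repl_main_thread*
  4. The allowlist is likewise latched for the session, so a disk cache update cannot mix 5m and 1h TTLs within one session

getPromptCachingEnabled also handles env switches such as DISABLE_PROMPT_CACHING_OPUS. When chasing a cache-related bug, first check the requestId chain on assistant messages (getPreviousRequestIdFromMessages) and the promptCacheBreakDetection module.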
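
The allowlist matching rule (exact match, or prefix match when the pattern ends in *) can be tried in isolation. The sketch below restates that rule as a standalone function; matchesAllowlist is an illustrative name, and plain strings stand in for the QuerySource type.

```typescript
// Standalone restatement of the trailing-'*' matching rule.
// querySource is a plain string here; the real code uses the QuerySource type.
function matchesAllowlist(
  querySource: string | undefined,
  allowlist: string[],
): boolean {
  if (querySource === undefined) return false
  return allowlist.some(pattern =>
    pattern.endsWith('*')
      ? querySource.startsWith(pattern.slice(0, -1)) // prefix match
      : querySource === pattern, // exact match
  )
}

const mainAndSdk = ['repl_main_thread*', 'sdk']

console.log(matchesAllowlist('repl_main_thread:outputStyle:custom', mainAndSdk)) // true
console.log(matchesAllowlist('sdk', mainAndSdk)) // true
console.log(matchesAllowlist('agent:custom', mainAndSdk)) // false
console.log(matchesAllowlist('agent:custom', [...mainAndSdk, 'agent:*'])) // true
console.log(matchesAllowlist('compact', ['*'])) // true
console.log(matchesAllowlist(undefined, ['*'])) // false
```

Note that an undefined querySource never gets the 1h TTL, even with the catch-all pattern, and that an empty allowlist disables the 1h TTL for everyone.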

Source reference: src/services/api/claude.ts · lines 333–356 (of 3420)

 333| export function getPromptCachingEnabled(model: string): boolean {
 334|   // Global disable takes precedence
 335|   if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false
 336| 
 337|   // Check if we should disable for small/fast model
 338|   if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) {
 339|     const smallFastModel = getSmallFastModel()
 340|     if (model === smallFastModel) return false
 341|   }
 342| 
 343|   // Check if we should disable for default Sonnet
 344|   if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_SONNET)) {
 345|     const defaultSonnet = getDefaultSonnetModel()
 346|     if (model === defaultSonnet) return false
 347|   }
 348| 
 349|   // Check if we should disable for default Opus
 350|   if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_OPUS)) {
 351|     const defaultOpus = getDefaultOpusModel()
 352|     if (model === defaultOpus) return false
 353|   }
 354| 
 355|   return true
 356| }

Source reference: src/services/api/claude.ts · lines 358–434 (of 3420)

 358| export function getCacheControl({
 359|   scope,
 360|   querySource,
 361| }: {
 362|   scope?: CacheScope
 363|   querySource?: QuerySource
 364| } = {}): {
 365|   type: 'ephemeral'
 366|   ttl?: '1h'
 367|   scope?: CacheScope
 368| } {
 369|   return {
 370|     type: 'ephemeral',
 371|     ...(should1hCacheTTL(querySource) && { ttl: '1h' }),
 372|     ...(scope === 'global' && { scope }),
 373|   }
 374| }
 375| 
 376| /**
 377|  * Determines if 1h TTL should be used for prompt caching.
 378|  *
 379|  * Only applied when:
 380|  * 1. User is eligible (ant or subscriber within rate limits)
 381|  * 2. The query source matches a pattern in the GrowthBook allowlist
 382|  *
 383|  * GrowthBook config shape: { allowlist: string[] }
 384|  * Patterns support trailing '*' for prefix matching.
 385|  * Examples:
 386|  * - { allowlist: ["repl_main_thread*", "sdk"] } — main thread + SDK only
 387|  * - { allowlist: ["repl_main_thread*", "sdk", "agent:*"] } — also subagents
 388|  * - { allowlist: ["*"] } — all sources
 389|  *
 390|  * The allowlist is cached in STATE for session stability — prevents mixed
 391|  * TTLs when GrowthBook's disk cache updates mid-request.
 392|  */
 393| function should1hCacheTTL(querySource?: QuerySource): boolean {
 394|   // 3P Bedrock users get 1h TTL when opted in via env var — they manage their own billing
 395|   // No GrowthBook gating needed since 3P users don't have GrowthBook configured
 396|   if (
 397|     getAPIProvider() === 'bedrock' &&
 398|     isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK)
 399|   ) {
 400|     return true
 401|   }
 402| 
 403|   // Latch eligibility in bootstrap state for session stability — prevents
 404|   // mid-session overage flips from changing the cache_control TTL, which
 405|   // would bust the server-side prompt cache (~20K tokens per flip).
 406|   let userEligible = getPromptCache1hEligible()
 407|   if (userEligible === null) {
 408|     userEligible =
 409|       process.env.USER_TYPE === 'ant' ||
 410|       (isClaudeAISubscriber() && !currentLimits.isUsingOverage)
 411|     setPromptCache1hEligible(userEligible)
 412|   }
 413|   if (!userEligible) return false
 414| 
 415|   // Cache allowlist in bootstrap state for session stability — prevents mixed
 416|   // TTLs when GrowthBook's disk cache updates mid-request
 417|   let allowlist = getPromptCache1hAllowlist()
 418|   if (allowlist === null) {
 419|     const config = getFeatureValue_CACHED_MAY_BE_STALE<{
 420|       allowlist?: string[]
 421|     }>('tengu_prompt_cache_1h_config', {})
 422|     allowlist = config.allowlist ?? []
 423|     setPromptCache1hAllowlist(allowlist)
 424|   }
 425| 
 426|   return (
 427|     querySource !== undefined &&
 428|     allowlist.some(pattern =>
 429|       pattern.endsWith('*')
 430|         ? querySource.startsWith(pattern.slice(0, -1))
 431|         : querySource === pattern,
 432|     )
 433|   )
 434| }

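The queryModelWithStreaming Entry Point

queryModelWithStreaming is the stable public API: it takes messages, systemPrompt, thinkingConfig, tools, an AbortSignal, and Options, and returns an async generator that yields StreamEvent | AssistantMessage | SystemAPIErrorMessage.

The implementation wraps the call in withStreamingVCR so that test and recording scenarios can replay a previously captured stream. Inside, yield* queryModel(...) does the actual HTTP work and event parsing.

Reading notes:

  • shouldDeferLspTool adds defer_loading to the tool schema while LSP initialization is incomplete, so the model does not call a tool that is not ready yet
  • The caller (query.ts) is responsible for reading the final usage after the generator finishes and writing it to the session
  • SystemAPIErrorMessage comes from the system messages yielded by withRetry; the UI can show "retrying" without interrupting the transcript

When debugging "no streaming output", set a breakpoint in the message_start branch inside queryModel and check the betas and whether normalizeModelStringForAPI rewrote the model string.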
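
From the caller's side, the generator layering looks like one stream of tagged values. The following sketch shows yield* delegation and a consumer that switches on the tag; the Yielded union and its fields are simplified, hypothetical stand-ins for StreamEvent, AssistantMessage, and SystemAPIErrorMessage, and innerQuery emits canned data instead of calling the API.

```typescript
// Hypothetical, simplified stand-ins for the three yielded types.
type Yielded =
  | { type: 'stream_event'; text: string }
  | { type: 'assistant'; content: string }
  | { type: 'system'; note: string }

// Plays the role of queryModel: produces the events (canned data here).
async function* innerQuery(): AsyncGenerator<Yielded, void> {
  yield { type: 'system', note: 'retrying (attempt 2)' }
  yield { type: 'stream_event', text: 'Hel' }
  yield { type: 'stream_event', text: 'lo' }
  yield { type: 'assistant', content: 'Hello' }
}

// Plays the role of queryModelWithStreaming: delegates with yield*.
async function* outerQuery(): AsyncGenerator<Yielded, void> {
  yield* innerQuery()
}

// A consumer that routes each yielded value by its discriminant.
async function collect(): Promise<{
  notes: string[]
  partial: string
  finalText: string | null
}> {
  const notes: string[] = []
  let partial = ''
  let finalText: string | null = null
  for await (const item of outerQuery()) {
    switch (item.type) {
      case 'system':
        notes.push(item.note) // e.g. show "retrying" in the UI
        break
      case 'stream_event':
        partial += item.text // incremental rendering
        break
      case 'assistant':
        finalText = item.content // completed message
        break
    }
  }
  return { notes, partial, finalText }
}
```

Because retry notices travel through the same generator as content, the consumer needs no separate error channel to display them.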

Source reference: src/services/api/claude.ts · lines 752–793 (of 3420)

 752| export async function* queryModelWithStreaming({
 753|   messages,
 754|   systemPrompt,
 755|   thinkingConfig,
 756|   tools,
 757|   signal,
 758|   options,
 759| }: {
 760|   messages: Message[]
 761|   systemPrompt: SystemPrompt
 762|   thinkingConfig: ThinkingConfig
 763|   tools: Tools
 764|   signal: AbortSignal
 765|   options: Options
 766| }): AsyncGenerator<
 767|   StreamEvent | AssistantMessage | SystemAPIErrorMessage,
 768|   void
 769| > {
 770|   return yield* withStreamingVCR(messages, async function* () {
 771|     yield* queryModel(
 772|       messages,
 773|       systemPrompt,
 774|       thinkingConfig,
 775|       tools,
 776|       signal,
 777|       options,
 778|     )
 779|   })
 780| }
 781| 
 782| /**
 783|  * Determines if an LSP tool should be deferred (tool appears with defer_loading: true)
 784|  * because LSP initialization is not yet complete.
 785|  */
 786| function shouldDeferLspTool(tool: Tool): boolean {
 787|   if (!('isLsp' in tool) || !tool.isLsp) {
 788|     return false
 789|   }
 790|   const status = getInitializationStatus()
 791|   // Defer when pending or not started
 792|   return status.status === 'pending' || status.status === 'not-started'
 793| }

Non-Streaming Fallback: executeNonStreamingRequest

When streaming fails or the environment forces non-streaming, executeNonStreamingRequest encapsulates the common pattern:

  1. Wrap anthropic.beta.messages.create in withRetry
  2. adjustParamsForNonStreaming caps max_tokens at MAX_NON_STREAMING_TOKENS (64000)
  3. Timeout: API_TIMEOUT_MS takes precedence; otherwise 120s in remote environments and 300s elsewhere
  4. Yield the system messages emitted by withRetry so the UI can show them; finally return the BetaMessage
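
Step 4 relies on an async generator that both yields (progress notes) and returns (the final result). A for await loop discards the return value, so the wrapper has to call next() by hand. A minimal sketch of that pattern, with a hypothetical retrying generator in place of withRetry and a string in place of BetaMessage:

```typescript
type SystemNote = { type: 'system'; message: string }

// Hypothetical retrying generator: yields one note per failed attempt,
// then returns the final result (a string here, a BetaMessage in the source).
async function* retrying(failures: number): AsyncGenerator<SystemNote, string> {
  for (let attempt = 1; attempt <= failures; attempt++) {
    yield { type: 'system', message: `attempt ${attempt} failed, retrying` }
  }
  return 'final-message'
}

// Forward every yielded note to our own caller and pass the return value through.
async function* forwardAndReturn(
  failures: number,
): AsyncGenerator<SystemNote, string> {
  const generator = retrying(failures)
  while (true) {
    const step = await generator.next()
    if (step.done) return step.value // the generator's return value
    yield step.value // a progress note for the UI
  }
}
```

When no filtering is needed, `return yield* retrying(failures)` is equivalent; the explicit loop leaves room to inspect or drop individual yielded values.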

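The error path separates APIUserAbortError (rethrown immediately, without recording tengu_nonstreaming_fallback_error) from all other errors, which are logged to diagnostics and through logEvent('tengu_nonstreaming_fallback_error'), with originatingRequestId included so the event can be correlated with the failed stream.

This path is the backstop for user-perceived latency: when a remote container is idle-killed after about 5 minutes, the 120s timeout lets the failure surface as an APIConnectionTimeoutError rather than a silent hang.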
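
The timeout precedence can be checked without touching the real environment. The sketch below takes the environment as a parameter; resolveFallbackTimeoutMs and isTruthy are illustrative names, and the set of strings that isTruthy accepts is an assumption, since the source's isEnvTruthy is not shown here.

```typescript
type Env = Record<string, string | undefined>

// Assumption: which strings count as "truthy" is a guess at isEnvTruthy's behavior.
function isTruthy(value: string | undefined): boolean {
  if (!value) return false
  return ['1', 'true', 'yes', 'on'].includes(value.trim().toLowerCase())
}

// Same precedence as described above: explicit override, then remote, then local.
function resolveFallbackTimeoutMs(env: Env): number {
  const override = parseInt(env['API_TIMEOUT_MS'] || '', 10)
  if (override) return override // NaN and 0 are falsy, so both fall through
  return isTruthy(env['CLAUDE_CODE_REMOTE']) ? 120_000 : 300_000
}

console.log(resolveFallbackTimeoutMs({})) // 300000
console.log(resolveFallbackTimeoutMs({ CLAUDE_CODE_REMOTE: '1' })) // 120000
console.log(
  resolveFallbackTimeoutMs({ API_TIMEOUT_MS: '45000', CLAUDE_CODE_REMOTE: '1' }),
) // 45000
console.log(resolveFallbackTimeoutMs({ API_TIMEOUT_MS: 'abc' })) // 300000
```

One consequence of the truthiness check is that API_TIMEOUT_MS=0 does not mean "no timeout"; it is ignored and the default applies.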

Source reference: src/services/api/claude.ts · lines 807–917 (of 3420)

 807| function getNonstreamingFallbackTimeoutMs(): number {
 808|   const override = parseInt(process.env.API_TIMEOUT_MS || '', 10)
 809|   if (override) return override
 810|   return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000
 811| }
 812| 
 813| /**
 814|  * Helper generator for non-streaming API requests.
 815|  * Encapsulates the common pattern of creating a withRetry generator,
 816|  * iterating to yield system messages, and returning the final BetaMessage.
 817|  */
 818| export async function* executeNonStreamingRequest(
 819|   clientOptions: {
 820|     model: string
 821|     fetchOverride?: Options['fetchOverride']
 822|     source: string
 823|   },
 824|   retryOptions: {
 825|     model: string
 826|     fallbackModel?: string
 827|     thinkingConfig: ThinkingConfig
 828|     fastMode?: boolean
 829|     signal: AbortSignal
 830|     initialConsecutive529Errors?: number
 831|     querySource?: QuerySource
 832|   },
 833|   paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
 834|   onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
 835|   captureRequest: (params: BetaMessageStreamParams) => void,
 836|   /**
 837|    * Request ID of the failed streaming attempt this fallback is recovering
 838|    * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
 839|    */
 840|   originatingRequestId?: string | null,
 841| ): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
 842|   const fallbackTimeoutMs = getNonstreamingFallbackTimeoutMs()
 843|   const generator = withRetry(
 844|     () =>
 845|       getAnthropicClient({
 846|         maxRetries: 0,
 847|         model: clientOptions.model,
 848|         fetchOverride: clientOptions.fetchOverride,
 849|         source: clientOptions.source,
 850|       }),
 851|     async (anthropic, attempt, context) => {
 852|       const start = Date.now()
 853|       const retryParams = paramsFromContext(context)
 854|       captureRequest(retryParams)
 855|       onAttempt(attempt, start, retryParams.max_tokens)
 856| 
 857|       const adjustedParams = adjustParamsForNonStreaming(
 858|         retryParams,
 859|         MAX_NON_STREAMING_TOKENS,
 860|       )
 861| 
 862|       try {
 863|         // biome-ignore lint/plugin: non-streaming API call
 864|         return await anthropic.beta.messages.create(
 865|           {
 866|             ...adjustedParams,
 867|             model: normalizeModelStringForAPI(adjustedParams.model),
 868|           },
 869|           {
 870|             signal: retryOptions.signal,
 871|             timeout: fallbackTimeoutMs,
 872|           },
 873|         )
 874|       } catch (err) {
 875|         // User aborts are not errors — re-throw immediately without logging
 876|         if (err instanceof APIUserAbortError) throw err
 877| 
 878|         // Instrumentation: record when the non-streaming request errors (including
 879|         // timeouts). Lets us distinguish "fallback hung past container kill"
 880|         // (no event) from "fallback hit the bounded timeout" (this event).
 881|         logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
 882|         logEvent('tengu_nonstreaming_fallback_error', {
 883|           model:
 884|             clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 885|           error:
 886|             err instanceof Error
 887|               ? (err.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
 888|               : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
 889|           attempt,
 890|           timeout_ms: fallbackTimeoutMs,
 891|           request_id: (originatingRequestId ??
 892|             'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 893|         })
 894|         throw err
 895|       }
 896|     },
 897|     {
 898|       model: retryOptions.model,
 899|       fallbackModel: retryOptions.fallbackModel,
 900|       thinkingConfig: retryOptions.thinkingConfig,
 901|       ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
 902|       signal: retryOptions.signal,
 903|       initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
 904|       querySource: retryOptions.querySource,
 905|     },
 906|   )
 907| 
 908|   let e
 909|   do {
 910|     e = await generator.next()
 911|     if (!e.done && e.value.type === 'system') {
 912|       yield e.value
 913|     }
 914|   } while (!e.done)
 915| 
 916|   return e.value as BetaMessage
 917| }

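Usage Statistics: updateUsage and accumulateUsage

The semantics of the usage field in the streaming API are easy to get wrong.

updateUsage handles the BetaMessageDeltaUsage within a single stream:

  • input / cache_creation / cache_read are overwritten only when the new value is non-null and > 0, so a 0 in message_delta cannot wipe out the message_start value
  • output_tokens uses ?? to fall back to the old value
  • cache_deleted_input_tokens (cache editing) gets the same > 0 guard under the CACHED_MICROCOMPACT feature
  • server_tool_use (web_search / web_fetch) is merged from the delta

accumulateUsage sums usage across multiple assistant turns; service_tier, inference_geo, iterations, and speed take the value from the most recent message.

The cost-tracker and the REPL token indicator depend on the output of these functions. If output tokens appear doubled, first check whether a delta was passed to accumulateUsage instead of updateUsage.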

Source reference: src/services/api/claude.ts · lines 2914–3038 (of 3420)

2914| /**
2915|  * Updates usage statistics with new values from streaming API events.
2916|  * Note: Anthropic's streaming API provides cumulative usage totals, not incremental deltas.
2917|  * Each event contains the complete usage up to that point in the stream.
2918|  *
2919|  * Input-related tokens (input_tokens, cache_creation_input_tokens, cache_read_input_tokens)
2920|  * are typically set in message_start and remain constant. message_delta events may send
2921|  * explicit 0 values for these fields, which should not overwrite the values from message_start.
2922|  * We only update these fields if they have a non-null, non-zero value.
2923|  */
2924| export function updateUsage(
2925|   usage: Readonly<NonNullableUsage>,
2926|   partUsage: BetaMessageDeltaUsage | undefined,
2927| ): NonNullableUsage {
2928|   if (!partUsage) {
2929|     return { ...usage }
2930|   }
2931|   return {
2932|     input_tokens:
2933|       partUsage.input_tokens !== null && partUsage.input_tokens > 0
2934|         ? partUsage.input_tokens
2935|         : usage.input_tokens,
2936|     cache_creation_input_tokens:
2937|       partUsage.cache_creation_input_tokens !== null &&
2938|       partUsage.cache_creation_input_tokens > 0
2939|         ? partUsage.cache_creation_input_tokens
2940|         : usage.cache_creation_input_tokens,
2941|     cache_read_input_tokens:
2942|       partUsage.cache_read_input_tokens !== null &&
2943|       partUsage.cache_read_input_tokens > 0
2944|         ? partUsage.cache_read_input_tokens
2945|         : usage.cache_read_input_tokens,
2946|     output_tokens: partUsage.output_tokens ?? usage.output_tokens,
2947|     server_tool_use: {
2948|       web_search_requests:
2949|         partUsage.server_tool_use?.web_search_requests ??
2950|         usage.server_tool_use.web_search_requests,
2951|       web_fetch_requests:
2952|         partUsage.server_tool_use?.web_fetch_requests ??
2953|         usage.server_tool_use.web_fetch_requests,
2954|     },
2955|     service_tier: usage.service_tier,
2956|     cache_creation: {
2957|       // SDK type BetaMessageDeltaUsage is missing cache_creation, but it's real!
2958|       ephemeral_1h_input_tokens:
2959|         (partUsage as BetaUsage).cache_creation?.ephemeral_1h_input_tokens ??
2960|         usage.cache_creation.ephemeral_1h_input_tokens,
2961|       ephemeral_5m_input_tokens:
2962|         (partUsage as BetaUsage).cache_creation?.ephemeral_5m_input_tokens ??
2963|         usage.cache_creation.ephemeral_5m_input_tokens,
2964|     },
2965|     // cache_deleted_input_tokens: returned by the API when cache editing
2966|     // deletes KV cache content, but not in SDK types. Kept off NonNullableUsage
2967|     // so the string is eliminated from external builds by dead code elimination.
2968|     // Uses the same > 0 guard as other token fields to prevent message_delta
2969|     // from overwriting the real value with 0.
2970|     ...(feature('CACHED_MICROCOMPACT')
2971|       ? {
2972|           cache_deleted_input_tokens:
2973|             (partUsage as unknown as { cache_deleted_input_tokens?: number })
2974|               .cache_deleted_input_tokens != null &&
2975|             (partUsage as unknown as { cache_deleted_input_tokens: number })
2976|               .cache_deleted_input_tokens > 0
2977|               ? (partUsage as unknown as { cache_deleted_input_tokens: number })
2978|                   .cache_deleted_input_tokens
2979|               : ((usage as unknown as { cache_deleted_input_tokens?: number })
2980|                   .cache_deleted_input_tokens ?? 0),
2981|         }
2982|       : {}),
2983|     inference_geo: usage.inference_geo,
2984|     iterations: partUsage.iterations ?? usage.iterations,
2985|     speed: (partUsage as BetaUsage).speed ?? usage.speed,
2986|   }
2987| }
2988| 
2989| /**
2990|  * Accumulates usage from one message into a total usage object.
2991|  * Used to track cumulative usage across multiple assistant turns.
2992|  */
2993| export function accumulateUsage(
2994|   totalUsage: Readonly<NonNullableUsage>,
2995|   messageUsage: Readonly<NonNullableUsage>,
2996| ): NonNullableUsage {
2997|   return {
2998|     input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
2999|     cache_creation_input_tokens:
3000|       totalUsage.cache_creation_input_tokens +
3001|       messageUsage.cache_creation_input_tokens,
3002|     cache_read_input_tokens:
3003|       totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
3004|     output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
3005|     server_tool_use: {
3006|       web_search_requests:
3007|         totalUsage.server_tool_use.web_search_requests +
3008|         messageUsage.server_tool_use.web_search_requests,
3009|       web_fetch_requests:
3010|         totalUsage.server_tool_use.web_fetch_requests +
3011|         messageUsage.server_tool_use.web_fetch_requests,
3012|     },
3013|     service_tier: messageUsage.service_tier, // Use the most recent service tier
3014|     cache_creation: {
3015|       ephemeral_1h_input_tokens:
3016|         totalUsage.cache_creation.ephemeral_1h_input_tokens +
3017|         messageUsage.cache_creation.ephemeral_1h_input_tokens,
3018|       ephemeral_5m_input_tokens:
3019|         totalUsage.cache_creation.ephemeral_5m_input_tokens +
3020|         messageUsage.cache_creation.ephemeral_5m_input_tokens,
3021|     },
3022|     // See comment in updateUsage — field is not on NonNullableUsage to keep
3023|     // the string out of external builds.
3024|     ...(feature('CACHED_MICROCOMPACT')
3025|       ? {
3026|           cache_deleted_input_tokens:
3027|             ((totalUsage as unknown as { cache_deleted_input_tokens?: number })
3028|               .cache_deleted_input_tokens ?? 0) +
3029|             ((
3030|               messageUsage as unknown as { cache_deleted_input_tokens?: number }
3031|             ).cache_deleted_input_tokens ?? 0),
3032|         }
3033|       : {}),
3034|     inference_geo: messageUsage.inference_geo, // Use the most recent
3035|     iterations: messageUsage.iterations, // Use the most recent
3036|     speed: messageUsage.speed, // Use the most recent
3037|   }
3038| }

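addCacheBreakpoints and cache_edits

addCacheBreakpoints converts Message[] into the API's MessageParam[] and inserts cache_control markers where the policy allows. The tengu_api_cache_breakpoints log event records the message count, cachingEnabled, and skipCacheWrite.

When working with compact/apiMicrocompact, the caller can pass useCachedMC, newCacheEdits, and pinnedEdits to get API-side context management (deleting KV cache segments) instead of resending the whole prefix. The comments stress "exactly one message-level cache_control marker per request", in line with Mycro turn-to-turn eviction.

Read this function alongside getAPIContextManagement (in the compact submodule) and the GrowthBook gate tengu_compact_cache_prefix.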
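
The "exactly one marker per request" invariant can be illustrated on a toy message list. This is a sketch under stated assumptions, not the function's actual body: the excerpt that follows shows only the signature and the log call, so placing the marker on the last block of the last message is an assumption made for illustration, and SimpleMessage, TextBlock, and markLastMessage are hypothetical names.

```typescript
type CacheControl = { type: 'ephemeral'; ttl?: '1h' }
type TextBlock = { type: 'text'; text: string; cache_control?: CacheControl }
type SimpleMessage = { role: 'user' | 'assistant'; content: TextBlock[] }

// Strip any existing markers, then (if enabled) mark exactly one block.
// Assumption: the marked block is the last block of the last message.
function markLastMessage(
  messages: SimpleMessage[],
  enabled: boolean,
  cacheControl: CacheControl = { type: 'ephemeral' },
): SimpleMessage[] {
  const lastMessage = messages.length - 1
  return messages.map((message, i) => ({
    role: message.role,
    content: message.content.map((block, j) => {
      const base: TextBlock = { type: 'text', text: block.text }
      const isTarget =
        enabled && i === lastMessage && j === message.content.length - 1
      return isTarget ? { ...base, cache_control: cacheControl } : base
    }),
  }))
}

// Count markers across all blocks to check the invariant.
function countMarkers(messages: SimpleMessage[]): number {
  return messages
    .flatMap(message => message.content)
    .filter(block => block.cache_control !== undefined).length
}

const conversation: SimpleMessage[] = [
  { role: 'user', content: [{ type: 'text', text: 'hi', cache_control: { type: 'ephemeral' } }] },
  { role: 'assistant', content: [{ type: 'text', text: 'hello' }] },
  { role: 'user', content: [{ type: 'text', text: 'a' }, { type: 'text', text: 'b' }] },
]

console.log(countMarkers(markLastMessage(conversation, true))) // 1
console.log(countMarkers(markLastMessage(conversation, false))) // 0
```

Stripping stale markers first is what keeps the count at one when the input already carries a marker from a previous turn.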

Source reference: src/services/api/claude.ts · lines 3062–3076 (of 3420)

3062| // Exported for testing cache_reference placement constraints
3063| export function addCacheBreakpoints(
3064|   messages: (UserMessage | AssistantMessage)[],
3065|   enablePromptCaching: boolean,
3066|   querySource?: QuerySource,
3067|   useCachedMC = false,
3068|   newCacheEdits?: CachedMCEditsBlock | null,
3069|   pinnedEdits?: CachedMCPinnedEdits[],
3070|   skipCacheWrite = false,
3071| ): MessageParam[] {
3072|   logEvent('tengu_api_cache_breakpoints', {
3073|     totalMessageCount: messages.length,
3074|     cachingEnabled: enablePromptCaching,
3075|     skipCacheWrite,
3076|   })

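withRetry: 529, 429, and Model Fallback

services/api/withRetry.ts exports the withRetry async generator; both the streaming and non-streaming paths in claude.ts depend on it.

Core constants:

  • DEFAULT_MAX_RETRIES = 10, BASE_DELAY_MS = 500
  • MAX_529_RETRIES = 3 and the FOREGROUND_529_RETRY_SOURCES set
  • CannotRetryError / FallbackTriggeredError, which trigger model fallback

shouldRetry529: when querySource === undefined it conservatively allows a retry (untagged call paths); otherwise only sources explicitly on the allowlist, such as compact, repl_main_thread, and agent:*, back off and retry on capacity errors.

isPersistentRetryEnabled (ant + CLAUDE_CODE_UNATTENDED_RETRY): 429/529 are retried indefinitely, with a SystemAPIErrorMessage heartbeat every 30s so that unattended environments are not idle-killed.

Fast mode overage, OAuth 401, AWS credential errors, and similar cases have dedicated branches inside withRetry, each with a logEvent to analytics.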
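
To get a feel for what a 500 ms base delay means over ten retries, here is a sketch of a capped exponential backoff schedule. Only BASE_DELAY_MS = 500 and DEFAULT_MAX_RETRIES = 10 come from the source; the doubling formula, the 32-second cap, and the name backoffDelayMs are assumptions, since the delay computation is not part of the excerpts in this chapter (and a real implementation would likely add jitter and honor a retry-after header).

```typescript
const BASE_DELAY_MS = 500 // from withRetry.ts
const DEFAULT_MAX_RETRIES = 10 // from withRetry.ts
const ASSUMED_MAX_DELAY_MS = 32_000 // assumption: cap chosen for illustration

// Assumed schedule: the delay doubles per attempt (attempt starts at 1), up to the cap.
function backoffDelayMs(
  attempt: number,
  baseMs: number = BASE_DELAY_MS,
  capMs: number = ASSUMED_MAX_DELAY_MS,
): number {
  return Math.min(baseMs * 2 ** (attempt - 1), capMs)
}

// Delay before each of the ten retries under these assumptions.
const schedule: number[] = []
for (let attempt = 1; attempt <= DEFAULT_MAX_RETRIES; attempt++) {
  schedule.push(backoffDelayMs(attempt))
}

console.log(schedule)
// [500, 1000, 2000, 4000, 8000, 16000, 32000, 32000, 32000, 32000]
```

Under these assumptions a request that exhausts all ten retries waits a little over two and a half minutes in total, which is one reason background sources are kept off the 529 allowlist.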

Source reference: src/services/api/withRetry.ts · lines 52–89 (of 823)

  52| const DEFAULT_MAX_RETRIES = 10
  53| const FLOOR_OUTPUT_TOKENS = 3000
  54| const MAX_529_RETRIES = 3
  55| export const BASE_DELAY_MS = 500
  56| 
  57| // Foreground query sources where the user IS blocking on the result — these
  58| // retry on 529. Everything else (summaries, titles, suggestions, classifiers)
  59| // bails immediately: during a capacity cascade each retry is 3-10× gateway
  60| // amplification, and the user never sees those fail anyway. New sources
  61| // default to no-retry — add here only if the user is waiting on the result.
  62| const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
  63|   'repl_main_thread',
  64|   'repl_main_thread:outputStyle:custom',
  65|   'repl_main_thread:outputStyle:Explanatory',
  66|   'repl_main_thread:outputStyle:Learning',
  67|   'sdk',
  68|   'agent:custom',
  69|   'agent:default',
  70|   'agent:builtin',
  71|   'compact',
  72|   'hook_agent',
  73|   'hook_prompt',
  74|   'verification_agent',
  75|   'side_question',
  76|   // Security classifiers — must complete for auto-mode correctness.
  77|   // yoloClassifier.ts uses 'auto_mode' (not 'yolo_classifier' — that's
  78|   // type-only). bash_classifier is ant-only; feature-gate so the string
  79|   // tree-shakes out of external builds (excluded-strings.txt).
  80|   'auto_mode',
  81|   ...(feature('BASH_CLASSIFIER') ? (['bash_classifier'] as const) : []),
  82| ])
  83| 
  84| function shouldRetry529(querySource: QuerySource | undefined): boolean {
  85|   // undefined → retry (conservative for untagged call paths)
  86|   return (
  87|     querySource === undefined || FOREGROUND_529_RETRY_SOURCES.has(querySource)
  88|   )
  89| }

Source reference: src/services/api/withRetry.ts · lines 170–220 (of 823)

 170| export async function* withRetry<T>(
 171|   getClient: () => Promise<Anthropic>,
 172|   operation: (
 173|     client: Anthropic,
 174|     attempt: number,
 175|     context: RetryContext,
 176|   ) => Promise<T>,
 177|   options: RetryOptions,
 178| ): AsyncGenerator<SystemAPIErrorMessage, T> {
 179|   const maxRetries = getMaxRetries(options)
 180|   const retryContext: RetryContext = {
 181|     model: options.model,
 182|     thinkingConfig: options.thinkingConfig,
 183|     ...(isFastModeEnabled() && { fastMode: options.fastMode }),
 184|   }
 185|   let client: Anthropic | null = null
 186|   let consecutive529Errors = options.initialConsecutive529Errors ?? 0
 187|   let lastError: unknown
 188|   let persistentAttempt = 0
 189|   for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
 190|     if (options.signal?.aborted) {
 191|       throw new APIUserAbortError()
 192|     }
 193| 
 194|     // Capture whether fast mode is active before this attempt
 195|     // (fallback may change the state mid-loop)
 196|     const wasFastModeActive = isFastModeEnabled()
 197|       ? retryContext.fastMode && !isFastModeCooldown()
 198|       : false
 199| 
 200|     try {
 201|       // Check for mock rate limits (used by /mock-limits command for Ant employees)
 202|       if (process.env.USER_TYPE === 'ant') {
 203|         const mockError = checkMockRateLimitError(
 204|           retryContext.model,
 205|           wasFastModeActive,
 206|         )
 207|         if (mockError) {
 208|           throw mockError
 209|         }
 210|       }
 211| 
 212|       // Get a fresh client instance on first attempt or after authentication errors
 213|       // - 401 for first-party API authentication failures
 214|       // - 403 "OAuth token has been revoked" (another process refreshed the token)
 215|       // - Bedrock-specific auth errors (403 or CredentialsProviderError)
 216|       // - Vertex-specific auth errors (credential refresh failures, 401)
 217|       // - ECONNRESET/EPIPE: stale keep-alive socket; disable pooling and reconnect
 218|       const isStaleConnection = isStaleConnectionError(lastError)
 219|       if (
 220|         isStaleConnection &&
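
The excerpt above is cut off at line 220, so the rest of the loop is not reconstructed here. What the signature does show is the shape AsyncGenerator<SystemAPIErrorMessage, T>: status messages are yielded while retrying, and the final result arrives as the generator's return value. The sketch below shows that pattern in isolation. StatusMessage, retrying, and drain are hypothetical names, and the retry policy is deliberately simplified (no backoff, no error classification).

```typescript
// Hypothetical stand-in for SystemAPIErrorMessage.
type StatusMessage = { kind: 'retry'; attempt: number; error: string }

// Simplified retry generator: yields one status message per failed attempt
// and returns the operation's result on success.
async function* retrying<T>(
  operation: (attempt: number) => Promise<T>,
  maxRetries: number,
): AsyncGenerator<StatusMessage, T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      return await operation(attempt)
    } catch (error) {
      lastError = error
      if (attempt <= maxRetries) {
        yield { kind: 'retry', attempt, error: String(error) }
      }
    }
  }
  throw lastError
}

// A for-await loop discards the return value, so the caller steps the
// generator manually to receive both the messages and the final result.
async function drain<Y, T>(
  gen: AsyncGenerator<Y, T>,
  onMessage: (message: Y) => void,
): Promise<T> {
  while (true) {
    const step = await gen.next()
    if (step.done) return step.value
    onMessage(step.value)
  }
}
```

The design point is that progress reporting and the result share one channel: the caller decides how to surface the yielded messages without the retry loop knowing anything about the UI.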

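Media limits and the requestId chain

stripExcessMediaItems ensures a single request carries no more than API_MAX_MEDIA_PER_REQUEST image and document blocks combined. It strips the oldest media first and keeps the most recent attachments, so the model can still see the screenshot the user just pasted.

getPreviousRequestIdFromMessages scans the messages array from the end for the most recent assistant message with a requestId. Analytics uses it to link consecutive requests (cache hit rate, incremental tokens). The doc comment stresses that this is more reliable than global state: each subagent/teammate tracks its own chain, and rollback updates the value naturally.

Together these two functions show claude.ts acting as the last hygiene check before an API call, complementing the normalization in utils/messages.

Source reference: src/services/api/claude.ts · lines 919–973 (of 3420)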

 919| /**
 920|  * Extracts the request ID from the most recent assistant message in the
 921|  * conversation. Used to link consecutive API requests in analytics so we can
 922|  * join them for cache-hit-rate analysis and incremental token tracking.
 923|  *
 924|  * Deriving this from the message array (rather than global state) ensures each
 925|  * query chain (main thread, subagent, teammate) tracks its own request chain
 926|  * independently, and rollback/undo naturally updates the value.
 927|  */
 928| function getPreviousRequestIdFromMessages(
 929|   messages: Message[],
 930| ): string | undefined {
 931|   for (let i = messages.length - 1; i >= 0; i--) {
 932|     const msg = messages[i]!
 933|     if (msg.type === 'assistant' && msg.requestId) {
 934|       return msg.requestId
 935|     }
 936|   }
 937|   return undefined
 938| }
 939| 
 940| function isMedia(
 941|   block: BetaContentBlockParam,
 942| ): block is BetaImageBlockParam | BetaRequestDocumentBlock {
 943|   return block.type === 'image' || block.type === 'document'
 944| }
 945| 
 946| function isToolResult(
 947|   block: BetaContentBlockParam,
 948| ): block is BetaToolResultBlockParam {
 949|   return block.type === 'tool_result'
 950| }
 951| 
 952| /**
 953|  * Ensures messages contain at most `limit` media items (images + documents).
 954|  * Strips oldest media first to preserve the most recent.
 955|  */
 956| export function stripExcessMediaItems(
 957|   messages: (UserMessage | AssistantMessage)[],
 958|   limit: number,
 959| ): (UserMessage | AssistantMessage)[] {
 960|   let toRemove = 0
 961|   for (const msg of messages) {
 962|     if (!Array.isArray(msg.message.content)) continue
 963|     for (const block of msg.message.content) {
 964|       if (isMedia(block)) toRemove++
 965|       if (isToolResult(block) && Array.isArray(block.content)) {
 966|         for (const nested of block.content) {
 967|           if (isMedia(nested)) toRemove++
 968|         }
 969|       }
 970|     }
 971|   }
 972|   toRemove -= limit
 973|   if (toRemove <= 0) return messages
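
The excerpt stops right after the counting pass, so the removal pass is not shown. One plausible way to finish the job, oldest media first, is sketched below. It uses simplified hypothetical types (Block, Msg) rather than the SDK's Beta* types, and stripOldestMedia is an illustrative name, not the body of stripExcessMediaItems.

```typescript
// Simplified, hypothetical content model for this sketch only.
type MediaBlock = { type: 'image' | 'document'; source: string }
type Block =
  | { type: 'text'; text: string }
  | MediaBlock
  | { type: 'tool_result'; content: Block[] | string }
type Msg = { content: Block[] | string }

const isMedia = (block: Block): block is MediaBlock =>
  block.type === 'image' || block.type === 'document'

function stripOldestMedia(messages: Msg[], limit: number): Msg[] {
  // Pass 1: count media, including one level of nesting inside tool_result.
  let toRemove = -limit
  for (const msg of messages) {
    if (!Array.isArray(msg.content)) continue
    for (const block of msg.content) {
      if (isMedia(block)) toRemove++
      if (block.type === 'tool_result' && Array.isArray(block.content)) {
        toRemove += block.content.filter(isMedia).length
      }
    }
  }
  if (toRemove <= 0) return messages

  // Drops a media block while there is still excess to remove.
  const keep = (block: Block): boolean => {
    if (toRemove > 0 && isMedia(block)) {
      toRemove--
      return false
    }
    return true
  }

  // Pass 2: walk oldest to newest; untouched messages keep their identity.
  return messages.map(msg => {
    if (toRemove <= 0 || !Array.isArray(msg.content)) return msg
    const content: Block[] = []
    for (const block of msg.content) {
      if (block.type === 'tool_result' && Array.isArray(block.content)) {
        content.push({ ...block, content: block.content.filter(keep) })
      } else if (keep(block)) {
        content.push(block)
      }
    }
    return { ...msg, content }
  })
}
```

Because the walk runs from the start of the array, the excess is taken from the earliest messages and later attachments survive, which matches the "strips oldest media first" contract in the doc comment.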

Source directory and related files

Closely related: services/api/withRetry.ts, services/api/client.ts, services/compact/apiMicrocompact.ts, utils/messages.ts (normalizeMessagesForAPI), and query.ts (which calls queryModelWithStreaming).

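Hands-on exercises

  1. Trigger a 529 in the REPL (or use a mock rate limit) and observe the system retry messages and the tengu_query_error event
  2. Compare cache_read_input_tokens across two requests in the same session to confirm whether the 1h TTL allowlist has latched
  3. Read cleanupStream to understand why stream.controller must be aborted after an abort
  4. Use getMaxOutputTokensForModel to cross-check the MAX_OUTPUT_TOKENS_FOR_SUMMARY reservation logic in the compact chapter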

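Chapter summary and further reading

claude.ts is the orchestration hub for model requests. The suggested next chapter is mcp-client, which explains how the tool list ends up in the same params.tools array. Continue with: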

  • mcp-client
  • compact