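Chapter overview
services/api/claude.ts (about 3,400 lines) is the "bus" between Claude Code and the Anthropic Messages API: it converts the in-app Message[] into BetaMessageStreamParams, drives beta.messages.stream and parses the streamed events, and retries with backoff through withRetry on 529, 429, and connection errors. By the end of this chapter you should be able to start from the moment the spinner turns in the REPL and trace back to how queryModelWithStreaming and updateUsage update the token counts and the cost-tracker.
After this chapter you should be able to
- Explain the generator layering between queryModelWithStreaming and queryModel
- Explain the prompt cache strategy behind getCacheControl / addCacheBreakpoints
- Describe how withRetry distinguishes foreground and background sources on 529
- Understand the semantics of updateUsage and accumulateUsage for streaming events
- Locate executeNonStreamingRequest on the non-streaming fallback path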
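Core concepts (read these first)
claude.ts is not a thin wrapper around the SDK
On top of the SDK, this file layers: beta header merging (effort, context management, tool search), the Bedrock extra body, fast mode, the advisor model, quota header parsing, VCR recording, the GrowthBook-gated 1h cache TTL, and ensureToolResultPairing, which repairs messages before they are sent to the API. Many pieces exist only to satisfy API constraints and have nothing to do with the UI; stripExcessMediaItems, for example, enforces the API_MAX_MEDIA_PER_REQUEST limit.
Streaming usage is cumulative, not incremental
The comment on updateUsage is explicit: in the Anthropic streaming API, a message_delta event may carry input_tokens=0, and that value must not overwrite the one set by message_start. Because every event reports running totals, adding them together as if they were increments overstates the token counts and therefore the cost. Across multiple assistant turns, accumulateUsage sums the per-message usage and takes service_tier from the most recent message.
Retry is tied to querySource
withRetry.ts maintains the FOREGROUND_529_RETRY_SOURCES allowlist: only sources the user is blocked on (the main thread, compact, agent, and similar) retry on 529. Background calls such as summaries, titles, and suggestions fail immediately, because during a capacity cascade each retry amplifies gateway load 3–10×. The security classifiers (auto_mode, and bash_classifier behind its feature gate) are listed explicitly because they must complete for auto-mode correctness. New sources do not retry by default and must be added to the allowlist explicitly; a call with no querySource at all is still retried.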
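Suggested study steps
- Read source block A: getCacheControl and the 1h TTL allowlist
- Read source block B: the queryModelWithStreaming entry point
- Read source block C: executeNonStreamingRequest and withRetry
- Read source block D: updateUsage / accumulateUsage
- Read source block E: addCacheBreakpoints and tengu_api_cache_breakpoints
- Read source block F: the withRetry 529 policy
- Open services/api/claude.ts in the source tree and follow along by line number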
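Common pitfalls
Note: Do not conflate toolToAPISchema in utils/api.ts with queryModel in claude.ts.
Note: The non-streaming fallback timeout defaults to 120s in remote environments and 300s locally; API_TIMEOUT_MS overrides both.
Note: The cache read by getFeatureValue_CACHED_MAY_BE_STALE may be stale across processes; the 1h TTL decision is latched in bootstrap state to prevent a mid-session flip.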
Position in the architecture
The data flow of Claude Code's main query loop can be summarized as:
query.ts assembles messages + systemPrompt + tools
→ queryModelWithStreaming (claude.ts)
→ inside queryModel: normalizeMessagesForAPI, addCacheBreakpoints
→ anthropic.beta.messages.stream
→ yields StreamEvent / AssistantMessage event by event
→ updateUsage updates the token usage; addToTotalSessionCost writes to the cost-tracker
→ 529/429 → withRetry backoff → possible fallback to executeNonStreamingRequest
The services/api/ directory also contains client.ts (the Anthropic client singleton), withRetry.ts, usage.ts, errors.ts, and others; this chapter focuses on the main path through claude.ts. When changing beta headers or the max_tokens policy, read this file and utils/betas.ts first.
Prompt cache: getCacheControl and the 1h TTL
Prompt caching marks write points with cache_control: { type: 'ephemeral' }. getCacheControl decides from querySource whether to attach ttl: '1h', and includes scope in the result only when scope is 'global'.
Key points of the should1hCacheTTL logic:
- Bedrock users can bypass GrowthBook by setting ENABLE_PROMPT_CACHING_1H_BEDROCK
- 1P user eligibility (ant, or a subscriber not in overage) is latched in bootstrap state, so a mid-session overage flip cannot bust the ~20K-token cache
- The allowlist comes from the GrowthBook config tengu_prompt_cache_1h_config and supports trailing-* prefix patterns such as repl_main_thread*
- The allowlist is likewise latched for the session, so a disk cache update cannot mix 5m and 1h TTLs within one session
getPromptCachingEnabled also handles env switches such as DISABLE_PROMPT_CACHING_OPUS. When chasing a cache-related bug, start with the requestId chain on assistant messages (getPreviousRequestIdFromMessages) and the promptCacheBreakDetection module.
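The allowlist matching and the session latch described above can be tried in isolation. The following is a minimal sketch, not the real implementation: matchesAllowlist, latch, and the simulated remoteAllowlist are hypothetical names, and the real code stores the latched value in bootstrap state rather than in a closure.

```typescript
// Hypothetical, simplified stand-ins; not the real claude.ts helpers.
type QuerySource = string;

/** True when `source` matches any pattern; a trailing '*' means prefix match. */
function matchesAllowlist(source: QuerySource | undefined, allowlist: string[]): boolean {
  if (source === undefined) return false;
  return allowlist.some((pattern) =>
    pattern.endsWith("*")
      ? source.startsWith(pattern.slice(0, -1))
      : source === pattern,
  );
}

/** Wraps a loader so that its first result is reused for the rest of the session. */
function latch<T>(load: () => T): () => T {
  let cached: { value: T } | null = null;
  return () => {
    if (cached === null) cached = { value: load() };
    return cached.value;
  };
}

// Simulated remote config that changes in the middle of a session.
let remoteAllowlist: string[] = ["repl_main_thread*", "sdk"];
const getAllowlist = latch(() => remoteAllowlist);

const firstRead = matchesAllowlist("repl_main_thread:outputStyle:custom", getAllowlist());
remoteAllowlist = []; // a config update arrives after the first read
const secondRead = matchesAllowlist("repl_main_thread:outputStyle:custom", getAllowlist());
```

Both reads give the same answer because the second call reuses the latched list. That is the property that keeps one session from mixing 5m and 1h TTLs.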
Source reference: src/services/api/claude.ts · lines 333–356 (of 3420)
333| export function getPromptCachingEnabled(model: string): boolean {
334| // Global disable takes precedence
335| if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false
336|
337| // Check if we should disable for small/fast model
338| if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) {
339| const smallFastModel = getSmallFastModel()
340| if (model === smallFastModel) return false
341| }
342|
343| // Check if we should disable for default Sonnet
344| if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_SONNET)) {
345| const defaultSonnet = getDefaultSonnetModel()
346| if (model === defaultSonnet) return false
347| }
348|
349| // Check if we should disable for default Opus
350| if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_OPUS)) {
351| const defaultOpus = getDefaultOpusModel()
352| if (model === defaultOpus) return false
353| }
354|
355| return true
356| }
Source reference: src/services/api/claude.ts · lines 358–434 (of 3420)
358| export function getCacheControl({
359| scope,
360| querySource,
361| }: {
362| scope?: CacheScope
363| querySource?: QuerySource
364| } = {}): {
365| type: 'ephemeral'
366| ttl?: '1h'
367| scope?: CacheScope
368| } {
369| return {
370| type: 'ephemeral',
371| ...(should1hCacheTTL(querySource) && { ttl: '1h' }),
372| ...(scope === 'global' && { scope }),
373| }
374| }
375|
376| /**
377| * Determines if 1h TTL should be used for prompt caching.
378| *
379| * Only applied when:
380| * 1. User is eligible (ant or subscriber within rate limits)
381| * 2. The query source matches a pattern in the GrowthBook allowlist
382| *
383| * GrowthBook config shape: { allowlist: string[] }
384| * Patterns support trailing '*' for prefix matching.
385| * Examples:
386| * - { allowlist: ["repl_main_thread*", "sdk"] } — main thread + SDK only
387| * - { allowlist: ["repl_main_thread*", "sdk", "agent:*"] } — also subagents
388| * - { allowlist: ["*"] } — all sources
389| *
390| * The allowlist is cached in STATE for session stability — prevents mixed
391| * TTLs when GrowthBook's disk cache updates mid-request.
392| */
393| function should1hCacheTTL(querySource?: QuerySource): boolean {
394| // 3P Bedrock users get 1h TTL when opted in via env var — they manage their own billing
395| // No GrowthBook gating needed since 3P users don't have GrowthBook configured
396| if (
397| getAPIProvider() === 'bedrock' &&
398| isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK)
399| ) {
400| return true
401| }
402|
403| // Latch eligibility in bootstrap state for session stability — prevents
404| // mid-session overage flips from changing the cache_control TTL, which
405| // would bust the server-side prompt cache (~20K tokens per flip).
406| let userEligible = getPromptCache1hEligible()
407| if (userEligible === null) {
408| userEligible =
409| process.env.USER_TYPE === 'ant' ||
410| (isClaudeAISubscriber() && !currentLimits.isUsingOverage)
411| setPromptCache1hEligible(userEligible)
412| }
413| if (!userEligible) return false
414|
415| // Cache allowlist in bootstrap state for session stability — prevents mixed
416| // TTLs when GrowthBook's disk cache updates mid-request
417| let allowlist = getPromptCache1hAllowlist()
418| if (allowlist === null) {
419| const config = getFeatureValue_CACHED_MAY_BE_STALE<{
420| allowlist?: string[]
421| }>('tengu_prompt_cache_1h_config', {})
422| allowlist = config.allowlist ?? []
423| setPromptCache1hAllowlist(allowlist)
424| }
425|
426| return (
427| querySource !== undefined &&
428| allowlist.some(pattern =>
429| pattern.endsWith('*')
430| ? querySource.startsWith(pattern.slice(0, -1))
431| : querySource === pattern,
432| )
433| )
434| }
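queryModelWithStreaming entry point
queryModelWithStreaming is the stable external API: it takes messages, systemPrompt, thinkingConfig, tools, an AbortSignal, and Options, and returns an async generator whose yield type is StreamEvent | AssistantMessage | SystemAPIErrorMessage.
The implementation wraps the call in withStreamingVCR so that test and recording scenarios can replay a past stream. Inside, yield* queryModel(...) does the actual HTTP work and event parsing.
Reading notes:
- shouldDeferLspTool adds defer_loading to the tool schema while LSP initialization is incomplete, so the model does not call a tool that is not ready yet
- The caller (query.ts) is responsible for reading the final usage and writing it to the session once the generator finishes
- SystemAPIErrorMessage comes from withRetry's system yields, so the UI can show "retrying" without interrupting the transcript
When debugging "streaming produces no output", set a breakpoint in the message_start branch inside queryModel and check the betas and whether normalizeModelStringForAPI rewrote the model string.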
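To make the three-way yield type concrete, here is a sketch of how a caller might drain such a generator and route each value by its type discriminant. The shapes are heavily simplified assumptions (the "stream_event" tag and the text, content, and retryAttempt fields are invented for illustration), and fakeQuery merely stands in for queryModelWithStreaming.

```typescript
// Simplified, hypothetical shapes; the real types carry many more fields.
type StreamEvent = { type: "stream_event"; text: string };
type AssistantMessage = { type: "assistant"; content: string };
type SystemAPIErrorMessage = { type: "system"; retryAttempt: number };
type Yielded = StreamEvent | AssistantMessage | SystemAPIErrorMessage;

/** Stand-in for queryModelWithStreaming: one retry notice, two deltas, one final message. */
async function* fakeQuery(): AsyncGenerator<Yielded, void> {
  yield { type: "system", retryAttempt: 1 };
  yield { type: "stream_event", text: "Hel" };
  yield { type: "stream_event", text: "lo" };
  yield { type: "assistant", content: "Hello" };
}

/** Drains the generator and sorts each yielded value by its discriminant. */
async function consume(gen: AsyncGenerator<Yielded, void>) {
  const summary = { retries: 0, streamed: "", messages: [] as string[] };
  for await (const item of gen) {
    switch (item.type) {
      case "system":
        summary.retries += 1; // the UI could show "retrying" here
        break;
      case "stream_event":
        summary.streamed += item.text; // incremental text for live rendering
        break;
      case "assistant":
        summary.messages.push(item.content); // completed message for the transcript
        break;
    }
  }
  return summary;
}
```

Because retry notices travel through the same generator as content, the consumer needs no separate error channel to keep the transcript intact during a retry.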
Source reference: src/services/api/claude.ts · lines 752–793 (of 3420)
752| export async function* queryModelWithStreaming({
753| messages,
754| systemPrompt,
755| thinkingConfig,
756| tools,
757| signal,
758| options,
759| }: {
760| messages: Message[]
761| systemPrompt: SystemPrompt
762| thinkingConfig: ThinkingConfig
763| tools: Tools
764| signal: AbortSignal
765| options: Options
766| }): AsyncGenerator<
767| StreamEvent | AssistantMessage | SystemAPIErrorMessage,
768| void
769| > {
770| return yield* withStreamingVCR(messages, async function* () {
771| yield* queryModel(
772| messages,
773| systemPrompt,
774| thinkingConfig,
775| tools,
776| signal,
777| options,
778| )
779| })
780| }
781|
782| /**
783| * Determines if an LSP tool should be deferred (tool appears with defer_loading: true)
784| * because LSP initialization is not yet complete.
785| */
786| function shouldDeferLspTool(tool: Tool): boolean {
787| if (!('isLsp' in tool) || !tool.isLsp) {
788| return false
789| }
790| const status = getInitializationStatus()
791| // Defer when pending or not started
792| return status.status === 'pending' || status.status === 'not-started'
793| }
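Non-streaming fallback: executeNonStreamingRequest
When streaming fails, or the environment forces non-streaming, executeNonStreamingRequest wraps the common pattern:
- Wrap anthropic.beta.messages.create in withRetry
- adjustParamsForNonStreaming caps max_tokens at MAX_NON_STREAMING_TOKENS (64000)
- Timeout: API_TIMEOUT_MS takes precedence; otherwise 120s when CLAUDE_CODE_REMOTE is set, else 300s
- System messages yielded by withRetry are forwarded to the UI; the final return value is a BetaMessage
The error path distinguishes APIUserAbortError (rethrown immediately, without recording tengu_nonstreaming_fallback_error) from all other errors (a diagnostics log plus logEvent('tengu_nonstreaming_fallback_error'), with originatingRequestId included so the event can be correlated with the failed stream).
This path is the backstop for user-perceived latency: remote containers are idle-killed after roughly 5 minutes, so the 120s timeout lets a stuck request surface as an APIConnectionTimeoutError rather than hang silently.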
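The function ends with a manual do/while over generator.next() because a for await loop discards a generator's return value, and here the return value is the final BetaMessage. The sketch below shows the same forwarding pattern with hypothetical stand-ins (retrying, forward, collect, and the string result are all invented for illustration).

```typescript
type SystemNotice = { type: "system"; attempt: number };

/** Hypothetical stand-in for a retrying operation: yields notices, then returns a result. */
async function* retrying(): AsyncGenerator<SystemNotice, string> {
  yield { type: "system", attempt: 1 };
  yield { type: "system", attempt: 2 };
  return "final-message";
}

/** Forwards every yielded notice and hands back the inner generator's return value. */
async function* forward(): AsyncGenerator<SystemNotice, string> {
  const gen = retrying();
  let step = await gen.next();
  while (!step.done) {
    yield step.value; // pass the notice through to the UI layer
    step = await gen.next();
  }
  return step.value; // once done is true, value is the returned result
}

/** Drives the outer generator the same way and reports what it saw. */
async function collect(): Promise<{ notices: number; result: string }> {
  const gen = forward();
  let notices = 0;
  let step = await gen.next();
  while (!step.done) {
    notices += 1;
    step = await gen.next();
  }
  return { notices, result: step.value };
}
```

A plain yield* would forward everything and return the same value; the explicit loop is the option that leaves room to filter what gets forwarded, as the real code does with its type === 'system' check.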
Source reference: src/services/api/claude.ts · lines 807–917 (of 3420)
807| function getNonstreamingFallbackTimeoutMs(): number {
808| const override = parseInt(process.env.API_TIMEOUT_MS || '', 10)
809| if (override) return override
810| return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000
811| }
812|
813| /**
814| * Helper generator for non-streaming API requests.
815| * Encapsulates the common pattern of creating a withRetry generator,
816| * iterating to yield system messages, and returning the final BetaMessage.
817| */
818| export async function* executeNonStreamingRequest(
819| clientOptions: {
820| model: string
821| fetchOverride?: Options['fetchOverride']
822| source: string
823| },
824| retryOptions: {
825| model: string
826| fallbackModel?: string
827| thinkingConfig: ThinkingConfig
828| fastMode?: boolean
829| signal: AbortSignal
830| initialConsecutive529Errors?: number
831| querySource?: QuerySource
832| },
833| paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
834| onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
835| captureRequest: (params: BetaMessageStreamParams) => void,
836| /**
837| * Request ID of the failed streaming attempt this fallback is recovering
838| * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
839| */
840| originatingRequestId?: string | null,
841| ): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
842| const fallbackTimeoutMs = getNonstreamingFallbackTimeoutMs()
843| const generator = withRetry(
844| () =>
845| getAnthropicClient({
846| maxRetries: 0,
847| model: clientOptions.model,
848| fetchOverride: clientOptions.fetchOverride,
849| source: clientOptions.source,
850| }),
851| async (anthropic, attempt, context) => {
852| const start = Date.now()
853| const retryParams = paramsFromContext(context)
854| captureRequest(retryParams)
855| onAttempt(attempt, start, retryParams.max_tokens)
856|
857| const adjustedParams = adjustParamsForNonStreaming(
858| retryParams,
859| MAX_NON_STREAMING_TOKENS,
860| )
861|
862| try {
863| // biome-ignore lint/plugin: non-streaming API call
864| return await anthropic.beta.messages.create(
865| {
866| ...adjustedParams,
867| model: normalizeModelStringForAPI(adjustedParams.model),
868| },
869| {
870| signal: retryOptions.signal,
871| timeout: fallbackTimeoutMs,
872| },
873| )
874| } catch (err) {
875| // User aborts are not errors — re-throw immediately without logging
876| if (err instanceof APIUserAbortError) throw err
877|
878| // Instrumentation: record when the non-streaming request errors (including
879| // timeouts). Lets us distinguish "fallback hung past container kill"
880| // (no event) from "fallback hit the bounded timeout" (this event).
881| logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
882| logEvent('tengu_nonstreaming_fallback_error', {
883| model:
884| clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
885| error:
886| err instanceof Error
887| ? (err.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
888| : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
889| attempt,
890| timeout_ms: fallbackTimeoutMs,
891| request_id: (originatingRequestId ??
892| 'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
893| })
894| throw err
895| }
896| },
897| {
898| model: retryOptions.model,
899| fallbackModel: retryOptions.fallbackModel,
900| thinkingConfig: retryOptions.thinkingConfig,
901| ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
902| signal: retryOptions.signal,
903| initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
904| querySource: retryOptions.querySource,
905| },
906| )
907|
908| let e
909| do {
910| e = await generator.next()
911| if (!e.done && e.value.type === 'system') {
912| yield e.value
913| }
914| } while (!e.done)
915|
916| return e.value as BetaMessage
917| }
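Usage accounting: updateUsage and accumulateUsage
The semantics of the usage field in the streaming API are easy to get wrong.
updateUsage handles the BetaMessageDeltaUsage within a single stream:
- input / cache_creation / cache_read are overwritten only when the new value is non-null and > 0, so a 0 in message_delta cannot wipe the message_start value
- output_tokens uses ?? to fall back to the previous value
- cache_deleted_input_tokens (cache editing) gets the same > 0 guard under the CACHED_MICROCOMPACT feature
- server_tool_use (web_search / web_fetch) is merged from the delta
accumulateUsage sums usage across multiple assistant turns; service_tier, inference_geo, iterations, and speed take the value from the most recent message.
The cost-tracker and the REPL token indicator depend on the output of these functions. If output tokens appear doubled, first check whether a delta was passed to accumulate instead of update.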
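A small numeric example makes the difference between the two functions visible. This is a sketch on a two-field usage record with made-up token counts; updateUsageSketch and accumulateUsageSketch are simplified stand-ins, and the first event plays the role of message_start.

```typescript
// Simplified usage record; the real NonNullableUsage has many more fields.
type Usage = { input_tokens: number; output_tokens: number };
type DeltaUsage = { input_tokens?: number | null; output_tokens?: number | null };

/** Within one stream: each event carries running totals, so overwrite rather than add. */
function updateUsageSketch(usage: Usage, part: DeltaUsage | undefined): Usage {
  if (!part) return { ...usage };
  return {
    // A 0 or null in message_delta must not erase the message_start value.
    input_tokens:
      part.input_tokens != null && part.input_tokens > 0
        ? part.input_tokens
        : usage.input_tokens,
    output_tokens: part.output_tokens ?? usage.output_tokens,
  };
}

/** Across messages: each message's final usage is independent, so add. */
function accumulateUsageSketch(total: Usage, message: Usage): Usage {
  return {
    input_tokens: total.input_tokens + message.input_tokens,
    output_tokens: total.output_tokens + message.output_tokens,
  };
}

// One stream: a message_start snapshot, then two cumulative message_delta snapshots.
const events: DeltaUsage[] = [
  { input_tokens: 1200, output_tokens: 1 },
  { input_tokens: 0, output_tokens: 40 },
  { input_tokens: 0, output_tokens: 95 },
];
const empty: Usage = { input_tokens: 0, output_tokens: 0 };

// Correct: fold the snapshots with update semantics.
const correct = events.reduce<Usage>(updateUsageSketch, empty);

// Mistake: summing cumulative snapshots as if they were increments.
const overcounted = events.reduce<Usage>(
  (acc, e) =>
    accumulateUsageSketch(acc, {
      input_tokens: e.input_tokens ?? 0,
      output_tokens: e.output_tokens ?? 0,
    }),
  empty,
);
```

With these numbers, correct ends at 1200 input and 95 output tokens, while overcounted reports 136 output tokens (1 + 40 + 95), which is the kind of inflation that shows up as doubled output tokens in the cost display.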
Source reference: src/services/api/claude.ts · lines 2914–3038 (of 3420)
2914| /**
2915| * Updates usage statistics with new values from streaming API events.
2916| * Note: Anthropic's streaming API provides cumulative usage totals, not incremental deltas.
2917| * Each event contains the complete usage up to that point in the stream.
2918| *
2919| * Input-related tokens (input_tokens, cache_creation_input_tokens, cache_read_input_tokens)
2920| * are typically set in message_start and remain constant. message_delta events may send
2921| * explicit 0 values for these fields, which should not overwrite the values from message_start.
2922| * We only update these fields if they have a non-null, non-zero value.
2923| */
2924| export function updateUsage(
2925| usage: Readonly<NonNullableUsage>,
2926| partUsage: BetaMessageDeltaUsage | undefined,
2927| ): NonNullableUsage {
2928| if (!partUsage) {
2929| return { ...usage }
2930| }
2931| return {
2932| input_tokens:
2933| partUsage.input_tokens !== null && partUsage.input_tokens > 0
2934| ? partUsage.input_tokens
2935| : usage.input_tokens,
2936| cache_creation_input_tokens:
2937| partUsage.cache_creation_input_tokens !== null &&
2938| partUsage.cache_creation_input_tokens > 0
2939| ? partUsage.cache_creation_input_tokens
2940| : usage.cache_creation_input_tokens,
2941| cache_read_input_tokens:
2942| partUsage.cache_read_input_tokens !== null &&
2943| partUsage.cache_read_input_tokens > 0
2944| ? partUsage.cache_read_input_tokens
2945| : usage.cache_read_input_tokens,
2946| output_tokens: partUsage.output_tokens ?? usage.output_tokens,
2947| server_tool_use: {
2948| web_search_requests:
2949| partUsage.server_tool_use?.web_search_requests ??
2950| usage.server_tool_use.web_search_requests,
2951| web_fetch_requests:
2952| partUsage.server_tool_use?.web_fetch_requests ??
2953| usage.server_tool_use.web_fetch_requests,
2954| },
2955| service_tier: usage.service_tier,
2956| cache_creation: {
2957| // SDK type BetaMessageDeltaUsage is missing cache_creation, but it's real!
2958| ephemeral_1h_input_tokens:
2959| (partUsage as BetaUsage).cache_creation?.ephemeral_1h_input_tokens ??
2960| usage.cache_creation.ephemeral_1h_input_tokens,
2961| ephemeral_5m_input_tokens:
2962| (partUsage as BetaUsage).cache_creation?.ephemeral_5m_input_tokens ??
2963| usage.cache_creation.ephemeral_5m_input_tokens,
2964| },
2965| // cache_deleted_input_tokens: returned by the API when cache editing
2966| // deletes KV cache content, but not in SDK types. Kept off NonNullableUsage
2967| // so the string is eliminated from external builds by dead code elimination.
2968| // Uses the same > 0 guard as other token fields to prevent message_delta
2969| // from overwriting the real value with 0.
2970| ...(feature('CACHED_MICROCOMPACT')
2971| ? {
2972| cache_deleted_input_tokens:
2973| (partUsage as unknown as { cache_deleted_input_tokens?: number })
2974| .cache_deleted_input_tokens != null &&
2975| (partUsage as unknown as { cache_deleted_input_tokens: number })
2976| .cache_deleted_input_tokens > 0
2977| ? (partUsage as unknown as { cache_deleted_input_tokens: number })
2978| .cache_deleted_input_tokens
2979| : ((usage as unknown as { cache_deleted_input_tokens?: number })
2980| .cache_deleted_input_tokens ?? 0),
2981| }
2982| : {}),
2983| inference_geo: usage.inference_geo,
2984| iterations: partUsage.iterations ?? usage.iterations,
2985| speed: (partUsage as BetaUsage).speed ?? usage.speed,
2986| }
2987| }
2988|
2989| /**
2990| * Accumulates usage from one message into a total usage object.
2991| * Used to track cumulative usage across multiple assistant turns.
2992| */
2993| export function accumulateUsage(
2994| totalUsage: Readonly<NonNullableUsage>,
2995| messageUsage: Readonly<NonNullableUsage>,
2996| ): NonNullableUsage {
2997| return {
2998| input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
2999| cache_creation_input_tokens:
3000| totalUsage.cache_creation_input_tokens +
3001| messageUsage.cache_creation_input_tokens,
3002| cache_read_input_tokens:
3003| totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
3004| output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
3005| server_tool_use: {
3006| web_search_requests:
3007| totalUsage.server_tool_use.web_search_requests +
3008| messageUsage.server_tool_use.web_search_requests,
3009| web_fetch_requests:
3010| totalUsage.server_tool_use.web_fetch_requests +
3011| messageUsage.server_tool_use.web_fetch_requests,
3012| },
3013| service_tier: messageUsage.service_tier, // Use the most recent service tier
3014| cache_creation: {
3015| ephemeral_1h_input_tokens:
3016| totalUsage.cache_creation.ephemeral_1h_input_tokens +
3017| messageUsage.cache_creation.ephemeral_1h_input_tokens,
3018| ephemeral_5m_input_tokens:
3019| totalUsage.cache_creation.ephemeral_5m_input_tokens +
3020| messageUsage.cache_creation.ephemeral_5m_input_tokens,
3021| },
3022| // See comment in updateUsage — field is not on NonNullableUsage to keep
3023| // the string out of external builds.
3024| ...(feature('CACHED_MICROCOMPACT')
3025| ? {
3026| cache_deleted_input_tokens:
3027| ((totalUsage as unknown as { cache_deleted_input_tokens?: number })
3028| .cache_deleted_input_tokens ?? 0) +
3029| ((
3030| messageUsage as unknown as { cache_deleted_input_tokens?: number }
3031| ).cache_deleted_input_tokens ?? 0),
3032| }
3033| : {}),
3034| inference_geo: messageUsage.inference_geo, // Use the most recent
3035| iterations: messageUsage.iterations, // Use the most recent
3036| speed: messageUsage.speed, // Use the most recent
3037| }
3038| }
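addCacheBreakpoints and cache_edits
addCacheBreakpoints converts the (UserMessage | AssistantMessage)[] array into API MessageParam[] and inserts cache_control markers when the policy allows. The tengu_api_cache_breakpoints log event records the message count, cachingEnabled, and skipCacheWrite.
When working with compact/apiMicrocompact, callers can pass useCachedMC, newCacheEdits, and pinnedEdits to get API-side context management (deleting KV cache segments) instead of resending the whole prefix. The comments stress "exactly one message-level cache_control marker per request", in line with Mycro turn-to-turn eviction.
Read this function alongside getAPIContextManagement (in the compact submodule) and the GrowthBook gate tengu_compact_cache_prefix.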
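The "exactly one message-level marker" invariant can be illustrated with a small sketch. Everything here is an assumption made for illustration: the excerpt shows only the signature of addCacheBreakpoints, so the placement rule (last block of the last message), the simplified types, and the names markSingleBreakpoint and countMarkers are hypothetical.

```typescript
type CacheControl = { type: "ephemeral"; ttl?: "1h" };
type TextBlock = { type: "text"; text: string; cache_control?: CacheControl };
type MessageParam = { role: "user" | "assistant"; content: TextBlock[] };

/**
 * Returns a copy of `messages` carrying at most one cache_control marker.
 * ASSUMPTION: the marker goes on the last block of the last message.
 */
function markSingleBreakpoint(
  messages: MessageParam[],
  enablePromptCaching: boolean,
  cacheControl: CacheControl = { type: "ephemeral" },
): MessageParam[] {
  const lastIndex = messages.length - 1;
  return messages.map((message, i) => ({
    role: message.role,
    content: message.content.map((block, j): TextBlock => {
      // Rebuild the block without any stale marker so the result never has two.
      const clean: TextBlock = { type: "text", text: block.text };
      const isTarget =
        enablePromptCaching && i === lastIndex && j === message.content.length - 1;
      return isTarget ? { ...clean, cache_control: cacheControl } : clean;
    }),
  }));
}

/** Counts the blocks that carry a cache_control marker. */
function countMarkers(messages: MessageParam[]): number {
  return messages.reduce(
    (n, m) => n + m.content.filter((b) => b.cache_control !== undefined).length,
    0,
  );
}
```

Clearing old markers before adding the new one is what keeps the count at one as the conversation grows. The input array is left untouched, so callers can keep using it for other purposes.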
Source reference: src/services/api/claude.ts · lines 3062–3076 (of 3420)
3062| // Exported for testing cache_reference placement constraints
3063| export function addCacheBreakpoints(
3064| messages: (UserMessage | AssistantMessage)[],
3065| enablePromptCaching: boolean,
3066| querySource?: QuerySource,
3067| useCachedMC = false,
3068| newCacheEdits?: CachedMCEditsBlock | null,
3069| pinnedEdits?: CachedMCPinnedEdits[],
3070| skipCacheWrite = false,
3071| ): MessageParam[] {
3072| logEvent('tengu_api_cache_breakpoints', {
3073| totalMessageCount: messages.length,
3074| cachingEnabled: enablePromptCaching,
3075| skipCacheWrite,
3076| })
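withRetry: 529, 429, and model fallback
services/api/withRetry.ts exports the withRetry async generator; both the streaming and non-streaming paths in claude.ts depend on it.
Core constants:
- DEFAULT_MAX_RETRIES = 10, BASE_DELAY_MS = 500
- MAX_529_RETRIES = 3 and the FOREGROUND_529_RETRY_SOURCES set
- CannotRetryError / FallbackTriggeredError trigger model fallback
shouldRetry529: when querySource === undefined it conservatively allows a retry (untagged call paths); otherwise only sources listed in the set, such as compact, repl_main_thread, and agent:custom / agent:default / agent:builtin, back off and retry on capacity errors. The set is checked with Set.has, so matching is exact and there is no wildcard support here.
isPersistentRetryEnabled (ant + CLAUDE_CODE_UNATTENDED_RETRY): 429/529 are retried indefinitely, with a SystemAPIErrorMessage heartbeat every 30s, to keep unattended environments from being idle-killed.
Fast mode overage, OAuth 401, AWS credential errors, and similar cases have dedicated branches inside withRetry and are reported to analytics via logEvent.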
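The overall shape (a retry loop that is itself an async generator, yields a notice before each wait, and gates 529 retries on the query source) can be sketched as follows. This is an illustration under stated assumptions, not the real withRetry: the doubling-with-cap delay formula, the three-entry allowlist, the status === 529 check, and the injected sleep are all hypothetical, and the real function handles many more error classes.

```typescript
type RetryNotice = { type: "system"; attempt: number; delayMs: number };

const BASE_DELAY_MS = 500;

// ASSUMPTION: plain doubling with a cap; the real delay formula is not shown in this chapter.
function backoffDelay(attempt: number, capMs = 32_000): number {
  return Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), capMs);
}

// Hypothetical subset of the foreground allowlist.
const FOREGROUND_SKETCH = new Set<string>(["repl_main_thread", "sdk", "compact"]);

// ASSUMPTION: capacity errors are recognized by a numeric status of 529.
function isCapacityError(err: unknown): boolean {
  return typeof err === "object" && err !== null && (err as { status?: number }).status === 529;
}

/** Retries `operation` on 529 for foreground sources, yielding a notice before each wait. */
async function* retrySketch<T>(
  operation: (attempt: number) => Promise<T>,
  opts: { maxRetries: number; querySource?: string; sleep: (ms: number) => Promise<void> },
): AsyncGenerator<RetryNotice, T> {
  // undefined source retries (untagged call path); otherwise exact set membership.
  const mayRetry529 =
    opts.querySource === undefined || FOREGROUND_SKETCH.has(opts.querySource);
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation(attempt);
    } catch (err) {
      if (!isCapacityError(err) || !mayRetry529 || attempt > opts.maxRetries) throw err;
      const delayMs = backoffDelay(attempt);
      yield { type: "system", attempt, delayMs }; // lets the UI show "retrying"
      await opts.sleep(delayMs);
    }
  }
}
```

Injecting sleep keeps the sketch testable without real timers. A background source such as a title generator would rethrow on its first 529, which is the behavior that avoids amplifying load during a capacity cascade.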
Source reference: src/services/api/withRetry.ts · lines 52–89 (of 823)
52| const DEFAULT_MAX_RETRIES = 10
53| const FLOOR_OUTPUT_TOKENS = 3000
54| const MAX_529_RETRIES = 3
55| export const BASE_DELAY_MS = 500
56|
57| // Foreground query sources where the user IS blocking on the result — these
58| // retry on 529. Everything else (summaries, titles, suggestions, classifiers)
59| // bails immediately: during a capacity cascade each retry is 3-10× gateway
60| // amplification, and the user never sees those fail anyway. New sources
61| // default to no-retry — add here only if the user is waiting on the result.
62| const FOREGROUND_529_RETRY_SOURCES = new Set<QuerySource>([
63| 'repl_main_thread',
64| 'repl_main_thread:outputStyle:custom',
65| 'repl_main_thread:outputStyle:Explanatory',
66| 'repl_main_thread:outputStyle:Learning',
67| 'sdk',
68| 'agent:custom',
69| 'agent:default',
70| 'agent:builtin',
71| 'compact',
72| 'hook_agent',
73| 'hook_prompt',
74| 'verification_agent',
75| 'side_question',
76| // Security classifiers — must complete for auto-mode correctness.
77| // yoloClassifier.ts uses 'auto_mode' (not 'yolo_classifier' — that's
78| // type-only). bash_classifier is ant-only; feature-gate so the string
79| // tree-shakes out of external builds (excluded-strings.txt).
80| 'auto_mode',
81| ...(feature('BASH_CLASSIFIER') ? (['bash_classifier'] as const) : []),
82| ])
83|
84| function shouldRetry529(querySource: QuerySource | undefined): boolean {
85| // undefined → retry (conservative for untagged call paths)
86| return (
87| querySource === undefined || FOREGROUND_529_RETRY_SOURCES.has(querySource)
88| )
89| }
Source reference: src/services/api/withRetry.ts · lines 170–220 (of 823)
170| export async function* withRetry<T>(
171| getClient: () => Promise<Anthropic>,
172| operation: (
173| client: Anthropic,
174| attempt: number,
175| context: RetryContext,
176| ) => Promise<T>,
177| options: RetryOptions,
178| ): AsyncGenerator<SystemAPIErrorMessage, T> {
179| const maxRetries = getMaxRetries(options)
180| const retryContext: RetryContext = {
181| model: options.model,
182| thinkingConfig: options.thinkingConfig,
183| ...(isFastModeEnabled() && { fastMode: options.fastMode }),
184| }
185| let client: Anthropic | null = null
186| let consecutive529Errors = options.initialConsecutive529Errors ?? 0
187| let lastError: unknown
188| let persistentAttempt = 0
189| for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
190| if (options.signal?.aborted) {
191| throw new APIUserAbortError()
192| }
193|
194| // Capture whether fast mode is active before this attempt
195| // (fallback may change the state mid-loop)
196| const wasFastModeActive = isFastModeEnabled()
197| ? retryContext.fastMode && !isFastModeCooldown()
198| : false
199|
200| try {
201| // Check for mock rate limits (used by /mock-limits command for Ant employees)
202| if (process.env.USER_TYPE === 'ant') {
203| const mockError = checkMockRateLimitError(
204| retryContext.model,
205| wasFastModeActive,
206| )
207| if (mockError) {
208| throw mockError
209| }
210| }
211|
212| // Get a fresh client instance on first attempt or after authentication errors
213| // - 401 for first-party API authentication failures
214| // - 403 "OAuth token has been revoked" (another process refreshed the token)
215| // - Bedrock-specific auth errors (403 or CredentialsProviderError)
216| // - Vertex-specific auth errors (credential refresh failures, 401)
217| // - ECONNRESET/EPIPE: stale keep-alive socket; disable pooling and reconnect
218| const isStaleConnection = isStaleConnectionError(lastError)
219| if (
220| isStaleConnection &&
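Media limits and the requestId chain
stripExcessMediaItems ensures that a single request carries no more than API_MAX_MEDIA_PER_REQUEST images plus documents: it strips the oldest media first and keeps the most recent attachments, so the model can still see the screenshot the user just pasted.
getPreviousRequestIdFromMessages scans the messages array from the end for the most recent assistant message with a requestId, which analytics uses to link consecutive requests (cache hit rate, incremental tokens). The comment stresses that this is more reliable than global state: each subagent/teammate keeps its own chain, and rollback updates the value naturally.
Together, these two functions show claude.ts acting as the last hygiene check before the API call, complementing the normalization in utils/messages.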
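The oldest-first stripping rule can be sketched on a simplified message shape. This is an assumption-laden illustration, not the real function: the Block and Msg types are invented, media nested inside tool_result blocks (which the real code also counts) is ignored, and stripExcessMedia is a hypothetical name.

```typescript
type Block =
  | { type: "text"; text: string }
  | { type: "image"; id: string }
  | { type: "document"; id: string };
type Msg = { role: "user" | "assistant"; content: Block[] };

const isMedia = (b: Block): boolean => b.type === "image" || b.type === "document";

/** Keeps at most `limit` media blocks, dropping the oldest ones first. */
function stripExcessMedia(messages: Msg[], limit: number): Msg[] {
  const total = messages.reduce((n, m) => n + m.content.filter(isMedia).length, 0);
  let toRemove = total - limit;
  if (toRemove <= 0) return messages; // under the limit: return the input untouched

  return messages.map((m) => {
    if (toRemove <= 0) return m; // removal budget already spent on older messages
    const content = m.content.filter((b) => {
      if (toRemove > 0 && isMedia(b)) {
        toRemove -= 1; // spend the budget from the front, i.e. on the oldest media
        return false;
      }
      return true;
    });
    return { ...m, content };
  });
}
```

Walking from the front means the newest attachments survive. Text blocks are never removed, and the input array is not mutated.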
Source reference: src/services/api/claude.ts · lines 919–973 (of 3420)
919| /**
920| * Extracts the request ID from the most recent assistant message in the
921| * conversation. Used to link consecutive API requests in analytics so we can
922| * join them for cache-hit-rate analysis and incremental token tracking.
923| *
924| * Deriving this from the message array (rather than global state) ensures each
925| * query chain (main thread, subagent, teammate) tracks its own request chain
926| * independently, and rollback/undo naturally updates the value.
927| */
928| function getPreviousRequestIdFromMessages(
929| messages: Message[],
930| ): string | undefined {
931| for (let i = messages.length - 1; i >= 0; i--) {
932| const msg = messages[i]!
933| if (msg.type === 'assistant' && msg.requestId) {
934| return msg.requestId
935| }
936| }
937| return undefined
938| }
939|
940| function isMedia(
941| block: BetaContentBlockParam,
942| ): block is BetaImageBlockParam | BetaRequestDocumentBlock {
943| return block.type === 'image' || block.type === 'document'
944| }
945|
946| function isToolResult(
947| block: BetaContentBlockParam,
948| ): block is BetaToolResultBlockParam {
949| return block.type === 'tool_result'
950| }
951|
952| /**
953| * Ensures messages contain at most `limit` media items (images + documents).
954| * Strips oldest media first to preserve the most recent.
955| */
956| export function stripExcessMediaItems(
957| messages: (UserMessage | AssistantMessage)[],
958| limit: number,
959| ): (UserMessage | AssistantMessage)[] {
960| let toRemove = 0
961| for (const msg of messages) {
962| if (!Array.isArray(msg.message.content)) continue
963| for (const block of msg.message.content) {
964| if (isMedia(block)) toRemove++
965| if (isToolResult(block) && Array.isArray(block.content)) {
966| for (const nested of block.content) {
967| if (isMedia(nested)) toRemove++
968| }
969| }
970| }
971| }
972| toRemove -= limit
973| if (toRemove <= 0) return messages
Source directory and related files
Closely related: services/api/withRetry.ts, services/api/client.ts, services/compact/apiMicrocompact.ts, utils/messages.ts (normalizeMessagesForAPI), and query.ts (which calls queryModelWithStreaming).
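Hands-on exercises
- Trigger a 529 in the REPL (or use a mock rate limit) and observe the system retry messages and the tengu_query_error event
- Compare cache_read_input_tokens across two requests in the same session to confirm that the 1h TTL allowlist is latched
- Read cleanupStream to understand why stream.controller must be aborted after an abort
- Use getMaxOutputTokensForModel to cross-check the MAX_OUTPUT_TOKENS_FOR_SUMMARY reservation logic from the compact chapter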
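Chapter summary and next steps
claude.ts = the orchestration hub for model requests. The suggested next chapter is mcp-client, which explains how the tool list ends up in the same params.tools array.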