Claude Code 源码分析Claude Code 源码分析
首页
源码统计
系统架构
UML 图表
工具系统
CodeGraph
首页
源码统计
系统架构
UML 图表
工具系统
CodeGraph
  • 概览

    • Claude Code 源码分析
    • 源码统计
    • CodeGraph 图谱
  • 架构

    • 系统架构
    • UML 图表索引
    • 查询引擎
    • 核心流程
    • 消息系统
    • 状态管理
  • 功能模块

    • 工具系统
    • 斜杠命令
    • 服务层
    • MCP 协议
    • Skills 技能
    • 子代理系统
  • 分层深度

    • 入口层
    • UI / Ink 层
    • utils 基础设施
    • 桥接 / 远程
    • 上下文压缩
  • 原理与安全

    • 底层原理
    • 技术难点
    • 权限与安全
    • 内部机制
    • 遥测与分析
  • 深度专题

    • Hooks 系统
    • 插件系统
    • 记忆系统
    • API 通信层
    • Ink 终端 UI
    • 认证系统
    • 构建与发布
    • 术语表
  • 调用分析

    • 调用链分析
    • 核心文件索引
  • 模块详解

    • utils

      • 模块: utils
      • messages · 消息工厂与规范化
      • session-storage · JSONL 会话持久化
      • permissions · 工具权限决策
      • shell-hooks · 用户 Shell Hook 系统
    • components

      • 模块: components
      • REPL · 主屏编排
      • messages · 消息行渲染
      • PermissionRequest · 权限弹窗
      • PromptInput · 底部输入
    • services

      • 模块: services
      • api-claude · Anthropic API 流式与重试
      • mcp-client · MCP 连接与工具调用
      • compact · 上下文压缩与自动触发
      • analytics · GrowthBook、Datadog 与 1P 事件
    • tools

      • 模块: tools
      • tool-interface · Tool 契约与注册表
      • bash-tool · Shell 执行与权限
      • streaming-executor · 流式工具并发调度
      • agent-tool · 子 Agent 委派
    • commands

      • 模块: commands
      • command-registry · commands.ts 注册与分派
      • model-command · /model 模型选择
      • mcp-commands · /mcp 服务器管理
      • compact-memory-commands · /compact 与 /memory
    • ink

      • 模块: ink
      • Ink 渲染管线 · Screen 与终端输出
      • 终端事件 · resize、paste、stdin
      • Ink Hooks · 输入、搜索、终端状态
      • Ink 组件 · Box、Text、ScrollBox 原语
    • hooks

      • 模块: hooks
      • useCanUseTool · 权限 UI 接缝
      • 输入与快捷键 Hook
      • 合并态 Hook(MCP + 本地)
      • notifs 通知 Hook
    • bridge

      • 模块: bridge
      • repl-bridge · REPL 桥初始化与传输
      • bridge-messaging · 桥消息路由与入站处理
      • remote-bridge-core · env-less 核心与守护主循环
      • bridge-permissions-ui · 权限、API 与 TUI
    • cli

      • 模块: cli
      • Structured IO · NDJSON SDK 协议
      • CLI Transports · Session Ingress 传输层
      • CLI Handlers · 子命令懒加载实现
      • Update & Upload · 自更新与串行上传原语
    • screens

      • 模块: screens
      • REPL 屏 · Screen 类型与顶层路由
      • ResumeConversation · 会话恢复选择器
      • Doctor · 安装诊断全屏
    • entrypoints

      • 模块: entrypoints
      • cli-entrypoint · Bootstrap 与快路径
      • sdk-types · core / control / runtime 类型体系
      • mcp-entrypoint · MCP stdio 服务器
      • sandbox-types · 沙箱配置单一真相源
    • skills

      • 模块: skills
      • skills-loading · 磁盘加载与 bundled 注册表
      • bundled-skills · 内置 skill 与 initBundledSkills
      • mcp-skills · MCP prompt 转 skill
      • skill-tool-integration · SkillTool 与命令注册
    • types

      • 模块: types
      • message-types · Message 联合与 content blocks
      • tool-permission-types · Tool、Permission、Command 类型
      • api-sdk-types · API 与 Hooks 协议类型
      • misc-types · ids、plugin、generated 与其余类型
    • tasks

      • 模块: tasks
      • local-agent-task · 本地 Agent 与主会话后台化
      • remote-agent-task · 远程 CCR 与 In-Process Teammate
      • shell-workflow-tasks · Bash 后台、Workflow 与 stopTask
      • dream-monitor-tasks · Dream、Monitor MCP 与 pill 文案
    • keybindings

      • 模块: keybindings
      • keybinding-registry · 注册、Provider 与 useKeybinding
      • default-bindings · 默认键位表与平台差异
      • command-bindings · command:* 动态斜杠命令绑定
      • vim-bindings · Vim 模式与 keybindings 边界
    • memdir

      • 模块: memdir
      • memdir-core · 路径、加载与 MEMORY.md
      • memory-extraction · extractMemories 与 SessionMemory
      • memdir-commands · /memory、/remember 与命令集成
    • state

      • 模块: state
      • app-state-core · store、AppState 类型与 Provider
      • app-state-selectors · selectors 与 onChangeAppState
      • teammate-state · 队友视图与 swarm 状态
      • state-boundaries · bootstrap、sessionStorage、FileStateCache
    • query

      • 模块: query
      • query config 与 deps · 配置快照与依赖注入
      • query tokenBudget · +500k 自动续跑
      • query transitions · Continue / Terminal 状态机
      • query stopHooks · Stop 事件与 turn 结束编排
  • 模块详解(扩展)

    • messages · 消息工厂与规范化
    • session-storage · JSONL 会话持久化
    • permissions · 工具权限决策
    • shell-hooks · 用户 Shell Hook 系统
    • REPL · 主屏编排
    • messages · 消息行渲染
    • PermissionRequest · 权限弹窗
    • PromptInput · 底部输入
    • api-claude · Anthropic API 流式与重试
    • mcp-client · MCP 连接与工具调用
    • compact · 上下文压缩与自动触发
    • analytics · GrowthBook、Datadog 与 1P 事件
    • tool-interface · Tool 契约与注册表
    • bash-tool · Shell 执行与权限
    • streaming-executor · 流式工具并发调度
    • agent-tool · 子 Agent 委派
    • command-registry · commands.ts 注册与分派
    • model-command · /model 模型选择
    • mcp-commands · /mcp 服务器管理
    • compact-memory-commands · /compact 与 /memory
    • Ink 渲染管线 · Screen 与终端输出
    • 终端事件 · resize、paste、stdin
    • Ink Hooks · 输入、搜索、终端状态
    • Ink 组件 · Box、Text、ScrollBox 原语
    • useCanUseTool · 权限 UI 接缝
    • 输入与快捷键 Hook
    • 合并态 Hook(MCP + 本地)
    • notifs 通知 Hook
    • repl-bridge · REPL 桥初始化与传输
    • bridge-messaging · 桥消息路由与入站处理
    • remote-bridge-core · env-less 核心与守护主循环
    • bridge-permissions-ui · 权限、API 与 TUI
    • Structured IO · NDJSON SDK 协议
    • CLI Transports · Session Ingress 传输层
    • CLI Handlers · 子命令懒加载实现
    • Update & Upload · 自更新与串行上传原语
    • REPL 屏 · Screen 类型与顶层路由
    • ResumeConversation · 会话恢复选择器
    • Doctor · 安装诊断全屏
    • cli-entrypoint · Bootstrap 与快路径
    • sdk-types · core / control / runtime 类型体系
    • mcp-entrypoint · MCP stdio 服务器
    • sandbox-types · 沙箱配置单一真相源
    • skills-loading · 磁盘加载与 bundled 注册表
    • bundled-skills · 内置 skill 与 initBundledSkills
    • mcp-skills · MCP prompt 转 skill
    • skill-tool-integration · SkillTool 与命令注册
    • message-types · Message 联合与 content blocks
    • tool-permission-types · Tool、Permission、Command 类型
    • api-sdk-types · API 与 Hooks 协议类型
    • misc-types · ids、plugin、generated 与其余类型
    • local-agent-task · 本地 Agent 与主会话后台化
    • remote-agent-task · 远程 CCR 与 In-Process Teammate
    • shell-workflow-tasks · Bash 后台、Workflow 与 stopTask
    • dream-monitor-tasks · Dream、Monitor MCP 与 pill 文案
    • keybinding-registry · 注册、Provider 与 useKeybinding
    • default-bindings · 默认键位表与平台差异
    • command-bindings · command:* 动态斜杠命令绑定
    • vim-bindings · Vim 模式与 keybindings 边界
    • memdir-core · 路径、加载与 MEMORY.md
    • memory-extraction · extractMemories 与 SessionMemory
    • memdir-commands · /memory、/remember 与命令集成
    • app-state-core · store、AppState 类型与 Provider
    • app-state-selectors · selectors 与 onChangeAppState
    • teammate-state · 队友视图与 swarm 状态
    • state-boundaries · bootstrap、sessionStorage、FileStateCache
    • query config 与 deps · 配置快照与依赖注入
    • query tokenBudget · +500k 自动续跑
    • query transitions · Continue / Terminal 状态机
    • query stopHooks · Stop 事件与 turn 结束编排
  • 工具详解

    • tool-interface · Tool 契约与注册表
    • tool-permission-types · Tool、Permission、Command 类型
    • 工具: Bash
    • 工具: PowerShell
    • 工具: Agent
    • 工具: LSP
    • 工具: FileEdit
    • 工具: FileRead
    • 工具: Skill
    • 工具: WebFetch
    • 工具: MCP
    • 工具: SendMessage
    • 工具: FileWrite
    • 工具: Config
    • 工具: Grep
    • 工具: Brief
    • 工具: ExitPlanMode
    • 工具: ToolSearch
    • 工具: NotebookEdit
    • 工具: TaskOutput
    • 工具: WebSearch
    • 工具: ScheduleCron

本章总览

cli/transports/ 实现 Claude Code worker 与 Session Ingress / CCR v2 之间的双向通道:读侧 WebSocket 或 SSE,写侧 WS 或 HTTP POST。getTransportForUrl 按环境变量选型;HybridTransport 与 CCRClient 共用 SerialBatchEventUploader 做串行批量 POST;WorkerStateUploader 负责 PUT /worker 状态合并。本章要求你能解释为何 bridge 必须 serialize POST,以及 CCR v2 为何强制 SSETransport。

学完本章你应该能

  • 列出 transportUtils 的三级选型优先级
  • 说明 WebSocketTransport 重连、ping 与 messageBuffer 回放
  • 解释 HybridTransport 的 stream_event 100ms 批与 fire-and-forget 写
  • 描述 SSETransport 的 parseSSEFrames 与 Last-Event-ID 续传
  • 理解 CCRClient 的 heartbeat、text_delta 合并与 internal events

核心概念(先读懂这些)

读写在 v2 架构上解耦

Session Ingress v1 默认 WebSocketTransport:读写同一条 WS。v2 引入 Hybrid(WS 读 + POST 写)与 CCR(SSE 读 + POST 写 + CCRClient 协议)。解耦是为绕开代理 idle timeout、Firestore 写冲突、以及 server 侧 event stream 语义(SSE client_event 帧)。

SerialBatchEventUploader 是共享原语

HybridTransport 与 CCRClient 都把 outbound 事件 enqueue 到 SerialBatchEventUploader:至多一个 POST in-flight,失败指数退避,可选 maxConsecutiveFailures 丢批。stream_event 常先进入 100ms delay buffer,减少 content delta POST 风暴;非 stream 写会先 flush buffer 保序。

永久错误 vs 可重试断开

WebSocket PERMANENT_CLOSE_CODES(1002, 4001, 4003)与 SSE PERMANENT_HTTP_CODES(401, 403, 404)立即转 closed,不重连。其他断开走指数退避,RECONNECT_GIVE_UP_MS 约 10 分钟预算。睡眠检测(gap > 60s)重置 reconnect budget,由 server 用 4001 告知 session reaped。

建议学习步骤

  1. 阅读源码块 A:Transport 接口与 getTransportForUrl
  2. 阅读源码块 B:WebSocketTransport 状态机与重连
  3. 阅读源码块 C:HybridTransport 写路径注释
  4. 阅读源码块 D:SSETransport parseSSEFrames
  5. 阅读源码块 E:CCRClient 初始化与 stream 合并
  6. 阅读源码块 F:SerialBatchEventUploader drain

常见误区

注意

CCR v2 必须在 connect 前 new CCRClient——RemoteIO 注释强调顺序

注意

HybridTransport bridge 用 void write(),backpressure 暂不生效;maxQueueSize 只是内存 bound

注意

SSE initialSequenceNum 缺失会导致全量 replay session history

Transport 选型总览

getTransportForUrl(url, headers, sessionId, refreshHeaders)
  │
  ├─ CLAUDE_CODE_USE_CCR_V2 → SSETransport
  │     url.pathname + '/worker/events/stream'
  │     + CCRClient (RemoteIO 内)
  │
  ├─ ws/wss + CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 → HybridTransport
  │
  └─ ws/wss (default) → WebSocketTransport

refreshHeaders 回调让 transport 重连时重新读 session ingress token(父进程刷新 token 文件后 worker 可 pick up)。

仅 ws/wss 支持;http URL 抛 Unsupported protocol(CCR v2 会把 wss→https 再拼 SSE path)。

源码引用: src/cli/transports/Transport.ts · 第 1–7 行(共 8 行)

   1| export interface Transport {
   2|   connect?(): Promise<void>
   3|   close?(): void | Promise<void>
   4|   send?(data: string): Promise<void>
   5|   onData?(handler: (data: string) => void): void
   6|   onClose?(handler: (closeCode?: number) => void): void
   7| }

源码引用: src/cli/transports/transportUtils.ts · 第 8–45 行(共 46 行)

   8| /**
   9|  * Helper function to get the appropriate transport for a URL.
  10|  *
  11|  * Transport selection priority:
  12|  * 1. SSETransport (SSE reads + POST writes) when CLAUDE_CODE_USE_CCR_V2 is set
  13|  * 2. HybridTransport (WS reads + POST writes) when CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 is set
  14|  * 3. WebSocketTransport (WS reads + WS writes) — default
  15|  */
  16| export function getTransportForUrl(
  17|   url: URL,
  18|   headers: Record<string, string> = {},
  19|   sessionId?: string,
  20|   refreshHeaders?: () => Record<string, string>,
  21| ): Transport {
  22|   if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
  23|     // v2: SSE for reads, HTTP POST for writes
  24|     // --sdk-url is the session URL (.../sessions/{id});
  25|     // derive the SSE stream URL by appending /worker/events/stream
  26|     const sseUrl = new URL(url.href)
  27|     if (sseUrl.protocol === 'wss:') {
  28|       sseUrl.protocol = 'https:'
  29|     } else if (sseUrl.protocol === 'ws:') {
  30|       sseUrl.protocol = 'http:'
  31|     }
  32|     sseUrl.pathname =
  33|       sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'
  34|     return new SSETransport(sseUrl, headers, sessionId, refreshHeaders)
  35|   }
  36| 
  37|   if (url.protocol === 'ws:' || url.protocol === 'wss:') {
  38|     if (isEnvTruthy(process.env.CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2)) {
  39|       return new HybridTransport(url, headers, sessionId, refreshHeaders)
  40|     }
  41|     return new WebSocketTransport(url, headers, sessionId, refreshHeaders)
  42|   } else {
  43|     throw new Error(`Unsupported protocol: ${url.protocol}`)
  44|   }
  45| }

WebSocketTransport:连接、缓冲与重连

WebSocketTransport 是默认全双工实现:

  • 状态机:idle → reconnecting → connected → closing → closed
  • CircularBuffer<StdoutMessage>(默认 1000)在断线期间缓存 outbound,重连后 replay
  • pingInterval + pongReceived 检测半开连接;keepAliveInterval 发 JSON keep_alive 数据帧重置代理 idle 计时(lastActivityTime 不含 ping/pong)
  • PERMANENT_CLOSE_CODES 不重连;SLEEP_DETECTION_THRESHOLD_MS 检测系统睡眠
  • 支持 Bun 原生 WS 与 ws 包;refreshHeaders 在 reconnect 时合并

WebSocketTransportOptions:autoReconnect(bridge poll loop 可关)、isBridge(控制 tengu_ws_transport_* 遥测)。

RemoteIO 构造时 sessionId 传入,用于 analytics 与 activity callback。

源码引用: src/cli/transports/WebSocketTransport.ts · 第 42–58 行(共 801 行)

  42| const PERMANENT_CLOSE_CODES = new Set([
  43|   1002, // protocol error — server rejected handshake (e.g. session reaped)
  44|   4001, // session expired / not found
  45|   4003, // unauthorized
  46| ])
  47| 
  48| export type WebSocketTransportOptions = {
  49|   /** When false, the transport does not attempt automatic reconnection on
  50|    *  disconnect. Use this when the caller has its own recovery mechanism
  51|    *  (e.g. the REPL bridge poll loop). Defaults to true. */
  52|   autoReconnect?: boolean
  53|   /** Gates the tengu_ws_transport_* telemetry events. Set true at the
  54|    *  REPL-bridge construction site so only Remote Control sessions (the
  55|    *  Cloudflare-idle-timeout population) emit; print-mode workers stay
  56|    *  silent. Defaults to false. */
  57|   isBridge?: boolean
  58| }

源码引用: src/cli/transports/WebSocketTransport.ts · 第 74–133 行(共 801 行)

  74| export class WebSocketTransport implements Transport {
  75|   private ws: WebSocketLike | null = null
  76|   private lastSentId: string | null = null
  77|   protected url: URL
  78|   protected state: WebSocketTransportState = 'idle'
  79|   protected onData?: (data: string) => void
  80|   private onCloseCallback?: (closeCode?: number) => void
  81|   private onConnectCallback?: () => void
  82|   private headers: Record<string, string>
  83|   private sessionId?: string
  84|   private autoReconnect: boolean
  85|   private isBridge: boolean
  86| 
  87|   // Reconnection state
  88|   private reconnectAttempts = 0
  89|   private reconnectStartTime: number | null = null
  90|   private reconnectTimer: NodeJS.Timeout | null = null
  91|   private lastReconnectAttemptTime: number | null = null
  92|   // Wall-clock of last WS data-frame activity (inbound message or outbound
  93|   // ws.send). Used to compute idle time at close — the signal for diagnosing
  94|   // proxy idle-timeout RSTs (e.g. Cloudflare 5-min). Excludes ping/pong
  95|   // control frames (proxies don't count those).
  96|   private lastActivityTime = 0
  97| 
  98|   // Ping interval for connection health checks
  99|   private pingInterval: NodeJS.Timeout | null = null
 100|   private pongReceived = true
 101| 
 102|   // Periodic keep_alive data frames to reset proxy idle timers
 103|   private keepAliveInterval: NodeJS.Timeout | null = null
 104| 
 105|   // Message buffering for replay on reconnection
 106|   private messageBuffer: CircularBuffer<StdoutMessage>
 107|   // Track which runtime's WS we're using so we can detach listeners
 108|   // with the matching API (removeEventListener vs. off).
 109|   private isBunWs = false
 110| 
 111|   // Captured at connect() time for handleOpenEvent timing. Stored as an
 112|   // instance field so the onOpen handler can be a stable class-property
 113|   // arrow function (removable in doDisconnect) instead of a closure over
 114|   // a local variable.
 115|   private connectStartTime = 0
 116| 
 117|   private refreshHeaders?: () => Record<string, string>
 118| 
 119|   constructor(
 120|     url: URL,
 121|     headers: Record<string, string> = {},
 122|     sessionId?: string,
 123|     refreshHeaders?: () => Record<string, string>,
 124|     options?: WebSocketTransportOptions,
 125|   ) {
 126|     this.url = url
 127|     this.headers = headers
 128|     this.sessionId = sessionId
 129|     this.refreshHeaders = refreshHeaders
 130|     this.autoReconnect = options?.autoReconnect ?? true
 131|     this.isBridge = options?.isBridge ?? false
 132|     this.messageBuffer = new CircularBuffer(DEFAULT_MAX_BUFFER_SIZE)
 133|   }

源码引用: src/cli/transports/WebSocketTransport.ts · 第 135–149 行(共 801 行)

 135|   public async connect(): Promise<void> {
 136|     if (this.state !== 'idle' && this.state !== 'reconnecting') {
 137|       logForDebugging(
 138|         `WebSocketTransport: Cannot connect, current state is ${this.state}`,
 139|         { level: 'error' },
 140|       )
 141|       logForDiagnosticsNoPII('error', 'cli_websocket_connect_failed')
 142|       return
 143|     }
 144|     this.state = 'reconnecting'
 145| 
 146|     this.connectStartTime = Date.now()
 147|     logForDebugging(`WebSocketTransport: Opening ${this.url.href}`)
 148|     logForDiagnosticsNoPII('info', 'cli_websocket_connect_opening')
 149| 

HybridTransport:WS 读 + POST 写

HybridTransport extends WebSocketTransport,写路径完全走 HTTP POST:

设计动机(文件头注释):

  • bridge 用 void transport.write() fire-and-forget
  • 并发 POST → 同一 Firestore 文档并发写 → 碰撞 → retry storm

写流程:

  1. stream_event 进 streamEventBuffer,100ms timer 或遇 non-stream 写时 flush
  2. flush 后 SerialBatchEventUploader.enqueue
  3. uploader 串行 postOnce,失败无限重试(replBridge 可设 maxConsecutiveFailures)

postUrl 由 convertWsUrlToPostUrl 从 WS URL 推导。POST_TIMEOUT_MS=15s 防止单 POST 卡死队列。close() 有 CLOSE_GRACE_MS=3s 尽力 drain。

这与 CCR v2 的 POST 模式同族,但 Hybrid 仍用 WS 读侧。

源码引用: src/cli/transports/HybridTransport.ts · 第 24–53 行(共 283 行)

  24| /**
  25|  * Hybrid transport: WebSocket for reads, HTTP POST for writes.
  26|  *
  27|  * Write flow:
  28|  *
  29|  *   write(stream_event) ─┐
  30|  *                        │ (100ms timer)
  31|  *                        │
  32|  *                        ▼
  33|  *   write(other) ────► uploader.enqueue()  (SerialBatchEventUploader)
  34|  *                        ▲    │
  35|  *   writeBatch() ────────┘    │ serial, batched, retries indefinitely,
  36|  *                             │ backpressure at maxQueueSize
  37|  *                             ▼
  38|  *                        postOnce()  (single HTTP POST, throws on retryable)
  39|  *
  40|  * stream_event messages accumulate in streamEventBuffer for up to 100ms
  41|  * before enqueue (reduces POST count for high-volume content deltas). A
  42|  * non-stream write flushes any buffered stream_events first to preserve order.
  43|  *
  44|  * Serialization + retry + backpressure are delegated to SerialBatchEventUploader
  45|  * (same primitive CCR uses). At most one POST in-flight; events arriving during
  46|  * a POST batch into the next one. On failure, the uploader re-queues and retries
  47|  * with exponential backoff + jitter. If the queue fills past maxQueueSize,
  48|  * enqueue() blocks — giving awaiting callers backpressure.
  49|  *
  50|  * Why serialize? Bridge mode fires writes via `void transport.write()`
  51|  * (fire-and-forget). Without this, concurrent POSTs → concurrent Firestore
  52|  * writes to the same document → collisions → retry storms → pages oncall.
  53|  */

源码引用: src/cli/transports/HybridTransport.ts · 第 54–100 行(共 283 行)

  54| export class HybridTransport extends WebSocketTransport {
  55|   private postUrl: string
  56|   private uploader: SerialBatchEventUploader<StdoutMessage>
  57| 
  58|   // stream_event delay buffer — accumulates content deltas for up to
  59|   // BATCH_FLUSH_INTERVAL_MS before enqueueing (reduces POST count)
  60|   private streamEventBuffer: StdoutMessage[] = []
  61|   private streamEventTimer: ReturnType<typeof setTimeout> | null = null
  62| 
  63|   constructor(
  64|     url: URL,
  65|     headers: Record<string, string> = {},
  66|     sessionId?: string,
  67|     refreshHeaders?: () => Record<string, string>,
  68|     options?: WebSocketTransportOptions & {
  69|       maxConsecutiveFailures?: number
  70|       onBatchDropped?: (batchSize: number, failures: number) => void
  71|     },
  72|   ) {
  73|     super(url, headers, sessionId, refreshHeaders, options)
  74|     const { maxConsecutiveFailures, onBatchDropped } = options ?? {}
  75|     this.postUrl = convertWsUrlToPostUrl(url)
  76|     this.uploader = new SerialBatchEventUploader<StdoutMessage>({
  77|       // Large cap — session-ingress accepts arbitrary batch sizes. Events
  78|       // naturally batch during in-flight POSTs; this just bounds the payload.
  79|       maxBatchSize: 500,
  80|       // Bridge callers use `void transport.write()` — backpressure doesn't
  81|       // apply (they don't await). A batch >maxQueueSize deadlocks (see
  82|       // SerialBatchEventUploader backpressure check). So set it high enough
  83|       // to be a memory bound only. Wire real backpressure in a follow-up
  84|       // once callers await.
  85|       maxQueueSize: 100_000,
  86|       baseDelayMs: 500,
  87|       maxDelayMs: 8000,
  88|       jitterMs: 1000,
  89|       // Optional cap so a persistently-failing server can't pin the drain
  90|       // loop for the lifetime of the process. Undefined = indefinite retry.
  91|       // replBridge sets this; the 1P transportUtils path does not.
  92|       maxConsecutiveFailures,
  93|       onBatchDropped: (batchSize, failures) => {
  94|         logForDiagnosticsNoPII(
  95|           'error',
  96|           'cli_hybrid_batch_dropped_max_failures',
  97|           {
  98|             batchSize,
  99|             failures,
 100|           },

SSETransport:SSE 帧解析与 client_event

SSETransport 用于 CCR v2 读路径:

  • parseSSEFrames:按 \n\n 切 frame,解析 event/id/data 字段;comment 行(:keepalive)重置 liveness
  • 只处理 event: client_event,payload 为 StreamClientEvent proto JSON
  • lastSequenceNum + seenSequenceNums 去重;getLastSequenceNum() 供 replBridge 换 transport 时 resume
  • initialSequenceNum 构造参数避免从 sequence 0 全量 replay
  • POST 写路径与 Hybrid 类似(axios + retry);liveness 45s 无数据则重连
  • PERMANENT_HTTP_CODES 401/403/404 立即 closed

convertSSEUrlToPostUrl / convertWsUrlToPostUrl 对称推导 POST endpoint。

源码引用: src/cli/transports/SSETransport.ts · 第 58–116 行(共 712 行)

  58| export function parseSSEFrames(buffer: string): {
  59|   frames: SSEFrame[]
  60|   remaining: string
  61| } {
  62|   const frames: SSEFrame[] = []
  63|   let pos = 0
  64| 
  65|   // SSE frames are delimited by double newlines
  66|   let idx: number
  67|   while ((idx = buffer.indexOf('\n\n', pos)) !== -1) {
  68|     const rawFrame = buffer.slice(pos, idx)
  69|     pos = idx + 2
  70| 
  71|     // Skip empty frames
  72|     if (!rawFrame.trim()) continue
  73| 
  74|     const frame: SSEFrame = {}
  75|     let isComment = false
  76| 
  77|     for (const line of rawFrame.split('\n')) {
  78|       if (line.startsWith(':')) {
  79|         // SSE comment (e.g., `:keepalive`)
  80|         isComment = true
  81|         continue
  82|       }
  83| 
  84|       const colonIdx = line.indexOf(':')
  85|       if (colonIdx === -1) continue
  86| 
  87|       const field = line.slice(0, colonIdx)
  88|       // Per SSE spec, strip one leading space after colon if present
  89|       const value =
  90|         line[colonIdx + 1] === ' '
  91|           ? line.slice(colonIdx + 2)
  92|           : line.slice(colonIdx + 1)
  93| 
  94|       switch (field) {
  95|         case 'event':
  96|           frame.event = value
  97|           break
  98|         case 'id':
  99|           frame.id = value
 100|           break
 101|         case 'data':
 102|           // Per SSE spec, multiple data: lines are concatenated with \n
 103|           frame.data = frame.data ? frame.data + '\n' + value : value
 104|           break
 105|         // Ignore other fields (retry:, etc.)
 106|       }
 107|     }
 108| 
 109|     // Only emit frames that have data (or are pure comments which reset liveness)
 110|     if (frame.data || isComment) {
 111|       frames.push(frame)
 112|     }
 113|   }
 114| 
 115|   return { frames, remaining: buffer.slice(pos) }
 116| }

源码引用: src/cli/transports/SSETransport.ts · 第 149–219 行(共 712 行)

 149| /**
 150|  * Transport that uses SSE for reading and HTTP POST for writing.
 151|  *
 152|  * Reads events via Server-Sent Events from the CCR v2 event stream endpoint.
 153|  * Writes events via HTTP POST with retry logic (same pattern as HybridTransport).
 154|  *
 155|  * Each `event: client_event` frame carries a StreamClientEvent proto JSON
 156|  * directly in `data:`. The transport extracts `payload` and passes it to
 157|  * `onData` as newline-delimited JSON for StructuredIO consumers.
 158|  *
 159|  * Supports automatic reconnection with exponential backoff and Last-Event-ID
 160|  * for resumption after disconnection.
 161|  */
 162| export class SSETransport implements Transport {
 163|   private state: SSETransportState = 'idle'
 164|   private onData?: (data: string) => void
 165|   private onCloseCallback?: (closeCode?: number) => void
 166|   private onEventCallback?: (event: StreamClientEvent) => void
 167|   private headers: Record<string, string>
 168|   private sessionId?: string
 169|   private refreshHeaders?: () => Record<string, string>
 170|   private readonly getAuthHeaders: () => Record<string, string>
 171| 
 172|   // SSE connection state
 173|   private abortController: AbortController | null = null
 174|   private lastSequenceNum = 0
 175|   private seenSequenceNums = new Set<number>()
 176| 
 177|   // Reconnection state
 178|   private reconnectAttempts = 0
 179|   private reconnectStartTime: number | null = null
 180|   private reconnectTimer: NodeJS.Timeout | null = null
 181| 
 182|   // Liveness detection
 183|   private livenessTimer: NodeJS.Timeout | null = null
 184| 
 185|   // POST URL (derived from SSE URL)
 186|   private postUrl: string
 187| 
 188|   // Runtime epoch for CCR v2 event format
 189| 
 190|   constructor(
 191|     private readonly url: URL,
 192|     headers: Record<string, string> = {},
 193|     sessionId?: string,
 194|     refreshHeaders?: () => Record<string, string>,
 195|     initialSequenceNum?: number,
 196|     /**
 197|      * Per-instance auth header source. Omit to read the process-wide
 198|      * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers). Required
 199|      * for concurrent multi-session callers — the env-var path is a process
 200|      * global and would stomp across sessions.
 201|      */
 202|     getAuthHeaders?: () => Record<string, string>,
 203|   ) {
 204|     this.headers = headers
 205|     this.sessionId = sessionId
 206|     this.refreshHeaders = refreshHeaders
 207|     this.getAuthHeaders = getAuthHeaders ?? getSessionIngressAuthHeaders
 208|     this.postUrl = convertSSEUrlToPostUrl(url)
 209|     // Seed with a caller-provided high-water mark so the first connect()
 210|     // sends from_sequence_num / Last-Event-ID. Without this, a fresh
 211|     // SSETransport always asks the server to replay from sequence 0 —
 212|     // the entire session history on every transport swap.
 213|     if (initialSequenceNum !== undefined && initialSequenceNum > 0) {
 214|       this.lastSequenceNum = initialSequenceNum
 215|     }
 216|     logForDebugging(`SSETransport: SSE URL = ${url.href}`)
 217|     logForDebugging(`SSETransport: POST URL = ${this.postUrl}`)
 218|     logForDiagnosticsNoPII('info', 'cli_sse_transport_initialized')
 219|   }

CCRClient:v2 协议客户端

CCRClient 在 SSETransport 之上实现 Claude Code Remote v2:

  • initialize():worker register、读 epoch、恢复 external_metadata → restoredWorkerState
  • CCRInitError 带 reason:no_auth_headers / missing_epoch / worker_register_failed
  • heartbeat 默认 20s(server TTL 60s);连续 401/403 阈值 MAX_CONSECUTIVE_AUTH_FAILURES
  • accumulateStreamEvents:text_delta 合并为 full-so-far snapshot,客户端 mid-stream 连接可见完整块文本
  • writeEvent / writeInternalEvent / readInternalEvents — transcript 与 session resume
  • WorkerStateUploader 上报 worker_status、external_metadata(PUT /worker 合并)
  • reportDelivery / reportState / reportMetadata 接 command lifecycle 与 sessionState listeners

RemoteIO 在 CLAUDE_CODE_USE_CCR_V2 时 register setInternalEventWriter/Reader,hydrateFromCCRv2InternalEvents 可重建对话。

源码引用: src/cli/transports/ccrClient.ts · 第 49–68 行(共 999 行)

  49| export type CCRInitFailReason =
  50|   | 'no_auth_headers'
  51|   | 'missing_epoch'
  52|   | 'worker_register_failed'
  53| 
  54| /** Thrown by initialize(); carries a typed reason for the diag classifier. */
  55| export class CCRInitError extends Error {
  56|   constructor(readonly reason: CCRInitFailReason) {
  57|     super(`CCRClient init failed: ${reason}`)
  58|   }
  59| }
  60| 
  61| /**
  62|  * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
  63|  * expired JWT short-circuits this (exits immediately — deterministic,
  64|  * retry is futile). This threshold is for the uncertain case: token's
  65|  * exp is in the future but server says 401 (userauth down, KMS hiccup,
  66|  * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
  67|  */
  68| const MAX_CONSECUTIVE_AUTH_FAILURES = 10

源码引用: src/cli/transports/ccrClient.ts · 第 141–180 行(共 999 行)

 141| export function accumulateStreamEvents(
 142|   buffer: SDKPartialAssistantMessage[],
 143|   state: StreamAccumulatorState,
 144| ): EventPayload[] {
 145|   const out: EventPayload[] = []
 146|   // chunks[] → snapshot already in `out` this flush. Keyed by the chunks
 147|   // array reference (stable per {messageId, index}) so subsequent deltas
 148|   // rewrite the same entry instead of emitting one event per delta.
 149|   const touched = new Map<string[], CoalescedStreamEvent>()
 150|   for (const msg of buffer) {
 151|     switch (msg.event.type) {
 152|       case 'message_start': {
 153|         const id = msg.event.message.id
 154|         const prevId = state.scopeToMessage.get(scopeKey(msg))
 155|         if (prevId) state.byMessage.delete(prevId)
 156|         state.scopeToMessage.set(scopeKey(msg), id)
 157|         state.byMessage.set(id, [])
 158|         out.push(msg)
 159|         break
 160|       }
 161|       case 'content_block_delta': {
 162|         if (msg.event.delta.type !== 'text_delta') {
 163|           out.push(msg)
 164|           break
 165|         }
 166|         const messageId = state.scopeToMessage.get(scopeKey(msg))
 167|         const blocks = messageId ? state.byMessage.get(messageId) : undefined
 168|         if (!blocks) {
 169|           // Delta without a preceding message_start (reconnect mid-stream,
 170|           // or message_start was in a prior buffer that got dropped). Pass
 171|           // through raw — can't produce a full-so-far snapshot without the
 172|           // prior chunks anyway.
 173|           out.push(msg)
 174|           break
 175|         }
 176|         const chunks = (blocks[msg.event.index] ??= [])
 177|         chunks.push(msg.event.delta.text)
 178|         const existing = touched.get(chunks)
 179|         if (existing) {
 180|           existing.event.delta.text = chunks.join('')

源码引用: src/cli/transports/ccrClient.ts · 第 262–280 行(共 999 行)

 262| export class CCRClient {
 263|   private workerEpoch = 0
 264|   private readonly heartbeatIntervalMs: number
 265|   private readonly heartbeatJitterFraction: number
 266|   private heartbeatTimer: NodeJS.Timeout | null = null
 267|   private heartbeatInFlight = false
 268|   private closed = false
 269|   private consecutiveAuthFailures = 0
 270|   private currentState: SessionState | null = null
 271|   private readonly sessionBaseUrl: string
 272|   private readonly sessionId: string
 273|   private readonly http = createAxiosInstance({ keepAlive: true })
 274| 
 275|   // stream_event delay buffer — accumulates content deltas for up to
 276|   // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
 277|   // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
 278|   private streamEventBuffer: SDKPartialAssistantMessage[] = []
 279|   private streamEventTimer: ReturnType<typeof setTimeout> | null = null
 280|   // Full-so-far text accumulator. Persists across flushes so each emitted

SerialBatchEventUploader 与 WorkerStateUploader

SerialBatchEventUploader<T>(Hybrid + CCR 共用):

  • enqueue 可阻塞(backpressure)当 pending > maxQueueSize
  • takeBatch 受 maxBatchSize 与可选 maxBatchBytes 限制
  • RetryableError.retryAfterMs 覆盖指数退避(429 Retry-After)
  • flush() 在 turn 边界 / shutdown 等待队列清空
  • droppedBatchCount 检测 silent drop(maxConsecutiveFailures)

WorkerStateUploader 不同语义:PUT /worker 状态,至多 1 in-flight + 1 pending patch,coalescePatches 对 external_metadata 做 RFC 7396 浅合并。无 backpressure——天然 2 slot 有界。失败无限退避直到 close()。

二者分工:事件流 POST vs 状态快照 PUT。

源码引用: src/cli/transports/SerialBatchEventUploader.ts · 第 26–62 行(共 276 行)

  26| export class RetryableError extends Error {
  27|   constructor(
  28|     message: string,
  29|     readonly retryAfterMs?: number,
  30|   ) {
  31|     super(message)
  32|   }
  33| }
  34| 
  35| type SerialBatchEventUploaderConfig<T> = {
  36|   /** Max items per POST (1 = no batching) */
  37|   maxBatchSize: number
  38|   /**
  39|    * Max serialized bytes per POST. First item always goes in regardless of
  40|    * size; subsequent items only if cumulative JSON bytes stay under this.
  41|    * Undefined = no byte limit (count-only batching).
  42|    */
  43|   maxBatchBytes?: number
  44|   /** Max pending items before enqueue() blocks */
  45|   maxQueueSize: number
  46|   /** The actual HTTP call — caller controls payload format */
  47|   send: (batch: T[]) => Promise<void>
  48|   /** Base delay for exponential backoff (ms) */
  49|   baseDelayMs: number
  50|   /** Max delay cap (ms) */
  51|   maxDelayMs: number
  52|   /** Random jitter range added to retry delay (ms) */
  53|   jitterMs: number
  54|   /**
  55|    * After this many consecutive send() failures, drop the failing batch
  56|    * and move on to the next pending item with a fresh failure budget.
  57|    * Undefined = retry indefinitely (default).
  58|    */
  59|   maxConsecutiveFailures?: number
  60|   /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
  61|   onBatchDropped?: (batchSize: number, failures: number) => void
  62| }

源码引用: src/cli/transports/SerialBatchEventUploader.ts · 第 156–193 行(共 276 行)

 156|   private async drain(): Promise<void> {
 157|     if (this.draining || this.closed) return
 158|     this.draining = true
 159|     let failures = 0
 160| 
 161|     try {
 162|       while (this.pending.length > 0 && !this.closed) {
 163|         const batch = this.takeBatch()
 164|         if (batch.length === 0) continue
 165| 
 166|         try {
 167|           await this.config.send(batch)
 168|           failures = 0
 169|         } catch (err) {
 170|           failures++
 171|           if (
 172|             this.config.maxConsecutiveFailures !== undefined &&
 173|             failures >= this.config.maxConsecutiveFailures
 174|           ) {
 175|             this.droppedBatches++
 176|             this.config.onBatchDropped?.(batch.length, failures)
 177|             failures = 0
 178|             this.releaseBackpressure()
 179|             continue
 180|           }
 181|           // Re-queue the failed batch at the front. Use concat (single
 182|           // allocation) instead of unshift(...batch) which shifts every
 183|           // pending item batch.length times. Only hit on failure path.
 184|           this.pending = batch.concat(this.pending)
 185|           const retryAfterMs =
 186|             err instanceof RetryableError ? err.retryAfterMs : undefined
 187|           await this.sleep(this.retryDelay(failures, retryAfterMs))
 188|           continue
 189|         }
 190| 
 191|         // Release backpressure waiters if space opened up
 192|         this.releaseBackpressure()
 193|       }

源码引用: src/cli/transports/WorkerStateUploader.ts · 第 3–47 行(共 132 行)

   3| /**
   4|  * Coalescing uploader for PUT /worker (session state + metadata).
   5|  *
   6|  * - 1 in-flight PUT + 1 pending patch
   7|  * - New calls coalesce into pending (never grows beyond 1 slot)
   8|  * - On success: send pending if exists
   9|  * - On failure: exponential backoff (clamped), retries indefinitely
  10|  *   until success or close(). Absorbs any pending patches before each retry.
  11|  * - No backpressure needed — naturally bounded at 2 slots
  12|  *
  13|  * Coalescing rules:
  14|  * - Top-level keys (worker_status, external_metadata) — last value wins
  15|  * - Inside external_metadata / internal_metadata — RFC 7396 merge:
  16|  *   keys are added/overwritten, null values preserved (server deletes)
  17|  */
  18| 
  19| type WorkerStateUploaderConfig = {
  20|   send: (body: Record<string, unknown>) => Promise<boolean>
  21|   /** Base delay for exponential backoff (ms) */
  22|   baseDelayMs: number
  23|   /** Max delay cap (ms) */
  24|   maxDelayMs: number
  25|   /** Random jitter range added to retry delay (ms) */
  26|   jitterMs: number
  27| }
  28| 
  29| export class WorkerStateUploader {
  30|   private inflight: Promise<void> | null = null
  31|   private pending: Record<string, unknown> | null = null
  32|   private closed = false
  33|   private readonly config: WorkerStateUploaderConfig
  34| 
  35|   constructor(config: WorkerStateUploaderConfig) {
  36|     this.config = config
  37|   }
  38| 
  39|   /**
  40|    * Enqueue a patch to PUT /worker. Coalesces with any existing pending
  41|    * patch. Fire-and-forget — callers don't need to await.
  42|    */
  43|   enqueue(patch: Record<string, unknown>): void {
  44|     if (this.closed) return
  45|     this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
  46|     void this.drain()
  47|   }

RemoteIO 如何挂载 Transport

回顾 remoteIO.ts 构造顺序(与 transports 章交叉引用):

  1. PassThrough + super()
  2. getTransportForUrl → 具体 Transport 实例
  3. setOnData / setOnClose
  4. 若 CCR v2:new CCRClient 先于 connect()
  5. transport.connect()
  6. bridge keep_alive timer

write() 路径:ccrClient ? writeEvent : transport.write(message)。

调试连接问题时的 checklist:token Authorization header、SSE vs WS env、sequence num 续传、PERMANENT close code 是否触发 gracefulShutdown。

源码引用: src/cli/remoteIO.ts · 第 87–109 行(共 256 行)

  87|     // Get appropriate transport based on URL protocol
  88|     this.transport = getTransportForUrl(
  89|       this.url,
  90|       headers,
  91|       getSessionId(),
  92|       refreshHeaders,
  93|     )
  94| 
  95|     // Set up data callback
  96|     this.isBridge = process.env.CLAUDE_CODE_ENVIRONMENT_KIND === 'bridge'
  97|     this.isDebug = isDebugMode()
  98|     this.transport.setOnData((data: string) => {
  99|       this.inputStream.write(data)
 100|       if (this.isBridge && this.isDebug) {
 101|         writeToStdout(data.endsWith('\n') ? data : data + '\n')
 102|       }
 103|     })
 104| 
 105|     // Set up close callback to handle connection failures
 106|     this.transport.setOnClose(() => {
 107|       // End the input stream to trigger graceful shutdown
 108|       this.inputStream.end()
 109|     })

源码引用: src/cli/remoteIO.ts · 第 111–125 行(共 256 行)

 111|     // Initialize CCR v2 client (heartbeats, epoch, state reporting, event writes).
 112|     // The CCRClient constructor wires the SSE received-ack handler
 113|     // synchronously, so new CCRClient() MUST run before transport.connect() —
 114|     // otherwise early SSE frames hit an unwired onEventCallback and their
 115|     // 'received' delivery acks are silently dropped.
 116|     if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
 117|       // CCR v2 is SSE+POST by definition. getTransportForUrl returns
 118|       // SSETransport under the same env var, but the two checks live in
 119|       // different files — assert the invariant so a future decoupling
 120|       // fails loudly here instead of confusingly inside CCRClient.
 121|       if (!(this.transport instanceof SSETransport)) {
 122|         throw new Error(
 123|           'CCR v2 requires SSETransport; check getTransportForUrl',
 124|         )
 125|       }

telemetry 与诊断关键字

transport 层大量 logForDiagnosticsNoPII 事件,便于无 PII 聚合:

  • cli_websocket_connect_opening / cli_websocket_connect_failed
  • cli_sse_transport_initialized
  • cli_hybrid_batch_dropped_max_failures
  • cli_worker_lifecycle_init_failed

WebSocket isBridge 选项限制 tengu_ws_transport_* 只出现在 Remote Control bridge 会话,print-mode worker 保持静默。

读 oncall 问题时,对照 lastActivityTime 与 proxy idle timeout(Cloudflare 5min 等)是否匹配 keep_alive 间隔。

源码目录

关联:bridge/pollConfig.js(keepalive 间隔)、utils/sessionIngressAuth.js(Bearer token)、utils/proxy.js(WS 代理与 axios)。

动手练习

  1. 画一张表:env 变量 → Transport 类 → 读协议 / 写协议
  2. 阅读 HybridTransport postOnce 与 SSETransport write 的 axios 配置差异
  3. 追踪 accumulateStreamEvents 如何在 writeEvent 收到完整 assistant 时 clearStreamAccumulatorForMessage
  4. 对比 WebSocket messageBuffer replay 与 SSE Last-Event-ID 续传语义

本章小结与延伸

transports 是 RemoteIO 的物理层。下一章 handlers 回到纯本地子命令;update-util 章也涉及 WorkerStateUploader。 继续学习:

  • Structured IO
  • Update & Upload
Prev
Structured IO · NDJSON SDK 协议
Next
CLI Handlers · 子命令懒加载实现