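Chapter overview
cli/transports/ implements the bidirectional channel between the Claude Code worker and Session Ingress / CCR v2. The read side is WebSocket or SSE; the write side is WS or HTTP POST. getTransportForUrl picks the transport from environment variables. HybridTransport and CCRClient share SerialBatchEventUploader for serial, batched POSTs. WorkerStateUploader handles merging worker state via PUT /worker. By the end of this chapter you should be able to explain why the bridge must serialize its POSTs and why CCR v2 requires SSETransport.
By the end of this chapter you should be able to
- List the three-level selection priority in transportUtils
- Explain WebSocketTransport's reconnection, ping, and messageBuffer replay
- Explain HybridTransport's 100ms stream_event batching and fire-and-forget writes
- Describe SSETransport's parseSSEFrames and Last-Event-ID resumption
- Understand CCRClient's heartbeat, text_delta coalescing, and internal events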
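Core concepts (read these first)
Reads and writes are decoupled in the v2 architecture
Session Ingress v1 uses WebSocketTransport by default: reads and writes share one WS. v2 adds Hybrid (WS reads + POST writes) and CCR (SSE reads + POST writes + the CCRClient protocol). The decoupling works around proxy idle timeouts and Firestore write conflicts, and matches the server-side event-stream semantics (SSE client_event frames).
SerialBatchEventUploader is a shared primitive
HybridTransport and CCRClient both enqueue outbound events into a SerialBatchEventUploader: at most one POST is in flight, failures back off exponentially, and an optional maxConsecutiveFailures drops a batch that keeps failing. stream_event messages usually pass through a 100ms delay buffer first, which avoids a storm of POSTs for content deltas; a non-stream write flushes that buffer first so ordering is preserved.
Permanent errors vs retryable disconnects
WebSocket PERMANENT_CLOSE_CODES (1002, 4001, 4003) and SSE PERMANENT_HTTP_CODES (401, 403, 404) move the transport straight to closed, with no reconnect. Any other disconnect is retried with exponential backoff within a reconnect budget of RECONNECT_GIVE_UP_MS (about 10 minutes). Sleep detection (a gap > 60s) resets the reconnect budget; if the session was reaped in the meantime, the server reports it with close code 4001.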
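The reconnect rules above can be sketched as a small policy object. This is a sketch under stated assumptions, not the transports' code: ReconnectPolicy, decide() and reset() are hypothetical names, the 10-minute budget and 60s sleep gap use the approximate values quoted here, and sleep detection is simplified to "time since the previous attempt".

```typescript
// Hypothetical reconnect policy; the codes and budgets mirror the values in the text.
const PERMANENT_CLOSE_CODES = new Set([1002, 4001, 4003]) // WebSocket close codes
const PERMANENT_HTTP_CODES = new Set([401, 403, 404]) // SSE HTTP statuses
const RECONNECT_GIVE_UP_MS = 10 * 60 * 1000 // ~10 minute reconnect budget
const SLEEP_GAP_MS = 60 * 1000 // a longer gap suggests the machine slept

type Decision = 'close' | 'retry' | 'give_up'

class ReconnectPolicy {
  private budgetStart: number | null = null
  private lastAttempt: number | null = null

  decide(now: number, code: { ws?: number; http?: number }): Decision {
    // Permanent errors: go straight to closed, never retry.
    if (code.ws !== undefined && PERMANENT_CLOSE_CODES.has(code.ws)) return 'close'
    if (code.http !== undefined && PERMANENT_HTTP_CODES.has(code.http)) return 'close'

    // After a sleep-sized gap, start a fresh budget instead of charging the
    // sleep against it; a reaped session then surfaces as close code 4001.
    if (this.lastAttempt !== null && now - this.lastAttempt > SLEEP_GAP_MS) {
      this.budgetStart = null
    }
    const start = this.budgetStart ?? now
    this.budgetStart = start
    this.lastAttempt = now

    return now - start > RECONNECT_GIVE_UP_MS ? 'give_up' : 'retry'
  }

  // Call after a successful reconnect.
  reset(): void {
    this.budgetStart = null
    this.lastAttempt = null
  }
}
```

Keeping the permanent-code check first means a 4001 is never retried, even when it arrives right after a sleep reset the budget.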
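Suggested study path
- Read source block A: the Transport interface and getTransportForUrl
- Read source block B: the WebSocketTransport state machine and reconnection
- Read source block C: the comments on HybridTransport's write path
- Read source block D: SSETransport's parseSSEFrames
- Read source block E: CCRClient initialization and stream coalescing
- Read source block F: SerialBatchEventUploader's drain loop
Common pitfalls
Note
Under CCR v2, new CCRClient() must run before connect(). The RemoteIO comment stresses this ordering: otherwise early SSE frames reach an unwired handler and their received acks are silently dropped.
Note
The bridge calls HybridTransport via void write(), so backpressure does not take effect yet; maxQueueSize is only a memory bound.
Note
If SSETransport is constructed without initialSequenceNum, the server replays the entire session history.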
Transport selection overview
getTransportForUrl(url, headers, sessionId, refreshHeaders)
│
├─ CLAUDE_CODE_USE_CCR_V2 → SSETransport
│ url.pathname + '/worker/events/stream'
│ + CCRClient (inside RemoteIO)
│
├─ ws/wss + CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 → HybridTransport
│
└─ ws/wss (default) → WebSocketTransport
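The refreshHeaders callback lets a transport re-read the session ingress token when it reconnects, so the worker picks up a token that the parent process has refreshed in the token file.
Outside the CCR v2 branch, only ws/wss URLs are accepted; any other protocol throws Unsupported protocol. The CCR v2 branch instead rewrites wss→https (ws→http) and then appends the SSE path.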
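A minimal sketch of the refreshHeaders idea, assuming fresh values simply override the headers passed at construction time. buildConnectHeaders is a hypothetical helper and X-Example a made-up header; the text only says the callback's result is merged in on reconnect.

```typescript
type HeaderSource = () => Record<string, string>

// Hypothetical helper: called on every (re)connect, so a token the parent
// process rewrote since the last attempt is picked up automatically.
function buildConnectHeaders(
  base: Record<string, string>,
  refreshHeaders?: HeaderSource,
): Record<string, string> {
  // Fresh values (e.g. a rotated Authorization token) override the originals.
  return { ...base, ...(refreshHeaders ? refreshHeaders() : {}) }
}

// Usage: the token source changes between two reconnects.
let currentToken = 'tok-1'
const refresh: HeaderSource = () => ({ Authorization: `Bearer ${currentToken}` })
const baseHeaders = { Authorization: 'Bearer tok-0', 'X-Example': 'kept' }

const first = buildConnectHeaders(baseHeaders, refresh) // Authorization: 'Bearer tok-1'
currentToken = 'tok-2'
const second = buildConnectHeaders(baseHeaders, refresh) // Authorization: 'Bearer tok-2'
```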
Source: src/cli/transports/Transport.ts · lines 1–7 (of 8)
1| export interface Transport {
2| connect?(): Promise<void>
3| close?(): void | Promise<void>
4| send?(data: string): Promise<void>
5| onData?(handler: (data: string) => void): void
6| onClose?(handler: (closeCode?: number) => void): void
7| }
Source: src/cli/transports/transportUtils.ts · lines 8–45 (of 46)
8| /**
9| * Helper function to get the appropriate transport for a URL.
10| *
11| * Transport selection priority:
12| * 1. SSETransport (SSE reads + POST writes) when CLAUDE_CODE_USE_CCR_V2 is set
13| * 2. HybridTransport (WS reads + POST writes) when CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 is set
14| * 3. WebSocketTransport (WS reads + WS writes) — default
15| */
16| export function getTransportForUrl(
17| url: URL,
18| headers: Record<string, string> = {},
19| sessionId?: string,
20| refreshHeaders?: () => Record<string, string>,
21| ): Transport {
22| if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
23| // v2: SSE for reads, HTTP POST for writes
24| // --sdk-url is the session URL (.../sessions/{id});
25| // derive the SSE stream URL by appending /worker/events/stream
26| const sseUrl = new URL(url.href)
27| if (sseUrl.protocol === 'wss:') {
28| sseUrl.protocol = 'https:'
29| } else if (sseUrl.protocol === 'ws:') {
30| sseUrl.protocol = 'http:'
31| }
32| sseUrl.pathname =
33| sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'
34| return new SSETransport(sseUrl, headers, sessionId, refreshHeaders)
35| }
36|
37| if (url.protocol === 'ws:' || url.protocol === 'wss:') {
38| if (isEnvTruthy(process.env.CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2)) {
39| return new HybridTransport(url, headers, sessionId, refreshHeaders)
40| }
41| return new WebSocketTransport(url, headers, sessionId, refreshHeaders)
42| } else {
43| throw new Error(`Unsupported protocol: ${url.protocol}`)
44| }
45| }
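WebSocketTransport: connection, buffering, and reconnection
WebSocketTransport is the default full-duplex implementation:
- State machine: idle → reconnecting → connected → closing → closed
- A CircularBuffer<StdoutMessage> (1000 entries by default) holds outbound messages while disconnected and replays them after reconnecting
- pingInterval plus pongReceived detect half-open connections; keepAliveInterval sends JSON keep_alive data frames to reset proxy idle timers (lastActivityTime excludes ping/pong control frames, which proxies don't count)
- PERMANENT_CLOSE_CODES are never retried; SLEEP_DETECTION_THRESHOLD_MS detects system sleep
- Supports both Bun's native WebSocket and the ws package; the refreshHeaders result is merged in on reconnect
WebSocketTransportOptions: autoReconnect (the bridge poll loop can turn it off) and isBridge (gates the tengu_ws_transport_* telemetry).
RemoteIO passes sessionId in at construction; it is used for analytics and the activity callback.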
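The replay buffer can be pictured as a fixed-capacity ring that drops its oldest entry when full. This sketch assumes drop-oldest semantics and a drain-everything replay; the real CircularBuffer API is not shown in this chapter, and RingBuffer, push() and drain() are hypothetical names.

```typescript
class RingBuffer<T> {
  private items: (T | undefined)[]
  private start = 0 // index of the oldest entry
  private size = 0

  constructor(private readonly capacity: number) {
    this.items = new Array<T | undefined>(capacity)
  }

  // Append; when full, overwrite (drop) the oldest entry.
  push(item: T): void {
    const end = (this.start + this.size) % this.capacity
    this.items[end] = item
    if (this.size < this.capacity) {
      this.size++
    } else {
      this.start = (this.start + 1) % this.capacity
    }
  }

  // Remove and return everything, oldest first, for replay after reconnect.
  drain(): T[] {
    const out: T[] = []
    for (let i = 0; i < this.size; i++) {
      out.push(this.items[(this.start + i) % this.capacity] as T)
    }
    this.items = new Array<T | undefined>(this.capacity)
    this.start = 0
    this.size = 0
    return out
  }
}

// Usage with a tiny capacity (the transport's default is 1000).
const outbox = new RingBuffer<string>(3)
for (const m of ['a', 'b', 'c', 'd']) outbox.push(m) // 'a' is overwritten
const replayed = outbox.drain() // ['b', 'c', 'd'], oldest first
```

Under that assumption the bound trades completeness for memory: after a long outage only the newest messages survive to be replayed.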
Source: src/cli/transports/WebSocketTransport.ts · lines 42–58 (of 801)
42| const PERMANENT_CLOSE_CODES = new Set([
43| 1002, // protocol error — server rejected handshake (e.g. session reaped)
44| 4001, // session expired / not found
45| 4003, // unauthorized
46| ])
47|
48| export type WebSocketTransportOptions = {
49| /** When false, the transport does not attempt automatic reconnection on
50| * disconnect. Use this when the caller has its own recovery mechanism
51| * (e.g. the REPL bridge poll loop). Defaults to true. */
52| autoReconnect?: boolean
53| /** Gates the tengu_ws_transport_* telemetry events. Set true at the
54| * REPL-bridge construction site so only Remote Control sessions (the
55| * Cloudflare-idle-timeout population) emit; print-mode workers stay
56| * silent. Defaults to false. */
57| isBridge?: boolean
58| }
Source: src/cli/transports/WebSocketTransport.ts · lines 74–133 (of 801)
74| export class WebSocketTransport implements Transport {
75| private ws: WebSocketLike | null = null
76| private lastSentId: string | null = null
77| protected url: URL
78| protected state: WebSocketTransportState = 'idle'
79| protected onData?: (data: string) => void
80| private onCloseCallback?: (closeCode?: number) => void
81| private onConnectCallback?: () => void
82| private headers: Record<string, string>
83| private sessionId?: string
84| private autoReconnect: boolean
85| private isBridge: boolean
86|
87| // Reconnection state
88| private reconnectAttempts = 0
89| private reconnectStartTime: number | null = null
90| private reconnectTimer: NodeJS.Timeout | null = null
91| private lastReconnectAttemptTime: number | null = null
92| // Wall-clock of last WS data-frame activity (inbound message or outbound
93| // ws.send). Used to compute idle time at close — the signal for diagnosing
94| // proxy idle-timeout RSTs (e.g. Cloudflare 5-min). Excludes ping/pong
95| // control frames (proxies don't count those).
96| private lastActivityTime = 0
97|
98| // Ping interval for connection health checks
99| private pingInterval: NodeJS.Timeout | null = null
100| private pongReceived = true
101|
102| // Periodic keep_alive data frames to reset proxy idle timers
103| private keepAliveInterval: NodeJS.Timeout | null = null
104|
105| // Message buffering for replay on reconnection
106| private messageBuffer: CircularBuffer<StdoutMessage>
107| // Track which runtime's WS we're using so we can detach listeners
108| // with the matching API (removeEventListener vs. off).
109| private isBunWs = false
110|
111| // Captured at connect() time for handleOpenEvent timing. Stored as an
112| // instance field so the onOpen handler can be a stable class-property
113| // arrow function (removable in doDisconnect) instead of a closure over
114| // a local variable.
115| private connectStartTime = 0
116|
117| private refreshHeaders?: () => Record<string, string>
118|
119| constructor(
120| url: URL,
121| headers: Record<string, string> = {},
122| sessionId?: string,
123| refreshHeaders?: () => Record<string, string>,
124| options?: WebSocketTransportOptions,
125| ) {
126| this.url = url
127| this.headers = headers
128| this.sessionId = sessionId
129| this.refreshHeaders = refreshHeaders
130| this.autoReconnect = options?.autoReconnect ?? true
131| this.isBridge = options?.isBridge ?? false
132| this.messageBuffer = new CircularBuffer(DEFAULT_MAX_BUFFER_SIZE)
133| }
Source: src/cli/transports/WebSocketTransport.ts · lines 135–149 (of 801)
135| public async connect(): Promise<void> {
136| if (this.state !== 'idle' && this.state !== 'reconnecting') {
137| logForDebugging(
138| `WebSocketTransport: Cannot connect, current state is ${this.state}`,
139| { level: 'error' },
140| )
141| logForDiagnosticsNoPII('error', 'cli_websocket_connect_failed')
142| return
143| }
144| this.state = 'reconnecting'
145|
146| this.connectStartTime = Date.now()
147| logForDebugging(`WebSocketTransport: Opening ${this.url.href}`)
148| logForDiagnosticsNoPII('info', 'cli_websocket_connect_opening')
149|
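The pingInterval + pongReceived pair follows the usual ping/pong liveness pattern. The sketch below assumes each interval tick checks the flag before sending the next ping; PingMonitor, PingableSocket and onDead are hypothetical stand-ins, not the transport's members.

```typescript
interface PingableSocket {
  ping(): void
}

class PingMonitor {
  private pongReceived = true

  constructor(
    private readonly socket: PingableSocket,
    private readonly onDead: () => void,
  ) {}

  // Call whenever a pong control frame arrives.
  handlePong(): void {
    this.pongReceived = true
  }

  // Call once per ping interval (e.g. from setInterval).
  tick(): void {
    if (!this.pongReceived) {
      this.onDead() // no pong since the last ping: treat as half-open
      return
    }
    this.pongReceived = false
    this.socket.ping()
  }
}

// Usage with a fake socket.
const probe = { pings: 0, dead: false }
const monitor = new PingMonitor(
  { ping: () => { probe.pings++ } },
  () => { probe.dead = true },
)
monitor.tick() // ping #1
monitor.handlePong() // peer answered
monitor.tick() // ping #2
monitor.tick() // no pong since #2, so onDead fires
```

Ping and pong are control frames, so this check works without touching lastActivityTime; that is why the transport also sends keep_alive data frames for proxies.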
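HybridTransport: WS reads + POST writes
HybridTransport extends WebSocketTransport and sends every write over HTTP POST.
Design motivation (from the file header comment):
- The bridge calls void transport.write() fire-and-forget
- Concurrent POSTs → concurrent writes to the same Firestore document → collisions → retry storm
Write flow:
- stream_event messages go into streamEventBuffer, which is flushed when the 100ms timer fires or a non-stream write arrives
- Flushed events are passed to SerialBatchEventUploader.enqueue
- The uploader calls postOnce serially and retries failures indefinitely (replBridge can cap this with maxConsecutiveFailures)
postUrl is derived from the WS URL by convertWsUrlToPostUrl. POST_TIMEOUT_MS = 15s keeps a single hung POST from stalling the queue. close() makes a best-effort drain within CLOSE_GRACE_MS = 3s.
This is the same POST pattern CCR v2 uses, but Hybrid keeps WS on the read side.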
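HybridTransport's write ordering can be modelled in a few lines. This sketch assumes a simplified Msg type and an enqueue callback standing in for SerialBatchEventUploader.enqueue; StreamBatcher is a hypothetical name, and only the 100ms window and the flush-before-non-stream rule come from the source.

```typescript
type Msg = { type: string; [key: string]: unknown }

const BATCH_FLUSH_INTERVAL_MS = 100

class StreamBatcher {
  private buffer: Msg[] = []
  private timer: ReturnType<typeof setTimeout> | null = null

  constructor(private readonly enqueue: (batch: Msg[]) => void) {}

  write(msg: Msg): void {
    if (msg.type === 'stream_event') {
      this.buffer.push(msg)
      // Start the 100ms window on the first buffered delta.
      if (this.timer === null) {
        this.timer = setTimeout(() => this.flush(), BATCH_FLUSH_INTERVAL_MS)
      }
      return
    }
    this.flush() // keep buffered deltas ahead of this message
    this.enqueue([msg])
  }

  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer)
      this.timer = null
    }
    if (this.buffer.length === 0) return
    const batch = this.buffer
    this.buffer = []
    this.enqueue(batch)
  }
}

// Usage: two deltas, then a non-stream message.
const batches: Msg[][] = []
const batcher = new StreamBatcher((batch) => { batches.push(batch) })
batcher.write({ type: 'stream_event', n: 1 })
batcher.write({ type: 'stream_event', n: 2 })
batcher.write({ type: 'assistant' }) // flushes both deltas first
```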
Source: src/cli/transports/HybridTransport.ts · lines 24–53 (of 283)
24| /**
25| * Hybrid transport: WebSocket for reads, HTTP POST for writes.
26| *
27| * Write flow:
28| *
29| * write(stream_event) ─┐
30| * │ (100ms timer)
31| * │
32| * ▼
33| * write(other) ────► uploader.enqueue() (SerialBatchEventUploader)
34| * ▲ │
35| * writeBatch() ────────┘ │ serial, batched, retries indefinitely,
36| * │ backpressure at maxQueueSize
37| * ▼
38| * postOnce() (single HTTP POST, throws on retryable)
39| *
40| * stream_event messages accumulate in streamEventBuffer for up to 100ms
41| * before enqueue (reduces POST count for high-volume content deltas). A
42| * non-stream write flushes any buffered stream_events first to preserve order.
43| *
44| * Serialization + retry + backpressure are delegated to SerialBatchEventUploader
45| * (same primitive CCR uses). At most one POST in-flight; events arriving during
46| * a POST batch into the next one. On failure, the uploader re-queues and retries
47| * with exponential backoff + jitter. If the queue fills past maxQueueSize,
48| * enqueue() blocks — giving awaiting callers backpressure.
49| *
50| * Why serialize? Bridge mode fires writes via `void transport.write()`
51| * (fire-and-forget). Without this, concurrent POSTs → concurrent Firestore
52| * writes to the same document → collisions → retry storms → pages oncall.
53| */
Source: src/cli/transports/HybridTransport.ts · lines 54–100 (of 283)
54| export class HybridTransport extends WebSocketTransport {
55| private postUrl: string
56| private uploader: SerialBatchEventUploader<StdoutMessage>
57|
58| // stream_event delay buffer — accumulates content deltas for up to
59| // BATCH_FLUSH_INTERVAL_MS before enqueueing (reduces POST count)
60| private streamEventBuffer: StdoutMessage[] = []
61| private streamEventTimer: ReturnType<typeof setTimeout> | null = null
62|
63| constructor(
64| url: URL,
65| headers: Record<string, string> = {},
66| sessionId?: string,
67| refreshHeaders?: () => Record<string, string>,
68| options?: WebSocketTransportOptions & {
69| maxConsecutiveFailures?: number
70| onBatchDropped?: (batchSize: number, failures: number) => void
71| },
72| ) {
73| super(url, headers, sessionId, refreshHeaders, options)
74| const { maxConsecutiveFailures, onBatchDropped } = options ?? {}
75| this.postUrl = convertWsUrlToPostUrl(url)
76| this.uploader = new SerialBatchEventUploader<StdoutMessage>({
77| // Large cap — session-ingress accepts arbitrary batch sizes. Events
78| // naturally batch during in-flight POSTs; this just bounds the payload.
79| maxBatchSize: 500,
80| // Bridge callers use `void transport.write()` — backpressure doesn't
81| // apply (they don't await). A batch >maxQueueSize deadlocks (see
82| // SerialBatchEventUploader backpressure check). So set it high enough
83| // to be a memory bound only. Wire real backpressure in a follow-up
84| // once callers await.
85| maxQueueSize: 100_000,
86| baseDelayMs: 500,
87| maxDelayMs: 8000,
88| jitterMs: 1000,
89| // Optional cap so a persistently-failing server can't pin the drain
90| // loop for the lifetime of the process. Undefined = indefinite retry.
91| // replBridge sets this; the 1P transportUtils path does not.
92| maxConsecutiveFailures,
93| onBatchDropped: (batchSize, failures) => {
94| logForDiagnosticsNoPII(
95| 'error',
96| 'cli_hybrid_batch_dropped_max_failures',
97| {
98| batchSize,
99| failures,
100| },
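SSETransport: SSE frame parsing and client_event
SSETransport serves the CCR v2 read path:
- parseSSEFrames splits frames on \n\n and parses the event/id/data fields; comment lines (:keepalive) reset the liveness timer
- Only event: client_event frames are handled; their data is StreamClientEvent proto JSON
- lastSequenceNum plus seenSequenceNums deduplicate events; getLastSequenceNum() lets replBridge resume when it swaps transports
- The initialSequenceNum constructor argument avoids a full replay from sequence 0
- The POST write path resembles Hybrid's (axios + retry); if no data arrives for 45s, the liveness timer triggers a reconnect
- PERMANENT_HTTP_CODES 401/403/404 go straight to closed
convertSSEUrlToPostUrl and convertWsUrlToPostUrl derive the POST endpoint symmetrically.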
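The sequence bookkeeping can be sketched as follows. SequenceTracker is a hypothetical class that borrows the field names above; it assumes the SSE id is the decimal sequence number and sends it only as Last-Event-ID (the constructor comment also mentions from_sequence_num, which is left out here).

```typescript
class SequenceTracker {
  private lastSequenceNum = 0
  private readonly seenSequenceNums = new Set<number>()

  constructor(initialSequenceNum?: number) {
    // Seeding with a known high-water mark avoids a replay from sequence 0.
    if (initialSequenceNum !== undefined && initialSequenceNum > 0) {
      this.lastSequenceNum = initialSequenceNum
    }
  }

  // True if the frame is new and should be delivered; false for duplicates.
  accept(seq: number): boolean {
    if (this.seenSequenceNums.has(seq)) return false
    this.seenSequenceNums.add(seq)
    if (seq > this.lastSequenceNum) this.lastSequenceNum = seq
    return true
  }

  getLastSequenceNum(): number {
    return this.lastSequenceNum
  }

  // Headers for the next connect (assumes the id is the decimal sequence number).
  resumeHeaders(): Record<string, string> {
    return this.lastSequenceNum > 0
      ? { 'Last-Event-ID': String(this.lastSequenceNum) }
      : {}
  }
}

// Usage: a transport swap hands over the previous high-water mark.
const tracker = new SequenceTracker(41)
const delivered = [42, 42, 43].filter((seq) => tracker.accept(seq)) // [42, 43]
const resume = tracker.resumeHeaders() // { 'Last-Event-ID': '43' }
```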
Source: src/cli/transports/SSETransport.ts · lines 58–116 (of 712)
58| export function parseSSEFrames(buffer: string): {
59| frames: SSEFrame[]
60| remaining: string
61| } {
62| const frames: SSEFrame[] = []
63| let pos = 0
64|
65| // SSE frames are delimited by double newlines
66| let idx: number
67| while ((idx = buffer.indexOf('\n\n', pos)) !== -1) {
68| const rawFrame = buffer.slice(pos, idx)
69| pos = idx + 2
70|
71| // Skip empty frames
72| if (!rawFrame.trim()) continue
73|
74| const frame: SSEFrame = {}
75| let isComment = false
76|
77| for (const line of rawFrame.split('\n')) {
78| if (line.startsWith(':')) {
79| // SSE comment (e.g., `:keepalive`)
80| isComment = true
81| continue
82| }
83|
84| const colonIdx = line.indexOf(':')
85| if (colonIdx === -1) continue
86|
87| const field = line.slice(0, colonIdx)
88| // Per SSE spec, strip one leading space after colon if present
89| const value =
90| line[colonIdx + 1] === ' '
91| ? line.slice(colonIdx + 2)
92| : line.slice(colonIdx + 1)
93|
94| switch (field) {
95| case 'event':
96| frame.event = value
97| break
98| case 'id':
99| frame.id = value
100| break
101| case 'data':
102| // Per SSE spec, multiple data: lines are concatenated with \n
103| frame.data = frame.data ? frame.data + '\n' + value : value
104| break
105| // Ignore other fields (retry:, etc.)
106| }
107| }
108|
109| // Only emit frames that have data (or are pure comments which reset liveness)
110| if (frame.data || isComment) {
111| frames.push(frame)
112| }
113| }
114|
115| return { frames, remaining: buffer.slice(pos) }
116| }
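parseSSEFrames returns remaining because a network chunk can end mid-frame. The sketch below shows the caller side: each chunk is appended to the carried-over tail before parsing. parseFrames is a condensed restatement of the function above (same field handling, with comments flagged on the frame instead of in an isComment local), and the chunk contents are made up for illustration.

```typescript
type Frame = { event?: string; id?: string; data?: string; comment?: boolean }

// Condensed restatement of parseSSEFrames: split on blank lines, parse fields.
function parseFrames(buffer: string): { frames: Frame[]; remaining: string } {
  const frames: Frame[] = []
  let pos = 0
  let idx: number
  while ((idx = buffer.indexOf('\n\n', pos)) !== -1) {
    const raw = buffer.slice(pos, idx)
    pos = idx + 2
    const frame: Frame = {}
    for (const line of raw.split('\n')) {
      if (line.startsWith(':')) {
        frame.comment = true // e.g. ":keepalive", which resets liveness
        continue
      }
      const colon = line.indexOf(':')
      if (colon === -1) continue
      const field = line.slice(0, colon)
      // Strip one optional space after the colon, per the SSE spec.
      const value = line.slice(line[colon + 1] === ' ' ? colon + 2 : colon + 1)
      if (field === 'event') frame.event = value
      else if (field === 'id') frame.id = value
      else if (field === 'data') frame.data = frame.data ? frame.data + '\n' + value : value
    }
    if (frame.data || frame.comment) frames.push(frame)
  }
  // Anything after the last blank line is an incomplete frame.
  return { frames, remaining: buffer.slice(pos) }
}

// Caller side: carry the incomplete tail into the next chunk.
const chunks = ['event: client_event\nid: 7\nda', 'ta: {"a":1}\n\n:keepalive\n', '\n']
let carry = ''
const received: Frame[] = []
for (const chunk of chunks) {
  const { frames, remaining } = parseFrames(carry + chunk)
  carry = remaining
  received.push(...frames)
}
// received: [{ event: 'client_event', id: '7', data: '{"a":1}' }, { comment: true }]
```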
Source: src/cli/transports/SSETransport.ts · lines 149–219 (of 712)
149| /**
150| * Transport that uses SSE for reading and HTTP POST for writing.
151| *
152| * Reads events via Server-Sent Events from the CCR v2 event stream endpoint.
153| * Writes events via HTTP POST with retry logic (same pattern as HybridTransport).
154| *
155| * Each `event: client_event` frame carries a StreamClientEvent proto JSON
156| * directly in `data:`. The transport extracts `payload` and passes it to
157| * `onData` as newline-delimited JSON for StructuredIO consumers.
158| *
159| * Supports automatic reconnection with exponential backoff and Last-Event-ID
160| * for resumption after disconnection.
161| */
162| export class SSETransport implements Transport {
163| private state: SSETransportState = 'idle'
164| private onData?: (data: string) => void
165| private onCloseCallback?: (closeCode?: number) => void
166| private onEventCallback?: (event: StreamClientEvent) => void
167| private headers: Record<string, string>
168| private sessionId?: string
169| private refreshHeaders?: () => Record<string, string>
170| private readonly getAuthHeaders: () => Record<string, string>
171|
172| // SSE connection state
173| private abortController: AbortController | null = null
174| private lastSequenceNum = 0
175| private seenSequenceNums = new Set<number>()
176|
177| // Reconnection state
178| private reconnectAttempts = 0
179| private reconnectStartTime: number | null = null
180| private reconnectTimer: NodeJS.Timeout | null = null
181|
182| // Liveness detection
183| private livenessTimer: NodeJS.Timeout | null = null
184|
185| // POST URL (derived from SSE URL)
186| private postUrl: string
187|
188| // Runtime epoch for CCR v2 event format
189|
190| constructor(
191| private readonly url: URL,
192| headers: Record<string, string> = {},
193| sessionId?: string,
194| refreshHeaders?: () => Record<string, string>,
195| initialSequenceNum?: number,
196| /**
197| * Per-instance auth header source. Omit to read the process-wide
198| * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers). Required
199| * for concurrent multi-session callers — the env-var path is a process
200| * global and would stomp across sessions.
201| */
202| getAuthHeaders?: () => Record<string, string>,
203| ) {
204| this.headers = headers
205| this.sessionId = sessionId
206| this.refreshHeaders = refreshHeaders
207| this.getAuthHeaders = getAuthHeaders ?? getSessionIngressAuthHeaders
208| this.postUrl = convertSSEUrlToPostUrl(url)
209| // Seed with a caller-provided high-water mark so the first connect()
210| // sends from_sequence_num / Last-Event-ID. Without this, a fresh
211| // SSETransport always asks the server to replay from sequence 0 —
212| // the entire session history on every transport swap.
213| if (initialSequenceNum !== undefined && initialSequenceNum > 0) {
214| this.lastSequenceNum = initialSequenceNum
215| }
216| logForDebugging(`SSETransport: SSE URL = ${url.href}`)
217| logForDebugging(`SSETransport: POST URL = ${this.postUrl}`)
218| logForDiagnosticsNoPII('info', 'cli_sse_transport_initialized')
219| }
CCRClient: the v2 protocol client
CCRClient implements Claude Code Remote v2 on top of SSETransport:
- initialize(): registers the worker, reads the epoch, and restores external_metadata into restoredWorkerState
- CCRInitError carries a reason: no_auth_headers / missing_epoch / worker_register_failed
- Heartbeats every 20s by default (server TTL 60s); MAX_CONSECUTIVE_AUTH_FAILURES caps how many 401/403 responses in a row are tolerated
- accumulateStreamEvents coalesces text_delta events into full-so-far snapshots, so a client that connects mid-stream still sees the complete block text
- writeEvent / writeInternalEvent / readInternalEvents: transcript persistence and session resume
- WorkerStateUploader reports worker_status and external_metadata (merged via PUT /worker)
- reportDelivery / reportState / reportMetadata hook into the command lifecycle and the sessionState listeners
When CLAUDE_CODE_USE_CCR_V2 is set, RemoteIO registers setInternalEventWriter/Reader, and hydrateFromCCRv2InternalEvents can rebuild the conversation.
Source: src/cli/transports/ccrClient.ts · lines 49–68 (of 999)
49| export type CCRInitFailReason =
50| | 'no_auth_headers'
51| | 'missing_epoch'
52| | 'worker_register_failed'
53|
54| /** Thrown by initialize(); carries a typed reason for the diag classifier. */
55| export class CCRInitError extends Error {
56| constructor(readonly reason: CCRInitFailReason) {
57| super(`CCRClient init failed: ${reason}`)
58| }
59| }
60|
61| /**
62| * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
63| * expired JWT short-circuits this (exits immediately — deterministic,
64| * retry is futile). This threshold is for the uncertain case: token's
65| * exp is in the future but server says 401 (userauth down, KMS hiccup,
66| * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
67| */
68| const MAX_CONSECUTIVE_AUTH_FAILURES = 10
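The threshold rule can be sketched as a counter fed with heartbeat statuses. Only the value 10 and the 20s interval come from the source; AuthFailureGate and its outcomes are hypothetical, the expired-JWT short circuit is omitted, and treating non-auth errors as neither resetting nor extending the streak is an assumption.

```typescript
const MAX_CONSECUTIVE_AUTH_FAILURES = 10
const HEARTBEAT_INTERVAL_MS = 20_000

type HeartbeatOutcome = 'ok' | 'retry' | 'give_up'

class AuthFailureGate {
  private consecutiveAuthFailures = 0

  // Feed each heartbeat's HTTP status and decide whether to keep going.
  record(status: number): HeartbeatOutcome {
    if (status === 401 || status === 403) {
      this.consecutiveAuthFailures++
      return this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES
        ? 'give_up'
        : 'retry'
    }
    if (status >= 200 && status < 300) {
      this.consecutiveAuthFailures = 0 // a success ends the streak
      return 'ok'
    }
    return 'retry' // assumption: other errors leave the streak unchanged
  }
}

// Worst case before giving up: 10 failures x 20s = 200s.
const rideOutMs = MAX_CONSECUTIVE_AUTH_FAILURES * HEARTBEAT_INTERVAL_MS
```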
Source: src/cli/transports/ccrClient.ts · lines 141–180 (of 999)
141| export function accumulateStreamEvents(
142| buffer: SDKPartialAssistantMessage[],
143| state: StreamAccumulatorState,
144| ): EventPayload[] {
145| const out: EventPayload[] = []
146| // chunks[] → snapshot already in `out` this flush. Keyed by the chunks
147| // array reference (stable per {messageId, index}) so subsequent deltas
148| // rewrite the same entry instead of emitting one event per delta.
149| const touched = new Map<string[], CoalescedStreamEvent>()
150| for (const msg of buffer) {
151| switch (msg.event.type) {
152| case 'message_start': {
153| const id = msg.event.message.id
154| const prevId = state.scopeToMessage.get(scopeKey(msg))
155| if (prevId) state.byMessage.delete(prevId)
156| state.scopeToMessage.set(scopeKey(msg), id)
157| state.byMessage.set(id, [])
158| out.push(msg)
159| break
160| }
161| case 'content_block_delta': {
162| if (msg.event.delta.type !== 'text_delta') {
163| out.push(msg)
164| break
165| }
166| const messageId = state.scopeToMessage.get(scopeKey(msg))
167| const blocks = messageId ? state.byMessage.get(messageId) : undefined
168| if (!blocks) {
169| // Delta without a preceding message_start (reconnect mid-stream,
170| // or message_start was in a prior buffer that got dropped). Pass
171| // through raw — can't produce a full-so-far snapshot without the
172| // prior chunks anyway.
173| out.push(msg)
174| break
175| }
176| const chunks = (blocks[msg.event.index] ??= [])
177| chunks.push(msg.event.delta.text)
178| const existing = touched.get(chunks)
179| if (existing) {
180| existing.event.delta.text = chunks.join('')
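The coalescing in accumulateStreamEvents reduces to a small accumulator. This sketch uses simplified, hypothetical Delta and Snapshot types, and unlike the source it creates state on the first delta instead of requiring a message_start (the source passes such orphan deltas through raw). It keeps the two key ideas: chunks persist per (messageId, block index) across flushes, and within one flush all deltas for a block collapse into one full-so-far event, tracked by chunk-array identity as touched is in the source.

```typescript
type Delta = { messageId: string; index: number; text: string }
type Snapshot = { messageId: string; index: number; text: string }

class TextAccumulator {
  // messageId -> block index -> chunks so far; survives across flushes
  private readonly byMessage = new Map<string, string[][]>()

  flush(buffer: Delta[]): Snapshot[] {
    const out: Snapshot[] = []
    // One output entry per block per flush, keyed by the chunks array itself.
    const touched = new Map<string[], Snapshot>()
    for (const d of buffer) {
      let blocks = this.byMessage.get(d.messageId)
      if (!blocks) {
        blocks = []
        this.byMessage.set(d.messageId, blocks)
      }
      let chunks = blocks[d.index]
      if (!chunks) {
        chunks = []
        blocks[d.index] = chunks
      }
      chunks.push(d.text)
      const existing = touched.get(chunks)
      if (existing) {
        existing.text = chunks.join('') // rewrite the same entry, no new event
      } else {
        const snap: Snapshot = { messageId: d.messageId, index: d.index, text: chunks.join('') }
        touched.set(chunks, snap)
        out.push(snap)
      }
    }
    return out
  }
}

// Usage: two flushes for the same text block.
const acc = new TextAccumulator()
const flush1 = acc.flush([
  { messageId: 'm1', index: 0, text: 'Hel' },
  { messageId: 'm1', index: 0, text: 'lo' },
]) // one snapshot: 'Hello'
const flush2 = acc.flush([{ messageId: 'm1', index: 0, text: ', world' }]) // 'Hello, world'
```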
Source: src/cli/transports/ccrClient.ts · lines 262–280 (of 999)
262| export class CCRClient {
263| private workerEpoch = 0
264| private readonly heartbeatIntervalMs: number
265| private readonly heartbeatJitterFraction: number
266| private heartbeatTimer: NodeJS.Timeout | null = null
267| private heartbeatInFlight = false
268| private closed = false
269| private consecutiveAuthFailures = 0
270| private currentState: SessionState | null = null
271| private readonly sessionBaseUrl: string
272| private readonly sessionId: string
273| private readonly http = createAxiosInstance({ keepAlive: true })
274|
275| // stream_event delay buffer — accumulates content deltas for up to
276| // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
277| // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
278| private streamEventBuffer: SDKPartialAssistantMessage[] = []
279| private streamEventTimer: ReturnType<typeof setTimeout> | null = null
280| // Full-so-far text accumulator. Persists across flushes so each emitted
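SerialBatchEventUploader and WorkerStateUploader
SerialBatchEventUploader<T> (shared by Hybrid and CCR):
- enqueue() can block (backpressure) when pending exceeds maxQueueSize
- takeBatch is bounded by maxBatchSize and the optional maxBatchBytes
- RetryableError.retryAfterMs overrides the exponential backoff (e.g. a 429 Retry-After)
- flush() waits for the queue to empty at turn boundaries and on shutdown
- droppedBatchCount exposes batches silently dropped under maxConsecutiveFailures
WorkerStateUploader has different semantics: it PUTs worker state to /worker with at most 1 in-flight request plus 1 pending patch, and coalescePatches applies an RFC 7396 shallow merge inside external_metadata (and internal_metadata). It needs no backpressure because it is naturally bounded at 2 slots. Failures back off and retry indefinitely until close().
Division of labor: event-stream POSTs versus state-snapshot PUTs.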
Source: src/cli/transports/SerialBatchEventUploader.ts · lines 26–62 (of 276)
26| export class RetryableError extends Error {
27| constructor(
28| message: string,
29| readonly retryAfterMs?: number,
30| ) {
31| super(message)
32| }
33| }
34|
35| type SerialBatchEventUploaderConfig<T> = {
36| /** Max items per POST (1 = no batching) */
37| maxBatchSize: number
38| /**
39| * Max serialized bytes per POST. First item always goes in regardless of
40| * size; subsequent items only if cumulative JSON bytes stay under this.
41| * Undefined = no byte limit (count-only batching).
42| */
43| maxBatchBytes?: number
44| /** Max pending items before enqueue() blocks */
45| maxQueueSize: number
46| /** The actual HTTP call — caller controls payload format */
47| send: (batch: T[]) => Promise<void>
48| /** Base delay for exponential backoff (ms) */
49| baseDelayMs: number
50| /** Max delay cap (ms) */
51| maxDelayMs: number
52| /** Random jitter range added to retry delay (ms) */
53| jitterMs: number
54| /**
55| * After this many consecutive send() failures, drop the failing batch
56| * and move on to the next pending item with a fresh failure budget.
57| * Undefined = retry indefinitely (default).
58| */
59| maxConsecutiveFailures?: number
60| /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
61| onBatchDropped?: (batchSize: number, failures: number) => void
62| }
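The config above implies a retry delay built from baseDelayMs, maxDelayMs and jitterMs, with RetryableError.retryAfterMs taking precedence. The body of the real retryDelay is not shown in this chapter, so the formula below (doubling per consecutive failure, capped at maxDelayMs, plus uniform jitter; a server-supplied retryAfterMs used as-is) is an assumption. The random source is injectable so the example is deterministic.

```typescript
type BackoffConfig = { baseDelayMs: number; maxDelayMs: number; jitterMs: number }

// Hypothetical retry-delay formula; failures is 1 after the first failure.
function retryDelay(
  cfg: BackoffConfig,
  failures: number,
  retryAfterMs?: number,
  random: () => number = Math.random,
): number {
  // A server hint (e.g. 429 Retry-After) overrides exponential backoff.
  if (retryAfterMs !== undefined) return retryAfterMs
  const exp = Math.min(cfg.baseDelayMs * 2 ** (failures - 1), cfg.maxDelayMs)
  return exp + Math.floor(random() * cfg.jitterMs)
}

// Usage with HybridTransport's values and jitter disabled.
const hybridCfg = { baseDelayMs: 500, maxDelayMs: 8000, jitterMs: 1000 }
const noJitter = () => 0
const delays = [1, 2, 3, 4, 5, 6].map((n) => retryDelay(hybridCfg, n, undefined, noJitter))
// delays: [500, 1000, 2000, 4000, 8000, 8000]
```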
Source: src/cli/transports/SerialBatchEventUploader.ts · lines 156–193 (of 276)
156| private async drain(): Promise<void> {
157| if (this.draining || this.closed) return
158| this.draining = true
159| let failures = 0
160|
161| try {
162| while (this.pending.length > 0 && !this.closed) {
163| const batch = this.takeBatch()
164| if (batch.length === 0) continue
165|
166| try {
167| await this.config.send(batch)
168| failures = 0
169| } catch (err) {
170| failures++
171| if (
172| this.config.maxConsecutiveFailures !== undefined &&
173| failures >= this.config.maxConsecutiveFailures
174| ) {
175| this.droppedBatches++
176| this.config.onBatchDropped?.(batch.length, failures)
177| failures = 0
178| this.releaseBackpressure()
179| continue
180| }
181| // Re-queue the failed batch at the front. Use concat (single
182| // allocation) instead of unshift(...batch) which shifts every
183| // pending item batch.length times. Only hit on failure path.
184| this.pending = batch.concat(this.pending)
185| const retryAfterMs =
186| err instanceof RetryableError ? err.retryAfterMs : undefined
187| await this.sleep(this.retryDelay(failures, retryAfterMs))
188| continue
189| }
190|
191| // Release backpressure waiters if space opened up
192| this.releaseBackpressure()
193| }
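A stripped-down model of the drain loop above: the draining flag guarantees one send in flight, items arriving meanwhile join the next batch, a failed batch goes back to the front, and an optional maxConsecutiveFailures drops it. MiniUploader is hypothetical; backoff sleeps and backpressure are omitted, so without a failure cap a permanently failing send would spin.

```typescript
class MiniUploader<T> {
  private pending: T[] = []
  private draining = false
  dropped = 0

  constructor(
    private readonly send: (batch: T[]) => Promise<void>,
    private readonly maxBatchSize: number,
    private readonly maxConsecutiveFailures?: number,
  ) {}

  enqueue(item: T): void {
    this.pending.push(item)
    void this.drain() // fire-and-forget, like the bridge's writes
  }

  async drain(): Promise<void> {
    if (this.draining) return // at most one send in flight
    this.draining = true
    let failures = 0
    try {
      while (this.pending.length > 0) {
        const batch = this.pending.splice(0, this.maxBatchSize)
        try {
          await this.send(batch)
          failures = 0
        } catch {
          failures++
          if (this.maxConsecutiveFailures !== undefined && failures >= this.maxConsecutiveFailures) {
            this.dropped++ // give up on this batch, fresh budget for the next
            failures = 0
            continue
          }
          this.pending = batch.concat(this.pending) // retry this batch first
          // (the real uploader sleeps retryDelay(failures, retryAfterMs) here)
        }
      }
    } finally {
      this.draining = false
    }
  }
}

// Usage: 'b' and 'c' arrive while the send for 'a' is in flight,
// so they travel together in the next batch.
const sent: string[][] = []
const uploader = new MiniUploader<string>(async (batch) => { sent.push(batch) }, 500)
uploader.enqueue('a')
uploader.enqueue('b')
uploader.enqueue('c')
```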
Source: src/cli/transports/WorkerStateUploader.ts · lines 3–47 (of 132)
3| /**
4| * Coalescing uploader for PUT /worker (session state + metadata).
5| *
6| * - 1 in-flight PUT + 1 pending patch
7| * - New calls coalesce into pending (never grows beyond 1 slot)
8| * - On success: send pending if exists
9| * - On failure: exponential backoff (clamped), retries indefinitely
10| * until success or close(). Absorbs any pending patches before each retry.
11| * - No backpressure needed — naturally bounded at 2 slots
12| *
13| * Coalescing rules:
14| * - Top-level keys (worker_status, external_metadata) — last value wins
15| * - Inside external_metadata / internal_metadata — RFC 7396 merge:
16| * keys are added/overwritten, null values preserved (server deletes)
17| */
18|
19| type WorkerStateUploaderConfig = {
20| send: (body: Record<string, unknown>) => Promise<boolean>
21| /** Base delay for exponential backoff (ms) */
22| baseDelayMs: number
23| /** Max delay cap (ms) */
24| maxDelayMs: number
25| /** Random jitter range added to retry delay (ms) */
26| jitterMs: number
27| }
28|
29| export class WorkerStateUploader {
30| private inflight: Promise<void> | null = null
31| private pending: Record<string, unknown> | null = null
32| private closed = false
33| private readonly config: WorkerStateUploaderConfig
34|
35| constructor(config: WorkerStateUploaderConfig) {
36| this.config = config
37| }
38|
39| /**
40| * Enqueue a patch to PUT /worker. Coalesces with any existing pending
41| * patch. Fire-and-forget — callers don't need to await.
42| */
43| enqueue(patch: Record<string, unknown>): void {
44| if (this.closed) return
45| this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
46| void this.drain()
47| }
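The coalescing rules in the header comment can be restated as a short function: top-level keys are last-write-wins, while external_metadata and internal_metadata are shallow-merged RFC 7396 style, keeping null values so the server can delete those keys. coalesce is a hypothetical restatement; the real coalescePatches body is not shown in this chapter.

```typescript
type Patch = Record<string, unknown>

const MERGED_KEYS = new Set(['external_metadata', 'internal_metadata'])

function isPlainObject(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v)
}

function coalesce(pending: Patch, next: Patch): Patch {
  const out: Patch = { ...pending }
  for (const [key, value] of Object.entries(next)) {
    const prev = out[key]
    if (MERGED_KEYS.has(key) && isPlainObject(prev) && isPlainObject(value)) {
      // Shallow merge: keys added or overwritten; null survives for the server.
      out[key] = { ...prev, ...value }
    } else {
      out[key] = value // last value wins
    }
  }
  return out
}

// Usage: two patches queued while a PUT is in flight.
const merged = coalesce(
  { worker_status: 'running', external_metadata: { a: 1, b: 2 } },
  { worker_status: 'idle', external_metadata: { b: null, c: 3 } },
)
// merged: { worker_status: 'idle', external_metadata: { a: 1, b: null, c: 3 } }
```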
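How RemoteIO mounts a Transport
Recap of the construction order in remoteIO.ts (cross-referenced with the transports chapter):
- PassThrough + super()
- getTransportForUrl → a concrete Transport instance
- setOnData / setOnClose
- Under CCR v2: new CCRClient() before connect()
- transport.connect()
- bridge keep_alive timer
The write() path is ccrClient ? writeEvent : transport.write(message): when a CCRClient exists, outbound messages go through its writeEvent; otherwise they go straight to the transport.
Checklist for debugging connection problems: the token in the Authorization header, which env var selects SSE vs WS, sequence-number resumption, and whether a PERMANENT close code triggered gracefulShutdown.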
Source: src/cli/remoteIO.ts · lines 87–109 (of 256)
87| // Get appropriate transport based on URL protocol
88| this.transport = getTransportForUrl(
89| this.url,
90| headers,
91| getSessionId(),
92| refreshHeaders,
93| )
94|
95| // Set up data callback
96| this.isBridge = process.env.CLAUDE_CODE_ENVIRONMENT_KIND === 'bridge'
97| this.isDebug = isDebugMode()
98| this.transport.setOnData((data: string) => {
99| this.inputStream.write(data)
100| if (this.isBridge && this.isDebug) {
101| writeToStdout(data.endsWith('\n') ? data : data + '\n')
102| }
103| })
104|
105| // Set up close callback to handle connection failures
106| this.transport.setOnClose(() => {
107| // End the input stream to trigger graceful shutdown
108| this.inputStream.end()
109| })
Source: src/cli/remoteIO.ts · lines 111–125 (of 256)
111| // Initialize CCR v2 client (heartbeats, epoch, state reporting, event writes).
112| // The CCRClient constructor wires the SSE received-ack handler
113| // synchronously, so new CCRClient() MUST run before transport.connect() —
114| // otherwise early SSE frames hit an unwired onEventCallback and their
115| // 'received' delivery acks are silently dropped.
116| if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
117| // CCR v2 is SSE+POST by definition. getTransportForUrl returns
118| // SSETransport under the same env var, but the two checks live in
119| // different files — assert the invariant so a future decoupling
120| // fails loudly here instead of confusingly inside CCRClient.
121| if (!(this.transport instanceof SSETransport)) {
122| throw new Error(
123| 'CCR v2 requires SSETransport; check getTransportForUrl',
124| )
125| }
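Why the order matters can be shown with a toy model in which the fake transport delivers its backlog synchronously inside connect(). Real SSE frames arrive asynchronously, but the effect is the same whenever a frame can land before the handler is wired. FakeTransport, AckingClient and ackedCount are hypothetical.

```typescript
type FrameHandler = (frame: string) => void

// Hypothetical transport that delivers its backlog as soon as connect() runs.
class FakeTransport {
  private onEvent?: FrameHandler
  constructor(private readonly backlog: string[]) {}
  setOnEvent(handler: FrameHandler): void {
    this.onEvent = handler
  }
  connect(): void {
    for (const frame of this.backlog) {
      this.onEvent?.(frame) // with no handler yet, the frame is silently lost
    }
  }
}

// Hypothetical client that, like CCRClient, wires its handler in the constructor.
class AckingClient {
  readonly acked: string[] = []
  constructor(transport: FakeTransport) {
    transport.setOnEvent((frame) => {
      this.acked.push(frame) // stands in for a 'received' delivery ack
    })
  }
}

function ackedCount(clientFirst: boolean): number {
  const transport = new FakeTransport(['e1', 'e2'])
  let client: AckingClient
  if (clientFirst) {
    client = new AckingClient(transport)
    transport.connect()
  } else {
    transport.connect()
    client = new AckingClient(transport)
  }
  return client.acked.length
}
```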
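Telemetry and diagnostic keywords
The transport layer emits many logForDiagnosticsNoPII events, so they can be aggregated without PII:
- cli_websocket_connect_opening / cli_websocket_connect_failed
- cli_sse_transport_initialized
- cli_hybrid_batch_dropped_max_failures
- cli_worker_lifecycle_init_failed
The WebSocket isBridge option restricts tengu_ws_transport_* events to Remote Control bridge sessions; print-mode workers stay silent.
When investigating an oncall issue, compare the idle time derived from lastActivityTime with the proxy idle timeout (e.g. Cloudflare's 5 minutes), and check that the keep_alive interval is shorter than that timeout.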
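The oncall comparison can be written as two predicates. Both function names are hypothetical, and the 5-minute default is the Cloudflare example the text cites, not a universal value.

```typescript
const PROXY_IDLE_TIMEOUT_MS = 5 * 60 * 1000 // example: Cloudflare's 5-minute idle timeout

// Idle time at close counts only data frames, matching lastActivityTime.
function likelyProxyIdleClose(
  closedAt: number,
  lastActivityTime: number,
  proxyIdleMs: number = PROXY_IDLE_TIMEOUT_MS,
): boolean {
  return closedAt - lastActivityTime >= proxyIdleMs
}

// keep_alive data frames must arrive before the proxy's idle timer fires.
function keepAliveIsSafe(
  keepAliveIntervalMs: number,
  proxyIdleMs: number = PROXY_IDLE_TIMEOUT_MS,
): boolean {
  return keepAliveIntervalMs < proxyIdleMs
}
```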
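Source directory
Related: bridge/pollConfig.js (keepalive interval), utils/sessionIngressAuth.js (Bearer token), utils/proxy.js (WS proxy and axios).
Hands-on exercises
- Draw a table mapping env var → Transport class → read protocol / write protocol
- Compare the axios configuration in HybridTransport's postOnce with that in SSETransport's write
- Trace how clearStreamAccumulatorForMessage clears the accumulateStreamEvents state once writeEvent receives the complete assistant message
- Contrast WebSocket messageBuffer replay with SSE Last-Event-ID resumption semantics
Summary and further reading
transports are the physical layer underneath RemoteIO. The next chapter, handlers, returns to purely local subcommands; the update-util chapter also touches on WorkerStateUploader.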