本章总览
StructuredIO 是 Claude Code SDK 模式的「stdio 协议栈」:把 stdin 上的 NDJSON 行解析为 StdinMessage | SDKMessage,把 StdoutMessage 序列化写回 stdout;并通过 control_request / control_response 实现权限、Hook 回调、MCP 消息与 elicitation 的双向 RPC。RemoteIO 在其上叠加 Session Ingress 传输;print.ts 的 runHeadless 是 REPL 之外的 query 主编排;ndjsonSafeStringify 保证跨语言行分割安全;exit.ts 统一子命令退出。本章要求你能从 SDK host 收到的一条 can_use_tool 追到 createCanUseTool 内的 Promise.race。
学完本章你应该能
- 解释 StructuredIO.read() 的行缓冲与 prependUserMessage 语义
- 说明 outbound Stream 如何防止 control_request 插队 stream_event
- 对比 StructuredIO.createCanUseTool 与 print.getCanUseToolFn 的分支
- 理解 RemoteIO 如何 wiring CCR v2 与 keep_alive
- 知道 ndjsonSafeStringify 为何要转义 U+2028/U+2029
核心概念(先读懂这些)
NDJSON 是一行一个 JSON 对象
SDK 协议不是 WebSocket 帧,而是 newline-delimited JSON。StructuredIO 在 read() 里维护 content 缓冲区,按 \n 切行,每行 jsonParse 后 normalizeControlMessageKeys。空行、keep_alive、update_environment_variables 被吞掉或就地处理(后者直接写 process.env,供 bridge token 刷新)。解析失败则 console.error + process.exit(1)——headless 模式宁可硬退出也不 silently corrupt transcript。
control_request 是跨进程的 Promise RPC
sendRequest() 生成 request_id,把 control_request enqueue 到 outbound Stream,并在 pendingRequests Map 里挂 Promise。stdin 收到匹配的 control_response 时 resolve/reject。AbortSignal 触发时先发 control_cancel_request 并 reject AbortError。resolvedToolUseIds Set 防止 WebSocket 重连后 duplicate control_response 把重复 assistant 推入 mutableMessages(API 400 tool_use ids must be unique)。
RemoteIO = StructuredIO + Transport
RemoteIO 用 PassThrough 作为 super(input) 的 AsyncIterable,transport.setOnData 把远端 NDJSON 写入该流。write() 经 CCRClient.writeEvent 或 transport.write 发回。CCR v2 必须在 transport.connect() 前 new CCRClient(),否则 early SSE ack 丢失。Bridge 模式可选 keep_alive 定时帧,防止 Envoy idle timeout。
建议学习步骤
- 阅读源码块 A:StructuredIO 类与 outbound Stream
- 阅读源码块 B:read() 行切分与 control_response 去重
- 阅读源码块 C:createCanUseTool 的 hook vs SDK race
- 阅读源码块 D:RemoteIO 构造与 CCR wiring
- 阅读源码块 E:ndjsonSafeStringify 与 exit.ts
- 阅读源码块 F:print.runHeadless 入口与 getCanUseToolFn
常见误区
注意
不要把 utils/process.writeToStdout 与 Ink 渲染混淆——StructuredIO 走纯 NDJSON
注意
injectControlResponse 会发 control_cancel_request,bridge 赢 race 时必须取消 SDK 侧挂起
注意
print.ts 体量极大,改 headless 行为先搜 runHeadless / getCanUseToolFn
在架构中的位置
SDK host / pipe / bridge parent
↔ stdin/stdout NDJSON OR RemoteIO(transport)
↓
StructuredIO.structuredInput (async generator)
↓
print.runHeadless 主循环
→ query.ts + getCanUseToolFn(structuredIO)
→ StructuredIO.write(stream_event | assistant | ...)
entrypoints/cli.tsx 在 --print / SDK 路径调用 runHeadless;有 --sdk-url 时构造 RemoteIO 而非纯 StructuredIO。TUI REPL 不走 StructuredIO 写 stdout 的主路径,但 bridge worker 与 VS Code 扩展消费同一消息类型(见 entrypoints/sdk/controlTypes)。
StructuredIO 核心:outbound 与 pendingRequests
StructuredIO 构造函数接收 AsyncIterable<string>(通常是 stdin chunks)和可选 replayUserMessages。
关键字段:
- outbound: Stream<StdoutMessage> — sendRequest 与 print 主循环共用;单一 drain writer,保证 stream_event 不被 control_request 插队
- pendingRequests — request_id → { resolve, reject, schema, request }
- resolvedToolUseIds — 上限 MAX_RESOLVED_TOOL_USE_IDS=1000,FIFO 驱逐
- restoredWorkerState — RemoteIO/CCR 在 worker 启动时恢复 external_metadata
prependUserMessage 把合成 user 行压入 prependedLines,read() 在每块 input 处理前优先消费——用于 mid-stream 注入用户 turn。
阅读 sendRequest 时注意:can_use_tool 发出时会触发 onControlRequestSent(bridge 转发到 claude.ai)。
源码引用: src/cli/structuredIO.ts · 第 130–170 行(共 860 行)
130| // Maximum number of resolved tool_use IDs to track. Once exceeded, the oldest
131| // entry is evicted. This bounds memory in very long sessions while keeping
132| // enough history to catch duplicate control_response deliveries.
133| const MAX_RESOLVED_TOOL_USE_IDS = 1000
134|
135| export class StructuredIO {
136| readonly structuredInput: AsyncGenerator<StdinMessage | SDKMessage>
137| private readonly pendingRequests = new Map<string, PendingRequest<unknown>>()
138|
139| // CCR external_metadata read back on worker start; null when the
140| // transport doesn't restore. Assigned by RemoteIO.
141| restoredWorkerState: Promise<SessionExternalMetadata | null> =
142| Promise.resolve(null)
143|
144| private inputClosed = false
145| private unexpectedResponseCallback?: (
146| response: SDKControlResponse,
147| ) => Promise<void>
148|
149| // Tracks tool_use IDs that have been resolved through the normal permission
150| // flow (or aborted by a hook). When a duplicate control_response arrives
151| // after the original was already handled, this Set prevents the orphan
152| // handler from re-processing it — which would push duplicate assistant
153| // messages into mutableMessages and cause a 400 "tool_use ids must be unique"
154| // error from the API.
155| private readonly resolvedToolUseIds = new Set<string>()
156| private prependedLines: string[] = []
157| private onControlRequestSent?: (request: SDKControlRequest) => void
158| private onControlRequestResolved?: (requestId: string) => void
159|
160| // sendRequest() and print.ts both enqueue here; the drain loop is the
161| // only writer. Prevents control_request from overtaking queued stream_events.
162| readonly outbound = new Stream<StdoutMessage>()
163|
164| constructor(
165| private readonly input: AsyncIterable<string>,
166| private readonly replayUserMessages?: boolean,
167| ) {
168| this.input = input
169| this.structuredInput = this.read()
170| }
源码引用: src/cli/structuredIO.ts · 第 469–531 行(共 860 行)
469| private async sendRequest<Response>(
470| request: SDKControlRequest['request'],
471| schema: z.Schema,
472| signal?: AbortSignal,
473| requestId: string = randomUUID(),
474| ): Promise<Response> {
475| const message: SDKControlRequest = {
476| type: 'control_request',
477| request_id: requestId,
478| request,
479| }
480| if (this.inputClosed) {
481| throw new Error('Stream closed')
482| }
483| if (signal?.aborted) {
484| throw new Error('Request aborted')
485| }
486| this.outbound.enqueue(message)
487| if (request.subtype === 'can_use_tool' && this.onControlRequestSent) {
488| this.onControlRequestSent(message)
489| }
490| const aborted = () => {
491| this.outbound.enqueue({
492| type: 'control_cancel_request',
493| request_id: requestId,
494| })
495| // Immediately reject the outstanding promise, without
496| // waiting for the host to acknowledge the cancellation.
497| const request = this.pendingRequests.get(requestId)
498| if (request) {
499| // Track the tool_use ID as resolved before rejecting, so that a
500| // late response from the host is ignored by the orphan handler.
501| this.trackResolvedToolUseId(request.request)
502| request.reject(new AbortError())
503| }
504| }
505| if (signal) {
506| signal.addEventListener('abort', aborted, {
507| once: true,
508| })
509| }
510| try {
511| return await new Promise<Response>((resolve, reject) => {
512| this.pendingRequests.set(requestId, {
513| request: {
514| type: 'control_request',
515| request_id: requestId,
516| request,
517| },
518| resolve: result => {
519| resolve(result as Response)
520| },
521| reject,
522| schema,
523| })
524| })
525| } finally {
526| if (signal) {
527| signal.removeEventListener('abort', aborted)
528| }
529| this.pendingRequests.delete(requestId)
530| }
531| }
read():行解析与 control_response 孤儿处理
read() 是 private async *generator,绑定 splitAndProcess 闭包:
- 合并 prependedLines 到 content
- 找第一个 \n,切出一行,processLine
- for-await input block 重复 2
- 流结束:reject 所有 pendingRequests("Tool permission stream closed")
processLine 分支要点:
- control_response:先 notifyCommandLifecycle(uuid, completed);查 pendingRequests;若无且 toolUseID 已在 resolvedToolUseIds → 忽略 duplicate
- control_request:必须有 request 字段,否则 exitWithMessage
- user:role 必须是 user
- assistant/system:原样 yield(供 replay)
update_environment_variables 直接 mutate process.env——bridge session runner 刷新 CLAUDE_CODE_SESSION_ACCESS_TOKEN 时 REPL 进程本身可读新 token。
源码引用: src/cli/structuredIO.ts · 第 215–261 行(共 860 行)
215| private async *read() {
216| let content = ''
217|
218| // Called once before for-await (an empty this.input otherwise skips the
219| // loop body entirely), then again per block. prependedLines re-check is
220| // inside the while so a prepend pushed between two messages in the SAME
221| // block still lands first.
222| const splitAndProcess = async function* (this: StructuredIO) {
223| for (;;) {
224| if (this.prependedLines.length > 0) {
225| content = this.prependedLines.join('') + content
226| this.prependedLines = []
227| }
228| const newline = content.indexOf('\n')
229| if (newline === -1) break
230| const line = content.slice(0, newline)
231| content = content.slice(newline + 1)
232| const message = await this.processLine(line)
233| if (message) {
234| logForDiagnosticsNoPII('info', 'cli_stdin_message_parsed', {
235| type: message.type,
236| })
237| yield message
238| }
239| }
240| }.bind(this)
241|
242| yield* splitAndProcess()
243|
244| for await (const block of this.input) {
245| content += block
246| yield* splitAndProcess()
247| }
248| if (content) {
249| const message = await this.processLine(content)
250| if (message) {
251| yield message
252| }
253| }
254| this.inputClosed = true
255| for (const request of this.pendingRequests.values()) {
256| // Reject all pending requests if the input stream
257| request.reject(
258| new Error('Tool permission stream closed before response received'),
259| )
260| }
261| }
源码引用: src/cli/structuredIO.ts · 第 362–430 行(共 860 行)
362| if (message.type === 'control_response') {
363| // Close lifecycle for every control_response, including duplicates
364| // and orphans — orphans don't yield to print.ts's main loop, so this
365| // is the only path that sees them. uuid is server-injected into the
366| // payload.
367| const uuid =
368| 'uuid' in message && typeof message.uuid === 'string'
369| ? message.uuid
370| : undefined
371| if (uuid) {
372| notifyCommandLifecycle(uuid, 'completed')
373| }
374| const request = this.pendingRequests.get(message.response.request_id)
375| if (!request) {
376| // Check if this tool_use was already resolved through the normal
377| // permission flow. Duplicate control_response deliveries (e.g. from
378| // WebSocket reconnects) arrive after the original was handled, and
379| // re-processing them would push duplicate assistant messages into
380| // the conversation, causing API 400 errors.
381| const responsePayload =
382| message.response.subtype === 'success'
383| ? message.response.response
384| : undefined
385| const toolUseID = responsePayload?.toolUseID
386| if (
387| typeof toolUseID === 'string' &&
388| this.resolvedToolUseIds.has(toolUseID)
389| ) {
390| logForDebugging(
391| `Ignoring duplicate control_response for already-resolved toolUseID=${toolUseID} request_id=${message.response.request_id}`,
392| )
393| return undefined
394| }
395| if (this.unexpectedResponseCallback) {
396| await this.unexpectedResponseCallback(message)
397| }
398| return undefined // Ignore responses for requests we don't know about
399| }
400| this.trackResolvedToolUseId(request.request)
401| this.pendingRequests.delete(message.response.request_id)
402| // Notify the bridge when the SDK consumer resolves a can_use_tool
403| // request, so it can cancel the stale permission prompt on claude.ai.
404| if (
405| request.request.request.subtype === 'can_use_tool' &&
406| this.onControlRequestResolved
407| ) {
408| this.onControlRequestResolved(message.response.request_id)
409| }
410|
411| if (message.response.subtype === 'error') {
412| request.reject(new Error(message.response.error))
413| return undefined
414| }
415| const result = message.response.response
416| if (request.schema) {
417| try {
418| request.resolve(request.schema.parse(result))
419| } catch (error) {
420| request.reject(error)
421| }
422| } else {
423| request.resolve({})
424| }
425| // Propagate control responses when replay is enabled
426| if (this.replayUserMessages) {
427| return message
428| }
429| return undefined
430| }
createCanUseTool:Hook 与 SDK 权限 race
当 hasPermissionsToUseTool 返回 behavior === ask 时,StructuredIO 不走 Ink PermissionRequest,而是:
- 并行启动 executePermissionRequestHooksForSDK(PermissionRequest shell hooks)
- 立即 sendRequest can_use_tool(带 permission_suggestions、blocked_path、decision_reason 序列化)
- Promise.race([hookPromise, sdkPromise])
Hook 先决 → abort SDK 请求;Hook pass-through → await SDK;SDK 先决 → 忽略仍在跑的 hook。
SANDBOX_NETWORK_ACCESS_TOOL_NAME 复用 can_use_tool 协议做沙箱网络授权,避免新 subtype。
createHookCallback / handleElicitation / sendMcpMessage 都是同一 sendRequest 原语的不同 schema。
源码引用: src/cli/structuredIO.ts · 第 533–638 行(共 860 行)
533| createCanUseTool(
534| onPermissionPrompt?: (details: RequiresActionDetails) => void,
535| ): CanUseToolFn {
536| return async (
537| tool: Tool,
538| input: { [key: string]: unknown },
539| toolUseContext: ToolUseContext,
540| assistantMessage: AssistantMessage,
541| toolUseID: string,
542| forceDecision?: PermissionDecision,
543| ): Promise<PermissionDecision> => {
544| const mainPermissionResult =
545| forceDecision ??
546| (await hasPermissionsToUseTool(
547| tool,
548| input,
549| toolUseContext,
550| assistantMessage,
551| toolUseID,
552| ))
553| // If the tool is allowed or denied, return the result
554| if (
555| mainPermissionResult.behavior === 'allow' ||
556| mainPermissionResult.behavior === 'deny'
557| ) {
558| return mainPermissionResult
559| }
560|
561| // Run PermissionRequest hooks in parallel with the SDK permission
562| // prompt. In the terminal CLI, hooks race against the interactive
563| // prompt so that e.g. a hook with --delay 20 doesn't block the UI.
564| // We need the same behavior here: the SDK host (VS Code, etc.) shows
565| // its permission dialog immediately while hooks run in the background.
566| // Whichever resolves first wins; the loser is cancelled/ignored.
567|
568| // AbortController used to cancel the SDK request if a hook decides first
569| const hookAbortController = new AbortController()
570| const parentSignal = toolUseContext.abortController.signal
571| // Forward parent abort to our local controller
572| const onParentAbort = () => hookAbortController.abort()
573| parentSignal.addEventListener('abort', onParentAbort, { once: true })
574|
575| try {
576| // Start the hook evaluation (runs in background)
577| const hookPromise = executePermissionRequestHooksForSDK(
578| tool.name,
579| toolUseID,
580| input,
581| toolUseContext,
582| mainPermissionResult.suggestions,
583| ).then(decision => ({ source: 'hook' as const, decision }))
584|
585| // Start the SDK permission prompt immediately (don't wait for hooks)
586| const requestId = randomUUID()
587| onPermissionPrompt?.(
588| buildRequiresActionDetails(tool, input, toolUseID, requestId),
589| )
590| const sdkPromise = this.sendRequest<PermissionToolOutput>(
591| {
592| subtype: 'can_use_tool',
593| tool_name: tool.name,
594| input,
595| permission_suggestions: mainPermissionResult.suggestions,
596| blocked_path: mainPermissionResult.blockedPath,
597| decision_reason: serializeDecisionReason(
598| mainPermissionResult.decisionReason,
599| ),
600| tool_use_id: toolUseID,
601| agent_id: toolUseContext.agentId,
602| },
603| permissionToolOutputSchema(),
604| hookAbortController.signal,
605| requestId,
606| ).then(result => ({ source: 'sdk' as const, result }))
607|
608| // Race: hook completion vs SDK prompt response.
609| // The hook promise always resolves (never rejects), returning
610| // undefined if no hook made a decision.
611| const winner = await Promise.race([hookPromise, sdkPromise])
612|
613| if (winner.source === 'hook') {
614| if (winner.decision) {
615| // Hook decided — abort the pending SDK request.
616| // Suppress the expected AbortError rejection from sdkPromise.
617| sdkPromise.catch(() => {})
618| hookAbortController.abort()
619| return winner.decision
620| }
621| // Hook passed through (no decision) — wait for the SDK prompt
622| const sdkResult = await sdkPromise
623| return permissionPromptToolResultToPermissionDecision(
624| sdkResult.result,
625| tool,
626| input,
627| toolUseContext,
628| )
629| }
630|
631| // SDK prompt responded first — use its result (hook still running
632| // in background but its result will be ignored)
633| return permissionPromptToolResultToPermissionDecision(
634| winner.result,
635| tool,
636| input,
637| toolUseContext,
638| )
源码引用: src/cli/structuredIO.ts · 第 731–753 行(共 860 行)
731| createSandboxAskCallback(): (hostPattern: {
732| host: string
733| port?: number
734| }) => Promise<boolean> {
735| return async (hostPattern): Promise<boolean> => {
736| try {
737| const result = await this.sendRequest<PermissionToolOutput>(
738| {
739| subtype: 'can_use_tool',
740| tool_name: SANDBOX_NETWORK_ACCESS_TOOL_NAME,
741| input: { host: hostPattern.host },
742| tool_use_id: randomUUID(),
743| description: `Allow network connection to ${hostPattern.host}?`,
744| },
745| permissionToolOutputSchema(),
746| )
747| return result.behavior === 'allow'
748| } catch {
749| // If the request fails (stream closed, abort, etc.), deny the connection
750| return false
751| }
752| }
753| }
RemoteIO:Transport 与 CCR v2 集成
RemoteIO extends StructuredIO:
- 构造 PassThrough inputStream,super(inputStream)
- getTransportForUrl(url, headers, sessionId, refreshHeaders)
- transport.setOnData → inputStream.write;setOnClose → inputStream.end()
- CLAUDE_CODE_USE_CCR_V2 时:断言 SSETransport,new CCRClient,register internal event reader/writer,lifecycle → reportDelivery/reportState
write 重写:ccrClient.writeEvent 或 transport.write;bridge 模式下 control_request 总是 echo stdout(父进程检测权限请求)。
keep_alive:仅 bridge + GrowthBook session_keepalive_interval_v2_ms > 0;定时 write({ type: keep_alive })。
flushInternalEvents / internalEventsPending 委托 CCRClient,供 turn 边界 flush transcript internal events。
源码引用: src/cli/remoteIO.ts · 第 35–93 行(共 256 行)
35| export class RemoteIO extends StructuredIO {
36| private url: URL
37| private transport: Transport
38| private inputStream: PassThrough
39| private readonly isBridge: boolean = false
40| private readonly isDebug: boolean = false
41| private ccrClient: CCRClient | null = null
42| private keepAliveTimer: ReturnType<typeof setInterval> | null = null
43|
44| constructor(
45| streamUrl: string,
46| initialPrompt?: AsyncIterable<string>,
47| replayUserMessages?: boolean,
48| ) {
49| const inputStream = new PassThrough({ encoding: 'utf8' })
50| super(inputStream, replayUserMessages)
51| this.inputStream = inputStream
52| this.url = new URL(streamUrl)
53|
54| // Prepare headers with session token if available
55| const headers: Record<string, string> = {}
56| const sessionToken = getSessionIngressAuthToken()
57| if (sessionToken) {
58| headers['Authorization'] = `Bearer ${sessionToken}`
59| } else {
60| logForDebugging('[remote-io] No session ingress token available', {
61| level: 'error',
62| })
63| }
64|
65| // Add environment runner version if available (set by Environment Manager)
66| const erVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
67| if (erVersion) {
68| headers['x-environment-runner-version'] = erVersion
69| }
70|
71| // Provide a callback that re-reads the session token dynamically.
72| // When the parent process refreshes the token (via token file or env var),
73| // the transport can pick it up on reconnection.
74| const refreshHeaders = (): Record<string, string> => {
75| const h: Record<string, string> = {}
76| const freshToken = getSessionIngressAuthToken()
77| if (freshToken) {
78| h['Authorization'] = `Bearer ${freshToken}`
79| }
80| const freshErVersion = process.env.CLAUDE_CODE_ENVIRONMENT_RUNNER_VERSION
81| if (freshErVersion) {
82| h['x-environment-runner-version'] = freshErVersion
83| }
84| return h
85| }
86|
87| // Get appropriate transport based on URL protocol
88| this.transport = getTransportForUrl(
89| this.url,
90| headers,
91| getSessionId(),
92| refreshHeaders,
93| )
源码引用: src/cli/remoteIO.ts · 第 116–168 行(共 256 行)
116| if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
117| // CCR v2 is SSE+POST by definition. getTransportForUrl returns
118| // SSETransport under the same env var, but the two checks live in
119| // different files — assert the invariant so a future decoupling
120| // fails loudly here instead of confusingly inside CCRClient.
121| if (!(this.transport instanceof SSETransport)) {
122| throw new Error(
123| 'CCR v2 requires SSETransport; check getTransportForUrl',
124| )
125| }
126| this.ccrClient = new CCRClient(this.transport, this.url)
127| const init = this.ccrClient.initialize()
128| this.restoredWorkerState = init.catch(() => null)
129| init.catch((error: unknown) => {
130| logForDiagnosticsNoPII('error', 'cli_worker_lifecycle_init_failed', {
131| reason: error instanceof CCRInitError ? error.reason : 'unknown',
132| })
133| logError(
134| new Error(`CCRClient initialization failed: ${errorMessage(error)}`),
135| )
136| void gracefulShutdown(1, 'other')
137| })
138| registerCleanup(async () => this.ccrClient?.close())
139|
140| // Register internal event writer for transcript persistence.
141| // When set, sessionStorage writes transcript messages as CCR v2
142| // internal events instead of v1 Session Ingress.
143| setInternalEventWriter((eventType, payload, options) =>
144| this.ccrClient!.writeInternalEvent(eventType, payload, options),
145| )
146|
147| // Register internal event readers for session resume.
148| // When set, hydrateFromCCRv2InternalEvents() can fetch foreground
149| // and subagent internal events to reconstruct conversation state.
150| setInternalEventReader(
151| () => this.ccrClient!.readInternalEvents(),
152| () => this.ccrClient!.readSubagentInternalEvents(),
153| )
154|
155| const LIFECYCLE_TO_DELIVERY = {
156| started: 'processing',
157| completed: 'processed',
158| } as const
159| setCommandLifecycleListener((uuid, state) => {
160| this.ccrClient?.reportDelivery(uuid, LIFECYCLE_TO_DELIVERY[state])
161| })
162| setSessionStateChangedListener((state, details) => {
163| this.ccrClient?.reportState(state, details)
164| })
165| setSessionMetadataChangedListener(metadata => {
166| this.ccrClient?.reportMetadata(metadata)
167| })
168| }
源码引用: src/cli/remoteIO.ts · 第 231–254 行(共 256 行)
231| async write(message: StdoutMessage): Promise<void> {
232| if (this.ccrClient) {
233| await this.ccrClient.writeEvent(message)
234| } else {
235| await this.transport.write(message)
236| }
237| if (this.isBridge) {
238| if (message.type === 'control_request' || this.isDebug) {
239| writeToStdout(ndjsonSafeStringify(message) + '\n')
240| }
241| }
242| }
243|
244| /**
245| * Clean up connections gracefully
246| */
247| close(): void {
248| if (this.keepAliveTimer) {
249| clearInterval(this.keepAliveTimer)
250| this.keepAliveTimer = null
251| }
252| this.transport.close()
253| this.inputStream.end()
254| }
ndjsonSafeStringify:行分割安全
JSON.stringify 对 U+2028 LINE SEPARATOR / U+2029 PARAGRAPH SEPARATOR 输出 raw Unicode。ECMA-262 把二者视为行终止符;用 JavaScript 语义 split 的接收方会在字符串 中间 截断 NDJSON 行。
ndjsonSafeStringify 在 jsonStringify 之后用正则替换为 \u2028 / \u2029 转义——语义等价 JSON,任何按行 split 的实现都安全。
这与 ES2019 Subsume JSON、Node util.inspect 的策略一致。ProcessTransport 已对非 JSON 行 silent skip(gh-28405),但截断仍丢消息——源头 escape 更可靠。
源码引用: src/cli/ndjsonSafeStringify.ts · 第 1–32 行(共 33 行)
1| import { jsonStringify } from '../utils/slowOperations.js'
2|
3| // JSON.stringify emits U+2028/U+2029 raw (valid per ECMA-404). When the
4| // output is a single NDJSON line, any receiver that uses JavaScript
5| // line-terminator semantics (ECMA-262 §11.3 — \n \r U+2028 U+2029) to
6| // split the stream will cut the JSON mid-string. ProcessTransport now
7| // silently skips non-JSON lines rather than crashing (gh-28405), but
8| // the truncated fragment is still lost — the message is silently dropped.
9| //
10| // The \uXXXX form is equivalent JSON (parses to the same string) but
11| // can never be mistaken for a line terminator by ANY receiver. This is
12| // what ES2019's "Subsume JSON" proposal and Node's util.inspect do.
13| //
14| // Single regex with alternation: the callback's one dispatch per match
15| // is cheaper than two full-string scans.
16| const JS_LINE_TERMINATORS = /\u2028|\u2029/g
17|
18| function escapeJsLineTerminators(json: string): string {
19| return json.replace(JS_LINE_TERMINATORS, c =>
20| c === '\u2028' ? '\\u2028' : '\\u2029',
21| )
22| }
23|
24| /**
25| * JSON.stringify for one-message-per-line transports. Escapes U+2028
26| * LINE SEPARATOR and U+2029 PARAGRAPH SEPARATOR so the serialized output
27| * cannot be broken by a line-splitting receiver. Output is still valid
28| * JSON and parses to the same value.
29| */
30| export function ndjsonSafeStringify(value: unknown): string {
31| return escapeJsLineTerminators(jsonStringify(value))
32| }
exit.ts 与 print.ts 接缝
cliError / cliOk 集中 process.exit,返回 : never 便于 TS 控制流收窄。测试可 spy process.exit 让其 return;cliOk 用 process.stdout.write 而非 console.log(Bun 路由差异)。
print.ts(约 5500 行)核心导出:
- runHeadless — headless 主函数:settings 订阅、MCP/tools 安装、StructuredIO/RemoteIO 选择、query 循环、outputFormat json/text
- getCanUseToolFn — permissionPromptToolName === 'stdio' 时用 structuredIO.createCanUseTool;否则自定义 PermissionPromptTool 或纯 hasPermissionsToUseTool
- createCanUseToolWithPermissionPrompt — 自定义 permission 工具与 abort race
- reconcileMcpServers / handleMcpSetServers — headless MCP 动态 server 列表
runHeadless options 含 sdkUrl、replayUserMessages、includePartialMessages、permissionPromptToolName 等,与 Commander 旗标一一对应(定义在 entrypoints/cli.tsx)。
源码引用: src/cli/exit.ts · 第 18–31 行(共 32 行)
18| /** Write an error message to stderr (if given) and exit with code 1. */
19| export function cliError(msg?: string): never {
20| // biome-ignore lint/suspicious/noConsole: centralized CLI error output
21| if (msg) console.error(msg)
22| process.exit(1)
23| return undefined as never
24| }
25|
26| /** Write a message to stdout (if given) and exit with code 0. */
27| export function cliOk(msg?: string): never {
28| if (msg) process.stdout.write(msg + '\n')
29| process.exit(0)
30| return undefined as never
31| }
源码引用: src/cli/print.ts · 第 455–532 行(共 5595 行)
455| export async function runHeadless(
456| inputPrompt: string | AsyncIterable<string>,
457| getAppState: () => AppState,
458| setAppState: (f: (prev: AppState) => AppState) => void,
459| commands: Command[],
460| tools: Tools,
461| sdkMcpConfigs: Record<string, McpSdkServerConfig>,
462| agents: AgentDefinition[],
463| options: {
464| continue: boolean | undefined
465| resume: string | boolean | undefined
466| resumeSessionAt: string | undefined
467| verbose: boolean | undefined
468| outputFormat: string | undefined
469| jsonSchema: Record<string, unknown> | undefined
470| permissionPromptToolName: string | undefined
471| allowedTools: string[] | undefined
472| thinkingConfig: ThinkingConfig | undefined
473| maxTurns: number | undefined
474| maxBudgetUsd: number | undefined
475| taskBudget: { total: number } | undefined
476| systemPrompt: string | undefined
477| appendSystemPrompt: string | undefined
478| userSpecifiedModel: string | undefined
479| fallbackModel: string | undefined
480| teleport: string | true | null | undefined
481| sdkUrl: string | undefined
482| replayUserMessages: boolean | undefined
483| includePartialMessages: boolean | undefined
484| forkSession: boolean | undefined
485| rewindFiles: string | undefined
486| enableAuthStatus: boolean | undefined
487| agent: string | undefined
488| workload: string | undefined
489| setupTrigger?: 'init' | 'maintenance' | undefined
490| sessionStartHooksPromise?: ReturnType<typeof processSessionStartHooks>
491| setSDKStatus?: (status: SDKStatus) => void
492| },
493| ): Promise<void> {
494| if (
495| process.env.USER_TYPE === 'ant' &&
496| isEnvTruthy(process.env.CLAUDE_CODE_EXIT_AFTER_FIRST_RENDER)
497| ) {
498| process.stderr.write(
499| `\nStartup time: ${Math.round(process.uptime() * 1000)}ms\n`,
500| )
501| // eslint-disable-next-line custom-rules/no-process-exit
502| process.exit(0)
503| }
504|
505| // Fire user settings download now so it overlaps with the MCP/tool setup
506| // below. Managed settings already started in main.tsx preAction; this gives
507| // user settings a similar head start. The cached promise is joined in
508| // installPluginsAndApplyMcpInBackground before plugin install reads
509| // enabledPlugins.
510| if (
511| feature('DOWNLOAD_USER_SETTINGS') &&
512| (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
513| ) {
514| void downloadUserSettings()
515| }
516|
517| // In headless mode there is no React tree, so the useSettingsChange hook
518| // never runs. Subscribe directly so that settings changes (including
519| // managed-settings / policy updates) are fully applied.
520| settingsChangeDetector.subscribe(source => {
521| applySettingsChange(source, setAppState)
522|
523| // In headless mode, also sync the denormalized fastMode field from
524| // settings. The TUI manages fastMode via the UI so it skips this.
525| if (isFastModeEnabled()) {
526| setAppState(prev => {
527| const s = prev.settings as Record<string, unknown>
528| const fastMode = s.fastMode === true && !s.fastModePerSessionOptIn
529| return { ...prev, fastMode }
530| })
531| }
532| })
源码引用: src/cli/print.ts · 第 4267–4295 行(共 5595 行)
4267| export function getCanUseToolFn(
4268| permissionPromptToolName: string | undefined,
4269| structuredIO: StructuredIO,
4270| getMcpTools: () => Tool[],
4271| onPermissionPrompt?: (details: RequiresActionDetails) => void,
4272| ): CanUseToolFn {
4273| if (permissionPromptToolName === 'stdio') {
4274| return structuredIO.createCanUseTool(onPermissionPrompt)
4275| }
4276| if (!permissionPromptToolName) {
4277| return async (
4278| tool,
4279| input,
4280| toolUseContext,
4281| assistantMessage,
4282| toolUseId,
4283| forceDecision,
4284| ) =>
4285| forceDecision ??
4286| (await hasPermissionsToUseTool(
4287| tool,
4288| input,
4289| toolUseContext,
4290| assistantMessage,
4291| toolUseId,
4292| ))
4293| }
4294| // Lazy lookup: MCP connects are per-server incremental in print mode, so
4295| // the tool may not be in appState yet at init time. Resolve on first call
injectControlResponse 与 bridge 权限
Bridge 从 claude.ai 收到用户权限决定后,调用 injectControlResponse:
- 查 pendingRequests[request_id]
- trackResolvedToolUseId + delete pending
- write control_cancel_request(取消 SDK consumer 侧 canUseTool 挂起)
- resolve/reject Promise
这与 onControlRequestSent / onControlRequestResolved 配对,解决 claude.ai 与 SDK host 双端 prompt 的 race。
unexpectedResponseCallback 处理 orphan control_response(无 pending 且非 duplicate toolUseID)。
源码引用: src/cli/structuredIO.ts · 第 283–309 行(共 860 行)
283| injectControlResponse(response: SDKControlResponse): void {
284| const requestId = response.response?.request_id
285| if (!requestId) return
286| const request = this.pendingRequests.get(requestId)
287| if (!request) return
288| this.trackResolvedToolUseId(request.request)
289| this.pendingRequests.delete(requestId)
290| // Cancel the SDK consumer's canUseTool callback — the bridge won.
291| void this.write({
292| type: 'control_cancel_request',
293| request_id: requestId,
294| })
295| if (response.response.subtype === 'error') {
296| request.reject(new Error(response.response.error))
297| } else {
298| const result = response.response.response
299| if (request.schema) {
300| try {
301| request.resolve(request.schema.parse(result))
302| } catch (error) {
303| request.reject(error)
304| }
305| } else {
306| request.resolve({})
307| }
308| }
309| }
源码引用: src/cli/structuredIO.ts · 第 176–187 行(共 860 行)
176| private trackResolvedToolUseId(request: SDKControlRequest): void {
177| if (request.request.subtype === 'can_use_tool') {
178| this.resolvedToolUseIds.add(request.request.tool_use_id)
179| if (this.resolvedToolUseIds.size > MAX_RESOLVED_TOOL_USE_IDS) {
180| // Evict the oldest entry (Sets iterate in insertion order)
181| const first = this.resolvedToolUseIds.values().next().value
182| if (first !== undefined) {
183| this.resolvedToolUseIds.delete(first)
184| }
185| }
186| }
187| }
源码目录
强关联:entrypoints/sdk/controlTypes.ts(消息类型)、entrypoints/cli.tsx(入口)、utils/sessionState.ts(RequiresActionDetails)、hooks/useCanUseTool.tsx(TUI 侧同名接缝)。
动手练习
- 管道一行 user JSON 进 claude --print,观察 stdout NDJSON assistant 事件
- 开启 --debug,grep cli_stdin_message_parsed diag
- 对比 TUI useCanUseTool 与 StructuredIO.createCanUseTool 的 ask 分支差异
- 阅读 outbound Stream drain 逻辑(print.ts 主循环),理解为何 stream_event 必须排队
本章小结与延伸
StructuredIO 是 SDK 与 headless 的协议心脏。下一章 cli-transports 讲解 RemoteIO 底下的 WS/Hybrid/SSE 选型。 继续学习: