本章总览
services/tools/StreamingToolExecutor.ts(约 530 行)在 query 流式接收 tool_use block 时立即入队执行,而非等 assistant 消息结束。它维护 TrackedTool 队列,按 isConcurrencySafe 决定并行/串行,buffer 结果并按接收顺序 yield,同时支持 progress 消息即时透出。本章要求你能画出 addTool → processQueue → executeTool → getCompletedResults 的状态机,并理解与 runToolUse 的分工。
学完本章你应该能
- 解释 concurrent-safe 与 exclusive 工具的调度规则
- 说明 siblingAbortController 与 Bash 错误级联
- 理解 discard() 在 streaming fallback 时的行为
- 掌握 getCompletedResults vs getRemainingResults 的使用场景
- 追踪 interruptBehavior cancel/block 与用户 ESC 中断的关系
核心概念(先读懂这些)
流式执行 vs 批量执行
feature gate streamingToolExecution 为 true 时 query.ts 创建 StreamingToolExecutor;false 则 turn 结束后 runTools 批量执行。流式路径让长耗时工具(Bash、Agent)尽早开始,progress 即时显示。fallback(模型切换、retry)时 discard 旧 executor 并新建,避免脏结果进入 transcript。
顺序保证与并发并行
tool_results 必须按 tool_use 出现顺序 yield 给模型(API 配对假设)。Executor 允许多个 safe 工具同时 executing,但 getCompletedResults 按 tools[] 插入顺序 yield 已完成者。非 safe 工具 executing 时阻塞后续 yield,直到该工具 completed。
三层 AbortController
toolUseContext.abortController(query 级)→ siblingAbortController(Bash 错误级联)→ toolAbortController(单工具,runToolUse 用)。权限拒绝 abort tool 级时需 bubble 到 query 级结束 turn;sibling_error 则不 abort 父级。
建议学习步骤
- 阅读类注释与 TrackedTool 状态机
- 阅读 addTool 与 unknown tool 分支
- 阅读 canExecuteTool / processQueue
- 阅读 executeTool 与 runToolUse 生成器循环
- 阅读 getAbortReason 与 createSyntheticErrorMessage
- 在 query.ts 搜索 StreamingToolExecutor 创建与 discard 点
常见误区
注意
contextModifier 目前不支持 concurrent 工具(注释明确限制)
注意
discarded 后 getRemainingResults 立即 return,不 yield 任何结果
注意
不要把 progress pendingProgress 与 results 混淆——progress 可早于 completed yield
在 query 循环中的位置
query.ts(约 561 行)在 turn 开始时:
const useStreamingToolExecution = config.gates.streamingToolExecution
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(toolUseContext.options.tools, canUseTool, toolUseContext)
: null
流式事件循环中,每收到 tool_use content block 即 addTool(block, assistantMessage)。同一 loop 内反复 yield* getCompletedResults() 把已完成 tool_result / progress 推入 messages。
Streaming fallback: 模型 retry 或切换时 discard 旧 executor 并 new 新实例(约 733、912 行),防止 failed attempt 的工具结果污染。
turn 结束时 await getRemainingResults() 等待全部 executing 完成。
源码引用: src/query.ts · 第 561–568 行(共 1730 行)
561| const useStreamingToolExecution = config.gates.streamingToolExecution
562| let streamingToolExecutor = useStreamingToolExecution
563| ? new StreamingToolExecutor(
564| toolUseContext.options.tools,
565| canUseTool,
566| toolUseContext,
567| )
568| : null
TrackedTool 状态机
每个 tool_use 对应一个 TrackedTool:
| 字段 | 含义 |
|---|---|
| status | queued → executing → completed → yielded |
| isConcurrencySafe | 来自 tool.isConcurrencySafe(parsedInput) |
| promise | collectResults 异步任务 |
| results | 最终 Message[](含 synthetic error) |
| pendingProgress | 待立即 yield 的 ProgressMessage |
| contextModifiers | runToolUse 返回的 context 变更(仅非 concurrent) |
ToolStatus 还有 yielded: getCompletedResults 标记已 yield,防止重复输出。
类注释概括设计目标:concurrent-safe 可并行;non-concurrent 独占;结果 buffer 按接收顺序 emit。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 19–39 行(共 531 行)
19| type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
20|
21| type TrackedTool = {
22| id: string
23| block: ToolUseBlock
24| assistantMessage: AssistantMessage
25| status: ToolStatus
26| isConcurrencySafe: boolean
27| promise?: Promise<void>
28| results?: Message[]
29| // Progress messages are stored separately and yielded immediately
30| pendingProgress: Message[]
31| contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
32| }
33|
34| /**
35| * Executes tools as they stream in with concurrency control.
36| * - Concurrent-safe tools can execute in parallel with other concurrent-safe tools
37| * - Non-concurrent tools must execute alone (exclusive access)
38| * - Results are buffered and emitted in the order tools were received
39| */
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 40–62 行(共 531 行)
40| export class StreamingToolExecutor {
41| private tools: TrackedTool[] = []
42| private toolUseContext: ToolUseContext
43| private hasErrored = false
44| private erroredToolDescription = ''
45| // Child of toolUseContext.abortController. Fires when a Bash tool errors
46| // so sibling subprocesses die immediately instead of running to completion.
47| // Aborting this does NOT abort the parent — query.ts won't end the turn.
48| private siblingAbortController: AbortController
49| private discarded = false
50| // Signal to wake up getRemainingResults when progress is available
51| private progressAvailableResolve?: () => void
52|
53| constructor(
54| private readonly toolDefinitions: Tools,
55| private readonly canUseTool: CanUseToolFn,
56| toolUseContext: ToolUseContext,
57| ) {
58| this.toolUseContext = toolUseContext
59| this.siblingAbortController = createChildAbortController(
60| toolUseContext.abortController,
61| )
62| }
addTool:入队与 unknown tool
addTool(block, assistantMessage):
- findToolByName 解析工具定义
- 未知工具 — 立即 push status=completed,results 为 synthetic tool_use_error(不抛异常)
- inputSchema.safeParse → isConcurrencySafe(parse 失败则 false)
- push status=queued,void processQueue()
unknown tool 分支保证模型幻觉 tool name 时 turn 仍可继续,模型收到明确 error block。
safeParse 失败时 isConcurrencySafe=false,工具串行尝试执行;runToolUse 内会处理 schema 错误。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 76–124 行(共 531 行)
76| addTool(block: ToolUseBlock, assistantMessage: AssistantMessage): void {
77| const toolDefinition = findToolByName(this.toolDefinitions, block.name)
78| if (!toolDefinition) {
79| this.tools.push({
80| id: block.id,
81| block,
82| assistantMessage,
83| status: 'completed',
84| isConcurrencySafe: true,
85| pendingProgress: [],
86| results: [
87| createUserMessage({
88| content: [
89| {
90| type: 'tool_result',
91| content: `<tool_use_error>Error: No such tool available: ${block.name}</tool_use_error>`,
92| is_error: true,
93| tool_use_id: block.id,
94| },
95| ],
96| toolUseResult: `Error: No such tool available: ${block.name}`,
97| sourceToolAssistantUUID: assistantMessage.uuid,
98| }),
99| ],
100| })
101| return
102| }
103|
104| const parsedInput = toolDefinition.inputSchema.safeParse(block.input)
105| const isConcurrencySafe = parsedInput?.success
106| ? (() => {
107| try {
108| return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
109| } catch {
110| return false
111| }
112| })()
113| : false
114| this.tools.push({
115| id: block.id,
116| block,
117| assistantMessage,
118| status: 'queued',
119| isConcurrencySafe,
120| pendingProgress: [],
121| })
122|
123| void this.processQueue()
124| }
processQueue 与 canExecuteTool
processQueue 线性扫描 tools 数组(保持顺序):
- status !== queued → skip
- canExecuteTool(isConcurrencySafe) → executeTool
- 否则:若 !isConcurrencySafe → break(后续 queued 等待)
- 若 safe 但当前不能执行 → continue 看下一个 queued
canExecuteTool 逻辑:
executing 为空 → 可执行
或:新工具 safe 且所有 executing 都 safe → 可并行
顺序语义: 非 safe 工具前的 safe 工具可先跑;遇到 non-safe queued 且无法执行时停止扫描,保证 exclusive 工具不会「插队」提前 yield 结果。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 129–151 行(共 531 行)
129| private canExecuteTool(isConcurrencySafe: boolean): boolean {
130| const executingTools = this.tools.filter(t => t.status === 'executing')
131| return (
132| executingTools.length === 0 ||
133| (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
134| )
135| }
136|
137| /**
138| * Process the queue, starting tools when concurrency conditions allow
139| */
140| private async processQueue(): Promise<void> {
141| for (const tool of this.tools) {
142| if (tool.status !== 'queued') continue
143|
144| if (this.canExecuteTool(tool.isConcurrencySafe)) {
145| await this.executeTool(tool)
146| } else {
147| // Can't execute this tool yet, and since we need to maintain order for non-concurrent tools, stop here
148| if (!tool.isConcurrencySafe) break
149| }
150| }
151| }
executeTool 与 runToolUse
executeTool 设置 executing、更新 setInProgressToolUseIDs、updateInterruptibleState,然后 collectResults():
初始 abort 检查: getAbortReason 非 null → synthetic error,不调用 runToolUse
runToolUse 生成器: 传入 per-tool toolAbortController。for await update:
- progress → pendingProgress + wake progressAvailableResolve
- 其他 message → messages[]
- is_error tool_result → Bash 名则 sibling abort
- contextModifier → 收集(仅 non-safe 工具结束后 apply)
executeTool 结束 mark completed,finally 触发 processQueue 继续调度。
runToolUse(toolExecution.ts)负责 validateInput、PreToolUse hooks、canUseTool、tool.call、PostToolUse hooks、telemetry——Executor 只做调度壳。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 265–340 行(共 531 行)
265| private async executeTool(tool: TrackedTool): Promise<void> {
266| tool.status = 'executing'
267| this.toolUseContext.setInProgressToolUseIDs(prev =>
268| new Set(prev).add(tool.id),
269| )
270| this.updateInterruptibleState()
271|
272| const messages: Message[] = []
273| const contextModifiers: Array<(context: ToolUseContext) => ToolUseContext> =
274| []
275|
276| const collectResults = async () => {
277| // If already aborted (by error or user), generate synthetic error block instead of running the tool
278| const initialAbortReason = this.getAbortReason(tool)
279| if (initialAbortReason) {
280| messages.push(
281| this.createSyntheticErrorMessage(
282| tool.id,
283| initialAbortReason,
284| tool.assistantMessage,
285| ),
286| )
287| tool.results = messages
288| tool.contextModifiers = contextModifiers
289| tool.status = 'completed'
290| this.updateInterruptibleState()
291| return
292| }
293|
294| // Per-tool child controller. Lets siblingAbortController kill running
295| // subprocesses (Bash spawns listen to this signal) when a Bash error
296| // cascades. Permission-dialog rejection also aborts this controller
297| // (PermissionContext.ts cancelAndAbort) — that abort must bubble up to
298| // the query controller so the query loop's post-tool abort check ends
299| // the turn. Without bubble-up, ExitPlanMode "clear context + auto"
300| // sends REJECT_MESSAGE to the model instead of aborting (#21056 regression).
301| const toolAbortController = createChildAbortController(
302| this.siblingAbortController,
303| )
304| toolAbortController.signal.addEventListener(
305| 'abort',
306| () => {
307| if (
308| toolAbortController.signal.reason !== 'sibling_error' &&
309| !this.toolUseContext.abortController.signal.aborted &&
310| !this.discarded
311| ) {
312| this.toolUseContext.abortController.abort(
313| toolAbortController.signal.reason,
314| )
315| }
316| },
317| { once: true },
318| )
319|
320| const generator = runToolUse(
321| tool.block,
322| tool.assistantMessage,
323| this.canUseTool,
324| { ...this.toolUseContext, abortController: toolAbortController },
325| )
326|
327| // Track if this specific tool has produced an error result.
328| // This prevents the tool from receiving a duplicate "sibling error"
329| // message when it is the one that caused the error.
330| let thisToolErrored = false
331|
332| for await (const update of generator) {
333| // Check if we were aborted by a sibling tool error or user interruption.
334| // Only add the synthetic error if THIS tool didn't produce the error.
335| const abortReason = this.getAbortReason(tool)
336| if (abortReason && !thisToolErrored) {
337| messages.push(
338| this.createSyntheticErrorMessage(
339| tool.id,
340| abortReason,
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 383–405 行(共 531 行)
383| tool.results = messages
384| tool.contextModifiers = contextModifiers
385| tool.status = 'completed'
386| this.updateInterruptibleState()
387|
388| // NOTE: we currently don't support context modifiers for concurrent
389| // tools. None are actively being used, but if we want to use
390| // them in concurrent tools, we need to support that here.
391| if (!tool.isConcurrencySafe && contextModifiers.length > 0) {
392| for (const modifier of contextModifiers) {
393| this.toolUseContext = modifier(this.toolUseContext)
394| }
395| }
396| }
397|
398| const promise = collectResults()
399| tool.promise = promise
400|
401| // Process more queue when done
402| void promise.finally(() => {
403| void this.processQueue()
404| })
405| }
源码引用: src/services/tools/toolExecution.ts · 第 133–150 行(共 1746 行)
133| /** Minimum total hook duration (ms) to show inline timing summary */
134| export const HOOK_TIMING_DISPLAY_THRESHOLD_MS = 500
135| /** Log a debug warning when hooks/permission-decision block for this long. Matches
136| * BashTool's PROGRESS_THRESHOLD_MS — the collapsed view feels stuck past this. */
137| const SLOW_PHASE_LOG_THRESHOLD_MS = 2000
138|
139| /**
140| * Classify a tool execution error into a telemetry-safe string.
141| *
142| * In minified/external builds, `error.constructor.name` is mangled into
143| * short identifiers like "nJT" or "Chq" — useless for diagnostics.
144| * This function extracts structured, telemetry-safe information instead:
145| * - TelemetrySafeError: use its telemetryMessage (already vetted)
146| * - Node.js fs errors: log the error code (ENOENT, EACCES, etc.)
147| * - Known error types: use their unminified name
148| * - Fallback: "Error" (better than a mangled 3-char identifier)
149| */
150| export function classifyToolError(error: unknown): string {
Synthetic error 与中断语义
createSyntheticErrorMessage 三种 reason:
| reason | 模型/ UI 语义 |
|---|---|
| user_interrupted | REJECT_MESSAGE(withMemoryCorrectionHint) |
| streaming_fallback | tool execution discarded |
| sibling_error | Cancelled: parallel tool call {desc} errored |
getAbortReason 判定:
- discarded → streaming_fallback
- hasErrored → sibling_error
- abortController aborted + reason=interrupt → 仅 interruptBehavior=cancel 的工具 user_interrupted
getToolInterruptBehavior 默认 block——用户输入新消息时 block 工具不应被 abort(abort 本身不应 fired,防御性检查)。
setHasInterruptibleToolInProgress:全部 executing 工具均为 cancel 行为时为 true,REPL ESC 逻辑消费。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 153–205 行(共 531 行)
153| private createSyntheticErrorMessage(
154| toolUseId: string,
155| reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
156| assistantMessage: AssistantMessage,
157| ): Message {
158| // For user interruptions (ESC to reject), use REJECT_MESSAGE so the UI shows
159| // "User rejected edit" instead of "Error editing file"
160| if (reason === 'user_interrupted') {
161| return createUserMessage({
162| content: [
163| {
164| type: 'tool_result',
165| content: withMemoryCorrectionHint(REJECT_MESSAGE),
166| is_error: true,
167| tool_use_id: toolUseId,
168| },
169| ],
170| toolUseResult: 'User rejected tool use',
171| sourceToolAssistantUUID: assistantMessage.uuid,
172| })
173| }
174| if (reason === 'streaming_fallback') {
175| return createUserMessage({
176| content: [
177| {
178| type: 'tool_result',
179| content:
180| '<tool_use_error>Error: Streaming fallback - tool execution discarded</tool_use_error>',
181| is_error: true,
182| tool_use_id: toolUseId,
183| },
184| ],
185| toolUseResult: 'Streaming fallback - tool execution discarded',
186| sourceToolAssistantUUID: assistantMessage.uuid,
187| })
188| }
189| const desc = this.erroredToolDescription
190| const msg = desc
191| ? `Cancelled: parallel tool call ${desc} errored`
192| : 'Cancelled: parallel tool call errored'
193| return createUserMessage({
194| content: [
195| {
196| type: 'tool_result',
197| content: `<tool_use_error>${msg}</tool_use_error>`,
198| is_error: true,
199| tool_use_id: toolUseId,
200| },
201| ],
202| toolUseResult: msg,
203| sourceToolAssistantUUID: assistantMessage.uuid,
204| })
205| }
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 210–260 行(共 531 行)
210| private getAbortReason(
211| tool: TrackedTool,
212| ): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
213| if (this.discarded) {
214| return 'streaming_fallback'
215| }
216| if (this.hasErrored) {
217| return 'sibling_error'
218| }
219| if (this.toolUseContext.abortController.signal.aborted) {
220| // 'interrupt' means the user typed a new message while tools were
221| // running. Only cancel tools whose interruptBehavior is 'cancel';
222| // 'block' tools shouldn't reach here (abort isn't fired).
223| if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
224| return this.getToolInterruptBehavior(tool) === 'cancel'
225| ? 'user_interrupted'
226| : null
227| }
228| return 'user_interrupted'
229| }
230| return null
231| }
232|
233| private getToolInterruptBehavior(tool: TrackedTool): 'cancel' | 'block' {
234| const definition = findToolByName(this.toolDefinitions, tool.block.name)
235| if (!definition?.interruptBehavior) return 'block'
236| try {
237| return definition.interruptBehavior()
238| } catch {
239| return 'block'
240| }
241| }
242|
243| private getToolDescription(tool: TrackedTool): string {
244| const input = tool.block.input as Record<string, unknown> | undefined
245| const summary = input?.command ?? input?.file_path ?? input?.pattern ?? ''
246| if (typeof summary === 'string' && summary.length > 0) {
247| const truncated =
248| summary.length > 40 ? summary.slice(0, 40) + '\u2026' : summary
249| return `${tool.block.name}(${truncated})`
250| }
251| return tool.block.name
252| }
253|
254| private updateInterruptibleState(): void {
255| const executing = this.tools.filter(t => t.status === 'executing')
256| this.toolUseContext.setHasInterruptibleToolInProgress?.(
257| executing.length > 0 &&
258| executing.every(t => this.getToolInterruptBehavior(t) === 'cancel'),
259| )
260| }
getCompletedResults 与 getRemainingResults
getCompletedResults(generator,非阻塞):
- 对每个 tool 先 drain pendingProgress(不论 status)
- completed 且未 yielded → yield results,mark yielded,markToolUseAsComplete
- executing 且 !isConcurrencySafe → break(保持顺序)
getRemainingResults(async generator):
- while hasUnfinishedTools:processQueue → yield completed → 若仍在 executing 且无 progress,Promise.race 等 promise 或 progress
- 最后 sweep getCompletedResults
query 流式 loop 用前者「捞」已完成;turn 收尾用后者「等」未完成。
markToolUseAsComplete 从 setInProgressToolUseIDs 删除 toolUseID。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 412–440 行(共 531 行)
412| *getCompletedResults(): Generator<MessageUpdate, void> {
413| if (this.discarded) {
414| return
415| }
416|
417| for (const tool of this.tools) {
418| // Always yield pending progress messages immediately, regardless of tool status
419| while (tool.pendingProgress.length > 0) {
420| const progressMessage = tool.pendingProgress.shift()!
421| yield { message: progressMessage, newContext: this.toolUseContext }
422| }
423|
424| if (tool.status === 'yielded') {
425| continue
426| }
427|
428| if (tool.status === 'completed' && tool.results) {
429| tool.status = 'yielded'
430|
431| for (const message of tool.results) {
432| yield { message, newContext: this.toolUseContext }
433| }
434|
435| markToolUseAsComplete(this.toolUseContext, tool.id)
436| } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
437| break
438| }
439| }
440| }
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 453–490 行(共 531 行)
453| async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
454| if (this.discarded) {
455| return
456| }
457|
458| while (this.hasUnfinishedTools()) {
459| await this.processQueue()
460|
461| for (const result of this.getCompletedResults()) {
462| yield result
463| }
464|
465| // If we still have executing tools but nothing completed, wait for any to complete
466| // OR for progress to become available
467| if (
468| this.hasExecutingTools() &&
469| !this.hasCompletedResults() &&
470| !this.hasPendingProgress()
471| ) {
472| const executingPromises = this.tools
473| .filter(t => t.status === 'executing' && t.promise)
474| .map(t => t.promise!)
475|
476| // Also wait for progress to become available
477| const progressPromise = new Promise<void>(resolve => {
478| this.progressAvailableResolve = resolve
479| })
480|
481| if (executingPromises.length > 0) {
482| await Promise.race([...executingPromises, progressPromise])
483| }
484| }
485| }
486|
487| for (const result of this.getCompletedResults()) {
488| yield result
489| }
490| }
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 521–530 行(共 531 行)
521| function markToolUseAsComplete(
522| toolUseContext: ToolUseContext,
523| toolUseID: string,
524| ) {
525| toolUseContext.setInProgressToolUseIDs(prev => {
526| const next = new Set(prev)
527| next.delete(toolUseID)
528| return next
529| })
530| }
discard 与 streaming fallback
discard() 设 discarded=true。之后:
- addTool 仍会入队但 executeTool 初始 abortReason=streaming_fallback
- getCompletedResults / getRemainingResults 开头 return
query retry 路径 discard + new Executor,旧 TrackedTool 上的 executing promise 可能仍在跑,但结果不再 yield——工具应监听 abort signal 尽快结束。
与 query abort 区别: discard 不 abort parent controller;用户 Cancel 整轮则 abortController.abort。
源码引用: src/services/tools/StreamingToolExecutor.ts · 第 64–71 行(共 531 行)
64| /**
65| * Discards all pending and in-progress tools. Called when streaming fallback
66| * occurs and results from the failed attempt should be abandoned.
67| * Queued tools won't start, and in-progress tools will receive synthetic errors.
68| */
69| discard(): void {
70| this.discarded = true
71| }
源码引用: src/query.ts · 第 733–740 行(共 1730 行)
733| if (streamingToolExecutor) {
734| streamingToolExecutor.discard()
735| streamingToolExecutor = new StreamingToolExecutor(
736| toolUseContext.options.tools,
737| canUseTool,
738| toolUseContext,
739| )
740| }
源码目录与 runToolUse 边界
Executor 在 services/tools/ 而非 src/tools/,属执行编排层。单工具生命周期详见 toolExecution.ts、toolHooks.ts。点击 StreamingToolExecutor.ts 跳回本章源码块。
动手练习
- 在 query.ts 设 breakpoint 于 addTool,观察同一 assistant turn 内多个 tool_use 的入队顺序
- 并行两个 Read(safe)+ 一个 Edit(unsafe),记录 executing 时间线
- 模拟 streaming fallback,确认 discard 后 transcript 无 orphan tool_result
- 阅读 runToolUse 开头,列出 Executor 之前/之后各做哪些事
- 对照 Tool.interruptBehavior 文档与 getAbortReason 的 interrupt 分支
本章小结与延伸
StreamingToolExecutor = query 内的工具调度器。单工具逻辑在 runToolUse;Bash 级联见 bash-tool 章。 继续学习: