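Chapter overview
bridgeMessaging.ts (roughly 460 lines) collects the pure transport-layer functions shared by the REPL core and the env-less core: SDK type guards, outbound eligibility, ingress routing, replies to server control_request frames, and UUID echo dedup. inboundMessages.ts normalizes user messages coming from mobile clients (including the media_type of image blocks). inboundAttachments.ts fetches the file_uuid attachments sent by the web composer and turns them into local @path references. flushGate.ts queues new messages while the initial history flush is in progress, so they do not interleave with the batch POST. By the end of this chapter you should be able to trace a raw WebSocket JSON frame all the way to the REPL onInboundMessage callback.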
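After this chapter you should be able to
- State the branch order of handleIngressMessage and its UUID dedup strategy
- Explain why isEligibleBridgeMessage filters out virtual messages and tool_result
- Describe the compatibility logic in extractInboundMessageFields and normalizeImageBlocks
- Understand the best-effort behavior and on-disk layout of resolveInboundAttachments
- Explain the deactivate semantics of FlushGate when the transport is replaced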
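Core concepts (read these first)
bridgeMessaging holds no bridge closure state
The file header states it: every collaborator (transport, sessionId, BoundedUUIDSet, callbacks) is passed in as a parameter. initBridgeCore and initEnvLessBridgeCore share the same ingress/egress parsing, which avoids forking two divergent copies of the logic.
The two BoundedUUIDSet instances
recentPostedUUIDs drops echoes of messages this client just sent; recentInboundUUIDs drops historical user messages replayed by the server (a defensive layer for when seq negotiation fails). Both are bounded sets, so memory does not grow without limit in long-running sessions.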
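The real BoundedUUIDSet is not shown in this chapter; only its has() and add() calls appear in the quoted source. As a minimal sketch of what "bounded" could mean, the hypothetical class below (BoundedUUIDSetSketch, with an assumed first-in-first-out eviction policy) keeps at most capacity entries and forgets the oldest one when a new UUID arrives.

```typescript
// Hypothetical sketch, not the real BoundedUUIDSet: a ring buffer records
// insertion order, and a Set gives O(1) membership checks.
class BoundedUUIDSetSketch {
  private readonly capacity: number
  private readonly ring: (string | undefined)[]
  private readonly members = new Set<string>()
  private next = 0

  constructor(capacity: number) {
    this.capacity = capacity
    this.ring = new Array<string | undefined>(capacity)
  }

  has(uuid: string): boolean {
    return this.members.has(uuid)
  }

  add(uuid: string): void {
    if (this.members.has(uuid)) return
    // Overwrite the oldest slot and forget whatever UUID was stored there.
    const evicted = this.ring[this.next]
    if (evicted !== undefined) this.members.delete(evicted)
    this.ring[this.next] = uuid
    this.members.add(uuid)
    this.next = (this.next + 1) % this.capacity
  }
}

const recentPosted = new BoundedUUIDSetSketch(2)
recentPosted.add('a')
recentPosted.add('b')
recentPosted.add('c') // capacity is 2, so the oldest entry ('a') is evicted
```

The trade-off of any bounded set is that an echo arriving after its UUID has been evicted is no longer recognized, so the capacity has to cover the realistic window of in-flight messages.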
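initialize must still succeed under outboundOnly
handleServerControlRequest replies with an error to mutable requests when outboundOnly is set, but initialize must still return success; otherwise the server kills the WS after roughly 10–14 s. This is the dividing line between mirror mode and full remote control.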
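Suggested study steps
- Read source block A: the type guards (isSDKMessage and friends)
- Read source block B: isEligibleBridgeMessage and extractTitleText
- Read source block C: handleIngressMessage routing
- Read source block D: the handleServerControlRequest branches
- Read source block E: extractInboundMessageFields
- Read source block F: inboundAttachments resolution
- Read source block G: the FlushGate state machine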
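Common pitfalls
- control_response is handled before isSDKMessage. It is not a member of the SDKMessage union, yet isSDKMessage only checks for a string type, so it would otherwise be accepted and narrowed incorrectly.
- Inbound SDKMessages other than user are ignored (not forwarded to onInboundMessage).
- isSyntheticMessage is not checked inside extractTitleText (it would pull in a heavy import).
- A failed attachment does not block the message; it simply arrives without the @path prefix.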
Position in the architecture
The three layers a message passes through in the bridge stack:
REPL Message[] ──isEligibleBridgeMessage──► outbound filter
      │ toSDKMessages / makeResultMessage
      ▼
ReplBridgeTransport.write(StdoutMessage)
      │
      ▼ (ingress)
handleIngressMessage ──user──► onInboundMessage(SDKMessage)
      │                              │
      │ control_request              ├─ extractInboundMessageFields
      └──────────────────────────────├─ resolveInboundAttachments
                                     └─ enqueue into the REPL input pipeline
bridgeMessaging does not handle HTTP session creation or polling; inboundAttachments asynchronously enriches the content before a user message is enqueued; flushGate only takes part on the write path of replBridge/remoteBridgeCore.
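Type guards and outbound filtering
Ingress parsing runs normalizeControlMessageKeys over the result of jsonParse, which tolerates a mix of the server's snake_case keys and legacy camelCase keys.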
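The implementation of normalizeControlMessageKeys is not quoted in this chapter, so the following is only an illustration of the idea. The function normalizeKeysSketch and the assumption that the single legacy spelling is requestId (mapped to request_id) are both hypothetical.

```typescript
// Hypothetical stand-in for normalizeControlMessageKeys (real code not shown here).
// Assumption: the only legacy camelCase key is `requestId`.
function normalizeKeysSketch(value: unknown): unknown {
  if (value === null || typeof value !== 'object' || Array.isArray(value)) {
    return value
  }
  const out: Record<string, unknown> = {}
  for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
    const normalizedKey = key === 'requestId' ? 'request_id' : key
    // Recurse so that a nested `response.requestId` is renamed as well.
    out[normalizedKey] = normalizeKeysSketch(inner)
  }
  return out
}

const legacyFrame: unknown = JSON.parse(
  '{"type":"control_request","requestId":"r1","request":{"subtype":"interrupt"}}',
)
const normalizedFrame = normalizeKeysSketch(legacyFrame) as Record<string, unknown>
```

Normalizing once at the entry point means the type guards further down only ever need to test the snake_case spelling.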
| Function | Purpose |
|---|---|
| isSDKMessage | Checks for an object with a string type discriminant |
| isSDKControlResponse | Permission callback channel |
| isSDKControlRequest | Server-sent initialize/interrupt/set_model and similar |
| isEligibleBridgeMessage | Outbound: user, assistant, and local_command system messages; skips virtual ones |
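extractTitleText serves the onUserMessage titling policy: it skips meta messages, toolUseResult, compact summaries, non-human origins, and content made only of display tags. The rules match the initialMessages scan in initReplBridge, except that isSyntheticMessage is checked explicitly only on the init path (to avoid the messages.ts → commands dependency).
makeResultMessage builds the SDK result frame that reports the end of a turn.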
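To make the outbound filter concrete, here is a small self-contained sketch. MsgSketch is a deliberately reduced, hypothetical stand-in for the real Message type, and the subtype 'other' is a placeholder; the predicate body mirrors the isEligibleBridgeMessage logic quoted in this section.

```typescript
// Reduced, hypothetical message shape; the real Message union has many more fields.
type MsgSketch =
  | { type: 'user' | 'assistant'; isVirtual?: boolean }
  | { type: 'system'; subtype: string }
  | { type: 'progress' }

// Same decision logic as isEligibleBridgeMessage, over the reduced type.
function isEligibleSketch(m: MsgSketch): boolean {
  if ((m.type === 'user' || m.type === 'assistant') && m.isVirtual) {
    return false
  }
  return (
    m.type === 'user' ||
    m.type === 'assistant' ||
    (m.type === 'system' && m.subtype === 'local_command')
  )
}

const transcript: MsgSketch[] = [
  { type: 'user' },
  { type: 'assistant', isVirtual: true }, // display-only inner call: skipped
  { type: 'assistant' },
  { type: 'system', subtype: 'local_command' },
  { type: 'system', subtype: 'other' }, // placeholder subtype: skipped
  { type: 'progress' }, // internal REPL chatter: skipped
]
const outbound = transcript.filter(isEligibleSketch)
```

Only the plain user turn, the non-virtual assistant turn, and the local_command system event survive the filter.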
Source reference: src/bridge/bridgeMessaging.ts · lines 31–88 (of 462)
31| // ─── Type guards ─────────────────────────────────────────────────────────────
32|
33| /** Type predicate for parsed WebSocket messages. SDKMessage is a
34| * discriminated union on `type` — validating the discriminant is
35| * sufficient for the predicate; callers narrow further via the union. */
36| export function isSDKMessage(value: unknown): value is SDKMessage {
37| return (
38| value !== null &&
39| typeof value === 'object' &&
40| 'type' in value &&
41| typeof value.type === 'string'
42| )
43| }
44|
45| /** Type predicate for control_response messages from the server. */
46| export function isSDKControlResponse(
47| value: unknown,
48| ): value is SDKControlResponse {
49| return (
50| value !== null &&
51| typeof value === 'object' &&
52| 'type' in value &&
53| value.type === 'control_response' &&
54| 'response' in value
55| )
56| }
57|
58| /** Type predicate for control_request messages from the server. */
59| export function isSDKControlRequest(
60| value: unknown,
61| ): value is SDKControlRequest {
62| return (
63| value !== null &&
64| typeof value === 'object' &&
65| 'type' in value &&
66| value.type === 'control_request' &&
67| 'request_id' in value &&
68| 'request' in value
69| )
70| }
71|
72| /**
73| * True for message types that should be forwarded to the bridge transport.
74| * The server only wants user/assistant turns and slash-command system events;
75| * everything else (tool_result, progress, etc.) is internal REPL chatter.
76| */
77| export function isEligibleBridgeMessage(m: Message): boolean {
78| // Virtual messages (REPL inner calls) are display-only — bridge/SDK
79| // consumers see the REPL tool_use/result which summarizes the work.
80| if ((m.type === 'user' || m.type === 'assistant') && m.isVirtual) {
81| return false
82| }
83| return (
84| m.type === 'user' ||
85| m.type === 'assistant' ||
86| (m.type === 'system' && m.subtype === 'local_command')
87| )
88| }
Source reference: src/bridge/bridgeMessaging.ts · lines 90–122 (of 462)
90| /**
91| * Extract title-worthy text from a Message for onUserMessage. Returns
92| * undefined for messages that shouldn't title the session: non-user, meta
93| * (nudges), tool results, compact summaries, non-human origins (task
94| * notifications, channel messages), or pure display-tag content
95| * (<ide_opened_file>, <session-start-hook>, etc.).
96| *
97| * Synthetic interrupts ([Request interrupted by user]) are NOT filtered here —
98| * isSyntheticMessage lives in messages.ts (heavy import, pulls command
99| * registry). The initialMessages path in initReplBridge checks it; the
100| * writeMessages path reaching an interrupt as the *first* message is
101| * implausible (an interrupt implies a prior prompt already flowed through).
102| */
103| export function extractTitleText(m: Message): string | undefined {
104| if (m.type !== 'user' || m.isMeta || m.toolUseResult || m.isCompactSummary)
105| return undefined
106| if (m.origin && m.origin.kind !== 'human') return undefined
107| const content = m.message.content
108| let raw: string | undefined
109| if (typeof content === 'string') {
110| raw = content
111| } else {
112| for (const block of content) {
113| if (block.type === 'text') {
114| raw = block.text
115| break
116| }
117| }
118| }
119| if (!raw) return undefined
120| const clean = stripDisplayTagsAllowEmpty(raw)
121| return clean || undefined
122| }
handleIngressMessage
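The ingress handling order is fixed, which prevents incorrect type narrowing:
- control_response → onPermissionResponse (not a member of the SDKMessage union)
- control_request → onControlRequest (usually forwarded to handleServerControlRequest)
- isSDKMessage fails → drop
- UUID ∈ recentPostedUUIDs → ignored as an echo
- UUID ∈ recentInboundUUIDs → ignored as a replay
- type === 'user' → record the UUID in the inbound set and call onInboundMessage (fire-and-forget; the handler may resolve attachments asynchronously)
- any other type → logged at debug level and ignored
The tengu_bridge_message_received event is logged only for user messages that pass both dedup checks, immediately before onInboundMessage is invoked. Parse errors are caught and logged at debug level rather than thrown to the transport layer, so a single malformed frame cannot tear down the WS.
Relation to mobile: user messages from iOS/web take the same path; an image block that lacks media_type is repaired downstream in inboundMessages, not here.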
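The branch order can be exercised with a stripped-down model. In the sketch below, routeSketch is hypothetical: it uses plain Set<string> in place of BoundedUUIDSet, JSON.parse in place of jsonParse plus key normalization, and returns a branch label instead of invoking callbacks, purely so the outcome is easy to observe.

```typescript
type BranchSketch =
  | 'control_response' | 'control_request' | 'dropped'
  | 'echo' | 'replay' | 'user' | 'ignored'

// Simplified model of the routing order; not the real handleIngressMessage.
function routeSketch(
  data: string,
  recentPosted: Set<string>,
  recentInbound: Set<string>,
): BranchSketch {
  let parsed: unknown
  try {
    parsed = JSON.parse(data)
  } catch {
    return 'dropped' // malformed JSON never propagates to the transport
  }
  if (parsed === null || typeof parsed !== 'object') return 'dropped'
  const msg = parsed as Record<string, unknown>

  // Control frames come first: they are not SDKMessage union members.
  if (msg.type === 'control_response' && 'response' in msg) return 'control_response'
  if (msg.type === 'control_request' && 'request_id' in msg && 'request' in msg) {
    return 'control_request'
  }
  // Equivalent of the isSDKMessage check: a string discriminant is required.
  if (typeof msg.type !== 'string') return 'dropped'

  const uuid = typeof msg.uuid === 'string' ? msg.uuid : undefined
  if (uuid && recentPosted.has(uuid)) return 'echo'
  if (uuid && recentInbound.has(uuid)) return 'replay'

  if (msg.type === 'user') {
    if (uuid) recentInbound.add(uuid) // remember it so a re-delivery is dropped
    return 'user'
  }
  return 'ignored'
}

const posted = new Set<string>(['u-1'])
const inbound = new Set<string>()
const userFrame = JSON.stringify({ type: 'user', uuid: 'u-2', message: { content: 'hi' } })
const firstDelivery = routeSketch(userFrame, posted, inbound)
const secondDelivery = routeSketch(userFrame, posted, inbound)
const echoed = routeSketch(JSON.stringify({ type: 'assistant', uuid: 'u-1' }), posted, inbound)
```

Delivering the same user frame twice shows the defensive dedup at work: the first call forwards it, the second is classified as a replay.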
Source reference: src/bridge/bridgeMessaging.ts · lines 124–208 (of 462)
124| // ─── Ingress routing ─────────────────────────────────────────────────────────
125|
126| /**
127| * Parse an ingress WebSocket message and route it to the appropriate handler.
128| * Ignores messages whose UUID is in recentPostedUUIDs (echoes of what we sent)
129| * or in recentInboundUUIDs (re-deliveries we've already forwarded — e.g.
130| * server replayed history after a transport swap lost the seq-num cursor).
131| */
132| export function handleIngressMessage(
133| data: string,
134| recentPostedUUIDs: BoundedUUIDSet,
135| recentInboundUUIDs: BoundedUUIDSet,
136| onInboundMessage: ((msg: SDKMessage) => void | Promise<void>) | undefined,
137| onPermissionResponse?: ((response: SDKControlResponse) => void) | undefined,
138| onControlRequest?: ((request: SDKControlRequest) => void) | undefined,
139| ): void {
140| try {
141| const parsed: unknown = normalizeControlMessageKeys(jsonParse(data))
142|
143| // control_response is not an SDKMessage — check before the type guard
144| if (isSDKControlResponse(parsed)) {
145| logForDebugging('[bridge:repl] Ingress message type=control_response')
146| onPermissionResponse?.(parsed)
147| return
148| }
149|
150| // control_request from the server (initialize, set_model, can_use_tool).
151| // Must respond promptly or the server kills the WS (~10-14s timeout).
152| if (isSDKControlRequest(parsed)) {
153| logForDebugging(
154| `[bridge:repl] Inbound control_request subtype=${parsed.request.subtype}`,
155| )
156| onControlRequest?.(parsed)
157| return
158| }
159|
160| if (!isSDKMessage(parsed)) return
161|
162| // Check for UUID to detect echoes of our own messages
163| const uuid =
164| 'uuid' in parsed && typeof parsed.uuid === 'string'
165| ? parsed.uuid
166| : undefined
167|
168| if (uuid && recentPostedUUIDs.has(uuid)) {
169| logForDebugging(
170| `[bridge:repl] Ignoring echo: type=${parsed.type} uuid=${uuid}`,
171| )
172| return
173| }
174|
175| // Defensive dedup: drop inbound prompts we've already forwarded. The
176| // SSE seq-num carryover (lastTransportSequenceNum) is the primary fix
177| // for history-replay; this catches edge cases where that negotiation
178| // fails (server ignores from_sequence_num, transport died before
179| // receiving any frames, etc).
180| if (uuid && recentInboundUUIDs.has(uuid)) {
181| logForDebugging(
182| `[bridge:repl] Ignoring re-delivered inbound: type=${parsed.type} uuid=${uuid}`,
183| )
184| return
185| }
186|
187| logForDebugging(
188| `[bridge:repl] Ingress message type=${parsed.type}${uuid ? ` uuid=${uuid}` : ''}`,
189| )
190|
191| if (parsed.type === 'user') {
192| if (uuid) recentInboundUUIDs.add(uuid)
193| logEvent('tengu_bridge_message_received', {
194| is_repl: true,
195| })
196| // Fire-and-forget — handler may be async (attachment resolution).
197| void onInboundMessage?.(parsed)
198| } else {
199| logForDebugging(
200| `[bridge:repl] Ignoring non-user inbound message: type=${parsed.type}`,
201| )
202| }
203| } catch (err) {
204| logForDebugging(
205| `[bridge:repl] Failed to parse ingress message: ${errorMessage(err)}`,
206| )
207| }
208| }
handleServerControlRequest
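A control_request from the server must be answered promptly; if no reply arrives, the server kills the WS after roughly 10–14 s. Handling by subtype:
| subtype | Behavior |
|---|---|
| initialize | Returns success with capabilities whose commands/models are empty (the REPL handles these itself) |
| set_model | Calls onSetModel, replies success |
| set_max_thinking_tokens | Calls onSetMaxThinkingTokens, replies success |
| set_permission_mode | Calls onSetPermissionMode; on failure replies error rather than a false success |
| interrupt | Calls onInterrupt |
When outboundOnly is set, everything except initialize gets OUTBOUND_ONLY_ERROR, so claude.ai never shows "model switched" while nothing happened locally.
ServerControlRequestHandlers explicitly carries the transport, sessionId, and callbacks, so replBridge and remoteBridgeCore pass in the same implementation. The response is sent through transport.write with a session_id field attached.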
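The outbound-only gating rule can be isolated in a pure function. The helper decideReplySketch and the reduced ReplySketch type below are hypothetical (they are not part of bridgeMessaging.ts); the sketch ignores the per-subtype callbacks and the capabilities payload and only shows which subtype gets which kind of reply, plus the session_id that is attached before writing.

```typescript
// Reduced reply shape; the real SDKControlResponse carries more fields
// (for example the capabilities payload returned for initialize).
type ReplySketch = {
  type: 'control_response'
  response:
    | { subtype: 'success'; request_id: string }
    | { subtype: 'error'; request_id: string; error: string }
}

const OUTBOUND_ONLY_ERROR =
  'This session is outbound-only. Enable Remote Control locally to allow inbound control.'

// Hypothetical pure helper: chooses the reply without touching a transport.
// Simplification: every non-rejected request is treated as a success.
function decideReplySketch(
  requestId: string,
  subtype: string,
  outboundOnly: boolean,
): ReplySketch {
  if (outboundOnly && subtype !== 'initialize') {
    return {
      type: 'control_response',
      response: { subtype: 'error', request_id: requestId, error: OUTBOUND_ONLY_ERROR },
    }
  }
  return {
    type: 'control_response',
    response: { subtype: 'success', request_id: requestId },
  }
}

const initReply = decideReplySketch('req-1', 'initialize', true)
const modelReply = decideReplySketch('req-2', 'set_model', true)
// The frame written to the transport additionally carries session_id.
const wireFrame = { ...modelReply, session_id: 'session-abc' }
```

Even in outbound-only mode the initialize request succeeds, while set_model is rejected with an explicit error that the client can surface.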
Source reference: src/bridge/bridgeMessaging.ts · lines 210–283 (of 462)
210| // ─── Server-initiated control requests ───────────────────────────────────────
211|
212| export type ServerControlRequestHandlers = {
213| transport: ReplBridgeTransport | null
214| sessionId: string
215| /**
216| * When true, all mutable requests (interrupt, set_model, set_permission_mode,
217| * set_max_thinking_tokens) reply with an error instead of false-success.
218| * initialize still replies success — the server kills the connection otherwise.
219| * Used by the outbound-only bridge mode and the SDK's /bridge subpath so claude.ai sees a
220| * proper error instead of "action succeeded but nothing happened locally".
221| */
222| outboundOnly?: boolean
223| onInterrupt?: () => void
224| onSetModel?: (model: string | undefined) => void
225| onSetMaxThinkingTokens?: (maxTokens: number | null) => void
226| onSetPermissionMode?: (
227| mode: PermissionMode,
228| ) => { ok: true } | { ok: false; error: string }
229| }
230|
231| const OUTBOUND_ONLY_ERROR =
232| 'This session is outbound-only. Enable Remote Control locally to allow inbound control.'
233|
234| /**
235| * Respond to inbound control_request messages from the server. The server
236| * sends these for session lifecycle events (initialize, set_model) and
237| * for turn-level coordination (interrupt, set_max_thinking_tokens). If we
238| * don't respond, the server hangs and kills the WS after ~10-14s.
239| *
240| * Previously a closure inside initBridgeCore's onWorkReceived; now takes
241| * collaborators as params so both cores can use it.
242| */
243| export function handleServerControlRequest(
244| request: SDKControlRequest,
245| handlers: ServerControlRequestHandlers,
246| ): void {
247| const {
248| transport,
249| sessionId,
250| outboundOnly,
251| onInterrupt,
252| onSetModel,
253| onSetMaxThinkingTokens,
254| onSetPermissionMode,
255| } = handlers
256| if (!transport) {
257| logForDebugging(
258| '[bridge:repl] Cannot respond to control_request: transport not configured',
259| )
260| return
261| }
262|
263| let response: SDKControlResponse
264|
265| // Outbound-only: reply error for mutable requests so claude.ai doesn't show
266| // false success. initialize must still succeed (server kills the connection
267| // if it doesn't — see comment above).
268| if (outboundOnly && request.request.subtype !== 'initialize') {
269| response = {
270| type: 'control_response',
271| response: {
272| subtype: 'error',
273| request_id: request.request_id,
274| error: OUTBOUND_ONLY_ERROR,
275| },
276| }
277| const event = { ...response, session_id: sessionId }
278| void transport.write(event)
279| logForDebugging(
280| `[bridge:repl] Rejected ${request.request.subtype} (outbound-only) request_id=${request.request_id}`,
281| )
282| return
283| }
Source reference: src/bridge/bridgeMessaging.ts · lines 285–350 (of 462)
285| switch (request.request.subtype) {
286| case 'initialize':
287| // Respond with minimal capabilities — the REPL handles
288| // commands, models, and account info itself.
289| response = {
290| type: 'control_response',
291| response: {
292| subtype: 'success',
293| request_id: request.request_id,
294| response: {
295| commands: [],
296| output_style: 'normal',
297| available_output_styles: ['normal'],
298| models: [],
299| account: {},
300| pid: process.pid,
301| },
302| },
303| }
304| break
305|
306| case 'set_model':
307| onSetModel?.(request.request.model)
308| response = {
309| type: 'control_response',
310| response: {
311| subtype: 'success',
312| request_id: request.request_id,
313| },
314| }
315| break
316|
317| case 'set_max_thinking_tokens':
318| onSetMaxThinkingTokens?.(request.request.max_thinking_tokens)
319| response = {
320| type: 'control_response',
321| response: {
322| subtype: 'success',
323| request_id: request.request_id,
324| },
325| }
326| break
327|
328| case 'set_permission_mode': {
329| // The callback returns a policy verdict so we can send an error
330| // control_response without importing isAutoModeGateEnabled /
331| // isBypassPermissionsModeDisabled here (bootstrap-isolation). If no
332| // callback is registered (daemon context, which doesn't wire this —
333| // see daemonBridge.ts), return an error verdict rather than a silent
334| // false-success: the mode is never actually applied in that context,
335| // so success would lie to the client.
336| const verdict = onSetPermissionMode?.(request.request.mode) ?? {
337| ok: false,
338| error:
339| 'set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)',
340| }
341| if (verdict.ok) {
342| response = {
343| type: 'control_response',
344| response: {
345| subtype: 'success',
346| request_id: request.request_id,
347| },
348| }
349| } else {
350| response = {
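inboundMessages field extraction
extractInboundMessageFields is the narrow interface in front of the REPL queue:
- Returns undefined when type !== 'user'
- Skips empty content and empty arrays
- Extracts the uuid for dedup and session correlation
- Passes array content through normalizeImageBlocks
normalizeImageBlocks scans with isMalformedBase64Image (a base64 image block whose source has no media_type). Repair strategy: prefer the client's camelCase mediaType, otherwise fall back to detectImageFormatFromBase64. Fast path: when nothing needs repair it returns the original array reference, with zero allocation.
Background: mobile-apps#5825. A malformed block makes every subsequent API call fail with "media_type: Field required", which ruins the whole session.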
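The repair strategy can be tried out in isolation. Everything named *Sketch below is hypothetical: the block types are reduced, and detectFormatSketch is only a guess at what a detector like detectImageFormatFromBase64 might do (matching the base64 prefixes of well-known magic bytes, with an assumed PNG fallback).

```typescript
type ImageSourceSketch = {
  type: 'base64'
  data: string
  media_type?: string
  mediaType?: string // camelCase spelling sent by some clients
}
type BlockSketch =
  | { type: 'text'; text: string }
  | { type: 'image'; source: ImageSourceSketch }

// Hypothetical detector: base64 prefixes of the PNG, JPEG, GIF and RIFF signatures.
function detectFormatSketch(data: string): string {
  if (data.startsWith('iVBORw0KGgo')) return 'image/png'
  if (data.startsWith('/9j/')) return 'image/jpeg'
  if (data.startsWith('R0lGOD')) return 'image/gif'
  if (data.startsWith('UklGR')) return 'image/webp'
  return 'image/png' // assumed fallback; the real default is not shown here
}

function isMalformedSketch(block: BlockSketch): boolean {
  return block.type === 'image' && !block.source.media_type
}

function normalizeSketch(blocks: BlockSketch[]): BlockSketch[] {
  // Fast path: nothing to repair, hand back the very same array.
  if (!blocks.some(isMalformedSketch)) return blocks
  return blocks.map((block): BlockSketch => {
    if (block.type !== 'image' || block.source.media_type) return block
    const mediaType = block.source.mediaType || detectFormatSketch(block.source.data)
    // Rebuild the source so only the snake_case key remains.
    return {
      type: 'image',
      source: { type: 'base64', media_type: mediaType, data: block.source.data },
    }
  })
}

const cleanBlocks: BlockSketch[] = [{ type: 'text', text: 'hello' }]
const untouched = normalizeSketch(cleanBlocks)
const repaired = normalizeSketch([
  { type: 'image', source: { type: 'base64', data: 'AAAA', mediaType: 'image/jpeg' } },
  { type: 'image', source: { type: 'base64', data: 'iVBORw0KGgoAAAA' } },
])
```

The first image keeps the media type the client supplied under the camelCase key; the second has no hint at all, so the sketch falls back to sniffing the data prefix.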
Source reference: src/bridge/inboundMessages.ts · lines 10–40 (of 81)
10| /**
11| * Process an inbound user message from the bridge, extracting content
12| * and UUID for enqueueing. Supports both string content and
13| * ContentBlockParam[] (e.g. messages containing images).
14| *
15| * Normalizes image blocks from bridge clients that may use camelCase
16| * `mediaType` instead of snake_case `media_type` (mobile-apps#5825).
17| *
18| * Returns the extracted fields, or undefined if the message should be
19| * skipped (non-user type, missing/empty content).
20| */
21| export function extractInboundMessageFields(
22| msg: SDKMessage,
23| ):
24| | { content: string | Array<ContentBlockParam>; uuid: UUID | undefined }
25| | undefined {
26| if (msg.type !== 'user') return undefined
27| const content = msg.message?.content
28| if (!content) return undefined
29| if (Array.isArray(content) && content.length === 0) return undefined
30|
31| const uuid =
32| 'uuid' in msg && typeof msg.uuid === 'string'
33| ? (msg.uuid as UUID)
34| : undefined
35|
36| return {
37| content: Array.isArray(content) ? normalizeImageBlocks(content) : content,
38| uuid,
39| }
40| }
Source reference: src/bridge/inboundMessages.ts · lines 42–80 (of 81)
42| /**
43| * Normalize image content blocks from bridge clients. iOS/web clients may
44| * send `mediaType` (camelCase) instead of `media_type` (snake_case), or
45| * omit the field entirely. Without normalization, the bad block poisons
46| * the session — every subsequent API call fails with
47| * "media_type: Field required".
48| *
49| * Fast-path scan returns the original array reference when no
50| * normalization is needed (zero allocation on the happy path).
51| */
52| export function normalizeImageBlocks(
53| blocks: Array<ContentBlockParam>,
54| ): Array<ContentBlockParam> {
55| if (!blocks.some(isMalformedBase64Image)) return blocks
56|
57| return blocks.map(block => {
58| if (!isMalformedBase64Image(block)) return block
59| const src = block.source as unknown as Record<string, unknown>
60| const mediaType =
61| typeof src.mediaType === 'string' && src.mediaType
62| ? src.mediaType
63| : detectImageFormatFromBase64(block.source.data)
64| return {
65| ...block,
66| source: {
67| type: 'base64' as const,
68| media_type: mediaType as Base64ImageSource['media_type'],
69| data: block.source.data,
70| },
71| }
72| })
73| }
74|
75| function isMalformedBase64Image(
76| block: ContentBlockParam,
77| ): block is ImageBlockParam & { source: Base64ImageSource } {
78| if (block.type !== 'image' || block.source?.type !== 'base64') return false
79| return !(block.source as unknown as Record<string, unknown>).media_type
80| }
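inboundAttachments file fetching
The web composer uploads files to /api/{org}/upload, and the message body carries file_attachments: [{ file_uuid, file_name }].
extractInboundAttachments uses a zod lazySchema to safely parse the unknown shape.
resolveInboundAttachments (second half of the file) handles each attachment as follows:
- OAuth GET /api/oauth/files/{uuid}/content
- Write to ~/.claude/uploads/{sessionId}/{uuid8}_{safeName}
- Return an @path string that is prepended to the user content
sanitizeFileName strips path components and keeps only safe characters. Best-effort: a missing token, a non-200 status, or a disk error is logged at debug level and skipped, and the message is still enqueued.
getBridgeBaseUrl is called inside the try block, so a FedStart custom OAuth URL that is not on the allowlist degrades to "no attachment" instead of crashing the print.ts reader loop.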
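As a rough illustration of the on-disk naming, the sketch below rebuilds the file name rule without any Node imports. sanitizeSketch replaces path.basename with a simple split (an assumption: unlike basename on POSIX, it also treats backslashes as separators), and attachmentPathSketch takes the 8-character prefix as a parameter because the part of resolveOne that derives it is not quoted in this chapter.

```typescript
// Dependency-free stand-in for sanitizeFileName: drop directory parts,
// then replace every character outside [a-zA-Z0-9._-] with an underscore.
function sanitizeSketch(fileName: string): string {
  const lastSegment = fileName.split(/[\\/]/).pop() ?? ''
  const base = lastSegment.replace(/[^a-zA-Z0-9._-]/g, '_')
  return base || 'attachment'
}

// Hypothetical helper: the caller supplies the uploads directory and the prefix.
function attachmentPathSketch(
  uploadsDir: string,
  prefix8: string,
  fileName: string,
): string {
  return `${uploadsDir}/${prefix8}_${sanitizeSketch(fileName)}`
}

const traversalAttempt = sanitizeSketch('../../etc/passwd')
const spacedName = sanitizeSketch('my photo (1).png')
const emptyName = sanitizeSketch('')
const localPath = attachmentPathSketch(
  '/home/u/.claude/uploads/s1', // hypothetical session directory
  'ab12cd34', // hypothetical 8-character prefix
  'a b.txt',
)
```

Because the file name comes from the network, stripping directory components first is what keeps a name such as ../../etc/passwd from escaping the per-session uploads directory.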
Source reference: src/bridge/inboundAttachments.ts · lines 1–48 (of 176)
1| /**
2| * Resolve file_uuid attachments on inbound bridge user messages.
3| *
4| * Web composer uploads via cookie-authed /api/{org}/upload, sends file_uuid
5| * alongside the message. Here we fetch each via GET /api/oauth/files/{uuid}/content
6| * (oauth-authed, same store), write to ~/.claude/uploads/{sessionId}/, and
7| * return @path refs to prepend. Claude's Read tool takes it from there.
8| *
9| * Best-effort: any failure (no token, network, non-2xx, disk) logs debug and
10| * skips that attachment. The message still reaches Claude, just without @path.
11| */
12|
13| import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
14| import axios from 'axios'
15| import { randomUUID } from 'crypto'
16| import { mkdir, writeFile } from 'fs/promises'
17| import { basename, join } from 'path'
18| import { z } from 'zod/v4'
19| import { getSessionId } from '../bootstrap/state.js'
20| import { logForDebugging } from '../utils/debug.js'
21| import { getClaudeConfigHomeDir } from '../utils/envUtils.js'
22| import { lazySchema } from '../utils/lazySchema.js'
23| import { getBridgeAccessToken, getBridgeBaseUrl } from './bridgeConfig.js'
24|
25| const DOWNLOAD_TIMEOUT_MS = 30_000
26|
27| function debug(msg: string): void {
28| logForDebugging(`[bridge:inbound-attach] ${msg}`)
29| }
30|
31| const attachmentSchema = lazySchema(() =>
32| z.object({
33| file_uuid: z.string(),
34| file_name: z.string(),
35| }),
36| )
37| const attachmentsArraySchema = lazySchema(() => z.array(attachmentSchema()))
38|
39| export type InboundAttachment = z.infer<ReturnType<typeof attachmentSchema>>
40|
41| /** Pull file_attachments off a loosely-typed inbound message. */
42| export function extractInboundAttachments(msg: unknown): InboundAttachment[] {
43| if (typeof msg !== 'object' || msg === null || !('file_attachments' in msg)) {
44| return []
45| }
46| const parsed = attachmentsArraySchema().safeParse(msg.file_attachments)
47| return parsed.success ? parsed.data : []
48| }
Source reference: src/bridge/inboundAttachments.ts · lines 50–100 (of 176)
50| /**
51| * Strip path components and keep only filename-safe chars. file_name comes
52| * from the network (web composer), so treat it as untrusted even though the
53| * composer controls it.
54| */
55| function sanitizeFileName(name: string): string {
56| const base = basename(name).replace(/[^a-zA-Z0-9._-]/g, '_')
57| return base || 'attachment'
58| }
59|
60| function uploadsDir(): string {
61| return join(getClaudeConfigHomeDir(), 'uploads', getSessionId())
62| }
63|
64| /**
65| * Fetch + write one attachment. Returns the absolute path on success,
66| * undefined on any failure.
67| */
68| async function resolveOne(att: InboundAttachment): Promise<string | undefined> {
69| const token = getBridgeAccessToken()
70| if (!token) {
71| debug('skip: no oauth token')
72| return undefined
73| }
74|
75| let data: Buffer
76| try {
77| // getOauthConfig() (via getBridgeBaseUrl) throws on a non-allowlisted
78| // CLAUDE_CODE_CUSTOM_OAUTH_URL — keep it inside the try so a bad
79| // FedStart URL degrades to "no @path" instead of crashing print.ts's
80| // reader loop (which has no catch around the await).
81| const url = `${getBridgeBaseUrl()}/api/oauth/files/${encodeURIComponent(att.file_uuid)}/content`
82| const response = await axios.get(url, {
83| headers: { Authorization: `Bearer ${token}` },
84| responseType: 'arraybuffer',
85| timeout: DOWNLOAD_TIMEOUT_MS,
86| validateStatus: () => true,
87| })
88| if (response.status !== 200) {
89| debug(`fetch ${att.file_uuid} failed: status=${response.status}`)
90| return undefined
91| }
92| data = Buffer.from(response.data)
93| } catch (e) {
94| debug(`fetch ${att.file_uuid} threw: ${e}`)
95| return undefined
96| }
97|
98| // uuid-prefix makes collisions impossible across messages and within one
99| // (same filename, different files). 8 chars is enough — this isn't security.
100| const safeName = sanitizeFileName(att.file_name)
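FlushGate: queuing during the initial flush
At session start the historical messages are uploaded in a single HTTP POST; if live writes were allowed at the same time, the server would see them interleaved and out of order.
The FlushGate<T> state machine:
| Method | Effect |
|---|---|
| start() | active=true; enqueue starts buffering |
| enqueue(...items) | While active, queues the items and returns true; otherwise returns false and the caller sends directly |
| end() | active=false; returns the queued items for draining |
| drop() | active=false; discards the queue (permanent transport close) |
| deactivate() | Clears active only and keeps the queue (transport replacement; the new transport drains it) |
replBridge wraps writeMessages around the initial flush; remoteBridgeCore reuses the same class. pendingCount helps diagnose "flush stuck" bugs.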
Source reference: src/bridge/flushGate.ts · lines 1–71 (of 72)
1| /**
2| * State machine for gating message writes during an initial flush.
3| *
4| * When a bridge session starts, historical messages are flushed to the
5| * server via a single HTTP POST. During that flush, new messages must
6| * be queued to prevent them from arriving at the server interleaved
7| * with the historical messages.
8| *
9| * Lifecycle:
10| * start() → enqueue() returns true, items are queued
11| * end() → returns queued items for draining, enqueue() returns false
12| * drop() → discards queued items (permanent transport close)
13| * deactivate() → clears active flag without dropping items
14| * (transport replacement — new transport will drain)
15| */
16| export class FlushGate<T> {
17| private _active = false
18| private _pending: T[] = []
19|
20| get active(): boolean {
21| return this._active
22| }
23|
24| get pendingCount(): number {
25| return this._pending.length
26| }
27|
28| /** Mark flush as in-progress. enqueue() will start queuing items. */
29| start(): void {
30| this._active = true
31| }
32|
33| /**
34| * End the flush and return any queued items for draining.
35| * Caller is responsible for sending the returned items.
36| */
37| end(): T[] {
38| this._active = false
39| return this._pending.splice(0)
40| }
41|
42| /**
43| * If flush is active, queue the items and return true.
44| * If flush is not active, return false (caller should send directly).
45| */
46| enqueue(...items: T[]): boolean {
47| if (!this._active) return false
48| this._pending.push(...items)
49| return true
50| }
51|
52| /**
53| * Discard all queued items (permanent transport close).
54| * Returns the number of items dropped.
55| */
56| drop(): number {
57| this._active = false
58| const count = this._pending.length
59| this._pending.length = 0
60| return count
61| }
62|
63| /**
64| * Clear the active flag without dropping queued items.
65| * Used when the transport is replaced (onWorkReceived) — the new
66| * transport's flush will drain the pending items.
67| */
68| deactivate(): void {
69| this._active = false
70| }
71| }
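A short usage sketch shows how the gate orders writes. The class body is a compact copy of FlushGate, repeated only so the snippet runs standalone; writeLive and the sent array are hypothetical stand-ins for the real write path and the server.

```typescript
// Compact copy of FlushGate so this example is self-contained.
class FlushGate<T> {
  private _active = false
  private _pending: T[] = []
  get active(): boolean { return this._active }
  get pendingCount(): number { return this._pending.length }
  start(): void { this._active = true }
  end(): T[] {
    this._active = false
    return this._pending.splice(0)
  }
  enqueue(...items: T[]): boolean {
    if (!this._active) return false
    this._pending.push(...items)
    return true
  }
  drop(): number {
    this._active = false
    const count = this._pending.length
    this._pending.length = 0
    return count
  }
  deactivate(): void { this._active = false }
}

const sent: string[] = [] // hypothetical record of what reached the server
const gate = new FlushGate<string>()

// Hypothetical write wrapper: queue while a flush is running, else send directly.
function writeLive(msg: string): void {
  if (gate.enqueue(msg)) return
  sent.push(msg)
}

gate.start()
writeLive('live-1') // arrives mid-flush, so it is queued
sent.push('history-batch') // the single batch POST completes
for (const queued of gate.end()) sent.push(queued) // drain after the flush
writeLive('live-2') // gate is closed again: direct send

// Transport replacement: stop gating but keep what was queued.
gate.start()
writeLive('live-3')
gate.deactivate()
```

After deactivate() the queued live-3 is still pending, which is what lets the replacement transport's own flush drain it later with end().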
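Cooperation with the replBridge write path
Typical order of an outbound writeMessages call (inside initBridgeCore, conceptually):
- Check each Message with isEligibleBridgeMessage
- extractTitleText triggers onUserMessage (as long as titling is not done yet)
- Convert with toSDKMessages
- If flushGate.enqueue returns true, skip the immediate transport write
- After the flush, end() drains the queue and new messages are written directly
- Add each uuid to recentPostedUUIDs before sending
writeSdkMessages skips the Message-level filtering; it exists for SDK streams the daemon has already built.
On the inbound side, the onInboundMessage registered by useReplBridge typically runs: extractInboundMessageFields → resolveInboundAttachments → merge the @path text → push onto the REPL input queue.
Debugging "attachment missing": check the [bridge:inbound-attach] debug lines and the status returned by the OAuth files API. Debugging "duplicates after flush": check whether previouslyFlushedUUIDs is passed into initReplBridge and mutated.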
Source reference: src/bridge/bridgeMessaging.ts · lines 1–11 (of 462)
1| /**
2| * Shared transport-layer helpers for bridge message handling.
3| *
4| * Extracted from replBridge.ts so both the env-based core (initBridgeCore)
5| * and the env-less core (initEnvLessBridgeCore) can use the same ingress
6| * parsing, control-request handling, and echo-dedup machinery.
7| *
8| * Everything here is pure — no closure over bridge-specific state. All
9| * collaborators (transport, sessionId, UUID sets, callbacks) are passed
10| * as params.
11| */
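Chapter summary and next steps
bridge-messaging is the semantic firewall for messages in both directions. The next chapter, remote-bridge-core, covers env-less initialization and the bridgeMain multi-session loop.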