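Chapter overview
This chapter covers cli/update.ts (the full claude update flow) and two upload primitives in the transports layer: WorkerStateUploader (coalesced PUT /worker state updates) and SerialBatchEventUploader (serial, batched event POSTs). transportUtils.getTransportForUrl decides which network stack a remote worker uses; it is not coupled to update directly, but it belongs to the same operational surface of the cli module. update relies on getDoctorDiagnostic to identify the installation type and branches into native / npm-local / npm-global / package-manager. The upload primitives are shared by HybridTransport and CCRClient and underpin remote session reliability.
After this chapter you should be able to
- Describe the branch decision tree of update(), from diagnostic to install
- Explain how a mismatch between config installMethod and the running installation is corrected automatically
- Explain the RFC 7396 (JSON Merge Patch) semantics of coalescePatches in WorkerStateUploader
- Work with backpressure, flush, and RetryableError in SerialBatchEventUploader
- Relate the transport selection in transportUtils to the CCR v2 worker lifecycle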
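Core concepts (read these first)
update is diagnostic-driven, not a blind npm install
update() first calls logEvent('tengu_update_check'), prints the current version and the channel, and then calls getDoctorDiagnostic(), which reports installationType, multipleInstallations, warnings, and configInstallMethod. A development build is refused outright; a package-manager install (Homebrew, winget, apk) only gets the matching upgrade command printed; a native install goes through installLatestNative; only the remaining cases fall through to the npm local/global path. This avoids, for example, running npm install -g on top of a Homebrew installation.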
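The branch order described above can be sketched as a pure function. This is a minimal illustration, not the code in update.ts: the names UpdatePath and chooseUpdatePath are hypothetical, and only the installation type strings are taken from the source excerpts in this chapter.

```typescript
// Hypothetical sketch of the branch order; the real update() performs I/O in each branch.
type InstallationType =
  | 'development'
  | 'package-manager'
  | 'native'
  | 'npm-local'
  | 'npm-global'
  | 'unknown'

type UpdatePath =
  | 'refuse' // development build: warn and exit with code 1
  | 'print-instructions' // package manager: print the upgrade command only
  | 'native-installer' // installLatestNative
  | 'npm-local'
  | 'npm-global'
  | 'detect-then-npm' // unknown: fall back to file detection

function chooseUpdatePath(type: InstallationType): UpdatePath {
  switch (type) {
    case 'development':
      return 'refuse'
    case 'package-manager':
      return 'print-instructions'
    case 'native':
      return 'native-installer'
    case 'npm-local':
      return 'npm-local'
    case 'npm-global':
      return 'npm-global'
    case 'unknown':
      return 'detect-then-npm'
  }
}
```

Keeping the decision separate from the side effects makes the precedence (development, then package-manager, then native, then npm) easy to test in isolation.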
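WorkerStateUploader: bounded, coalescing PUT
Unlike SerialBatchEventUploader, WorkerStateUploader allows at most 1 in-flight request plus 1 pending patch. enqueue is fire-and-forget; a new patch is coalesced into pending; after a send succeeds, drain runs again if a pending patch remains. Failures are retried indefinitely with exponential backoff until close() is called. The metadata keys external_metadata and internal_metadata are merged one level deep, and null values are preserved so the server can apply delete semantics.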
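SerialBatchEventUploader: the lifeline of the event stream
Both CCRClient.writeEvent and HybridTransport writes depend on it. maxConsecutiveFailures is optional: replBridge sets it so that a permanently failing server cannot wedge the process, while CCR leaves it unset and retries indefinitely. flush() must be awaited at the end of a turn or at shutdown; otherwise internal events can be lost.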
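The doc comment on droppedBatchCount suggests taking a snapshot before flush() and comparing afterwards, because flush() resolves normally even when batches were dropped. A minimal sketch of that pattern follows; FlushableUploader and flushAndCountDrops are hypothetical names, and the interface lists only the two members used here.

```typescript
// Hypothetical structural type: just the two members this helper needs.
interface FlushableUploader {
  readonly droppedBatchCount: number
  flush(): Promise<void>
}

// Awaits flush() and reports how many batches were dropped while flushing.
async function flushAndCountDrops(uploader: FlushableUploader): Promise<number> {
  const before = uploader.droppedBatchCount
  await uploader.flush()
  return uploader.droppedBatchCount - before
}
```

A caller at a turn boundary could log or surface a non-zero result instead of assuming that a resolved flush means every event was delivered.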
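Suggested study steps
- Read source block A: the start of update() and the diagnostic warnings
- Read source block B: the native and npm branches
- Read source block C: transport selection in transportUtils
- Read source block D: coalescing in WorkerStateUploader
- Read source block E: enqueue/flush in SerialBatchEventUploader
- Read source block F: the package-manager branch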
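Common pitfalls
Note
update exits through gracefulShutdown rather than calling process.exit directly, which leaves a window for hooks and analytics to finish.
Note
removeInstalledSymlink runs only when config.installMethod is not native, which keeps native and npm installations from being mixed.
Note
flush() on SerialBatchEventUploader resolves normally even when batches were dropped through maxConsecutiveFailures, so compare droppedBatchCount before and after the flush to detect drops. close() also resolves any waiting flush() callers and discards pending events; the discarded count stays readable through pendingCount.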
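Overall flow of update()
claude update
→ logEvent tengu_update_check
→ getDoctorDiagnostic()
→ show the multiple-installation warning and diagnostic warnings
→ fill in config.installMethod if unset (skipped for package-manager)
→ development? → exit 1
→ package-manager? → print brew/winget/apk hint → exit 0
→ config vs reality mismatch? → correct config
→ native? → installLatestNative(channel, true) → regenerateCompletionCache
→ else npm: getLatestVersion → local vs global install
→ gracefulShutdown(exitCode)
channel comes from getInitialSettings()?.autoUpdatesChannel (latest or stable) and defaults to latest. The package-manager branch compares versions with gte from utils/semver; the native and npm branches test for exact equality with MACRO.VERSION.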
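To make the channel default and the "update available" test concrete, here is a small sketch. gteSimple is a simplified stand-in written for this example (it compares MAJOR.MINOR.PATCH numerically and ignores prerelease and build metadata); it is not the gte helper from utils/semver, whose implementation is not shown in this chapter. resolveChannel and updateAvailable are hypothetical names.

```typescript
// Simplified stand-in for a semver gte: numeric MAJOR.MINOR.PATCH comparison only.
function gteSimple(a: string, b: string): boolean {
  const pa = a.split('.').map(Number)
  const pb = b.split('.').map(Number)
  for (let i = 0; i < 3; i++) {
    const x = pa[i] ?? 0
    const y = pb[i] ?? 0
    if (x !== y) return x > y
  }
  return true
}

type Channel = 'latest' | 'stable'

// Mirrors `getInitialSettings()?.autoUpdatesChannel ?? 'latest'`.
function resolveChannel(settings?: { autoUpdatesChannel?: Channel }): Channel {
  return settings?.autoUpdatesChannel ?? 'latest'
}

// Mirrors the package-manager check `latest && !gte(MACRO.VERSION, latest)`.
function updateAvailable(current: string, latest: string | null): boolean {
  return latest ? !gteSimple(current, latest) : false
}
```

Note that a failed lookup (latest is null) is reported as "no update available" by this predicate, which matches how the package-manager branch falls through to the "up to date" message.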
Source reference: src/cli/update.ts · lines 30–74 (of 423)
30| export async function update() {
31| logEvent('tengu_update_check', {})
32| writeToStdout(`Current version: ${MACRO.VERSION}\n`)
33|
34| const channel = getInitialSettings()?.autoUpdatesChannel ?? 'latest'
35| writeToStdout(`Checking for updates to ${channel} version...\n`)
36|
37| logForDebugging('update: Starting update check')
38|
39| // Run diagnostic to detect potential issues
40| logForDebugging('update: Running diagnostic')
41| const diagnostic = await getDoctorDiagnostic()
42| logForDebugging(`update: Installation type: ${diagnostic.installationType}`)
43| logForDebugging(
44| `update: Config install method: ${diagnostic.configInstallMethod}`,
45| )
46|
47| // Check for multiple installations
48| if (diagnostic.multipleInstallations.length > 1) {
49| writeToStdout('\n')
50| writeToStdout(chalk.yellow('Warning: Multiple installations found') + '\n')
51| for (const install of diagnostic.multipleInstallations) {
52| const current =
53| diagnostic.installationType === install.type
54| ? ' (currently running)'
55| : ''
56| writeToStdout(`- ${install.type} at ${install.path}${current}\n`)
57| }
58| }
59|
60| // Display warnings if any exist
61| if (diagnostic.warnings.length > 0) {
62| writeToStdout('\n')
63| for (const warning of diagnostic.warnings) {
64| logForDebugging(`update: Warning detected: ${warning.issue}`)
65|
66| // Don't skip PATH warnings - they're always relevant
67| // The user needs to know that 'which claude' points elsewhere
68| logForDebugging(`update: Showing warning: ${warning.issue}`)
69|
70| writeToStdout(chalk.yellow(`Warning: ${warning.issue}\n`))
71|
72| writeToStdout(chalk.bold(`Fix: ${warning.fix}\n`))
73| }
74| }
Source reference: src/cli/update.ts · lines 76–115 (of 423)
76| // Update config if installMethod is not set (but skip for package managers)
77| const config = getGlobalConfig()
78| if (
79| !config.installMethod &&
80| diagnostic.installationType !== 'package-manager'
81| ) {
82| writeToStdout('\n')
83| writeToStdout('Updating configuration to track installation method...\n')
84| let detectedMethod: 'local' | 'native' | 'global' | 'unknown' = 'unknown'
85|
86| // Map diagnostic installation type to config install method
87| switch (diagnostic.installationType) {
88| case 'npm-local':
89| detectedMethod = 'local'
90| break
91| case 'native':
92| detectedMethod = 'native'
93| break
94| case 'npm-global':
95| detectedMethod = 'global'
96| break
97| default:
98| detectedMethod = 'unknown'
99| }
100|
101| saveGlobalConfig(current => ({
102| ...current,
103| installMethod: detectedMethod,
104| }))
105| writeToStdout(`Installation method set to: ${detectedMethod}\n`)
106| }
107|
108| // Check if running from development build
109| if (diagnostic.installationType === 'development') {
110| writeToStdout('\n')
111| writeToStdout(
112| chalk.yellow('Warning: Cannot update development build') + '\n',
113| )
114| await gracefulShutdown(1)
115| }
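package-manager and native branches
The package-manager branch dispatches on the result of getPackageManager():
- homebrew → brew upgrade claude-code
- winget → winget upgrade Anthropic.ClaudeCode
- apk → apk upgrade claude-code
- anything else (pacman/deb/rpm) → a generic hint
For homebrew, winget, and apk, the branch prints "Update available: <current> → <latest>" when a newer version exists and "Claude is up to date!" otherwise. It never runs the package manager itself; it only tells the user what to run, then exits with code 0.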
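The mapping from package manager to printed command can be sketched as a lookup table. The command strings are the ones printed by the source; the PackageManager union and the upgradeHint helper are assumptions made for this example, since the return type of getPackageManager is not shown in this chapter.

```typescript
// Assumed set of package manager names, taken from the branch and its comment.
type PackageManager = 'homebrew' | 'winget' | 'apk' | 'pacman' | 'deb' | 'rpm'

// Only managers with a single canonical frontend get a specific command.
const UPGRADE_COMMANDS: Partial<Record<PackageManager, string>> = {
  homebrew: 'brew upgrade claude-code',
  winget: 'winget upgrade Anthropic.ClaudeCode',
  apk: 'apk upgrade claude-code',
}

// Returns the instruction to print; nothing is ever executed.
function upgradeHint(pm: PackageManager): string {
  const cmd = UPGRADE_COMMANDS[pm]
  return cmd ? `To update, run: ${cmd}` : 'Please use your package manager to update.'
}
```

pacman, deb, and rpm fall back to the generic message because each has several frontends, as the source comment explains.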
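The native branch:
- calls installLatestNative(channel, true)
- lockFailed → yellow notice that another Claude process (with its PID when known) is running, exit 0
- no latestVersion → "Failed to check for updates" on stderr, exit 1
- success with a version change → regenerateCompletionCache()
- catch → suggests running claude doctor, exit 1
The native path always ends in gracefulShutdown before the npm logic is reached, so the two update tracks never both run.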
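The outcomes of the native branch can be summarized as a function from the installer result to an exit code and message. This is a sketch under assumptions: the NativeResult shape is inferred from the fields the excerpt reads (lockFailed, lockHolderPid, latestVersion), and describeNativeResult is a hypothetical name.

```typescript
// Assumed result shape, inferred from the fields used in the native branch.
interface NativeResult {
  latestVersion: string | null
  lockFailed?: boolean
  lockHolderPid?: number
}

interface NativeOutcome {
  exitCode: 0 | 1
  message: string
}

function describeNativeResult(current: string, r: NativeResult): NativeOutcome {
  if (r.lockFailed) {
    // Lock contention is not an error: the user is asked to retry.
    const pidInfo = r.lockHolderPid ? ` (PID ${r.lockHolderPid})` : ''
    return {
      exitCode: 0,
      message: `Another Claude process${pidInfo} is currently running. Please try again in a moment.`,
    }
  }
  if (!r.latestVersion) {
    return { exitCode: 1, message: 'Failed to check for updates' }
  }
  if (r.latestVersion === current) {
    return { exitCode: 0, message: `Claude Code is up to date (${current})` }
  }
  return {
    exitCode: 0,
    message: `Successfully updated from ${current} to version ${r.latestVersion}`,
  }
}
```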
Source reference: src/cli/update.ts · lines 117–166 (of 423)
117| // Check if running from a package manager
118| if (diagnostic.installationType === 'package-manager') {
119| const packageManager = await getPackageManager()
120| writeToStdout('\n')
121|
122| if (packageManager === 'homebrew') {
123| writeToStdout('Claude is managed by Homebrew.\n')
124| const latest = await getLatestVersion(channel)
125| if (latest && !gte(MACRO.VERSION, latest)) {
126| writeToStdout(`Update available: ${MACRO.VERSION} → ${latest}\n`)
127| writeToStdout('\n')
128| writeToStdout('To update, run:\n')
129| writeToStdout(chalk.bold(' brew upgrade claude-code') + '\n')
130| } else {
131| writeToStdout('Claude is up to date!\n')
132| }
133| } else if (packageManager === 'winget') {
134| writeToStdout('Claude is managed by winget.\n')
135| const latest = await getLatestVersion(channel)
136| if (latest && !gte(MACRO.VERSION, latest)) {
137| writeToStdout(`Update available: ${MACRO.VERSION} → ${latest}\n`)
138| writeToStdout('\n')
139| writeToStdout('To update, run:\n')
140| writeToStdout(
141| chalk.bold(' winget upgrade Anthropic.ClaudeCode') + '\n',
142| )
143| } else {
144| writeToStdout('Claude is up to date!\n')
145| }
146| } else if (packageManager === 'apk') {
147| writeToStdout('Claude is managed by apk.\n')
148| const latest = await getLatestVersion(channel)
149| if (latest && !gte(MACRO.VERSION, latest)) {
150| writeToStdout(`Update available: ${MACRO.VERSION} → ${latest}\n`)
151| writeToStdout('\n')
152| writeToStdout('To update, run:\n')
153| writeToStdout(chalk.bold(' apk upgrade claude-code') + '\n')
154| } else {
155| writeToStdout('Claude is up to date!\n')
156| }
157| } else {
158| // pacman, deb, and rpm don't get specific commands because they each have
159| // multiple frontends (pacman: yay/paru/makepkg, deb: apt/apt-get/aptitude/nala,
160| // rpm: dnf/yum/zypper)
161| writeToStdout('Claude is managed by a package manager.\n')
162| writeToStdout('Please use your package manager to update.\n')
163| }
164|
165| await gracefulShutdown(0)
166| }
Source reference: src/cli/update.ts · lines 213–258 (of 423)
213| // Handle native installation updates first
214| if (diagnostic.installationType === 'native') {
215| logForDebugging(
216| 'update: Detected native installation, using native updater',
217| )
218| try {
219| const result = await installLatestNative(channel, true)
220|
221| // Handle lock contention gracefully
222| if (result.lockFailed) {
223| const pidInfo = result.lockHolderPid
224| ? ` (PID ${result.lockHolderPid})`
225| : ''
226| writeToStdout(
227| chalk.yellow(
228| `Another Claude process${pidInfo} is currently running. Please try again in a moment.`,
229| ) + '\n',
230| )
231| await gracefulShutdown(0)
232| }
233|
234| if (!result.latestVersion) {
235| process.stderr.write('Failed to check for updates\n')
236| await gracefulShutdown(1)
237| }
238|
239| if (result.latestVersion === MACRO.VERSION) {
240| writeToStdout(
241| chalk.green(`Claude Code is up to date (${MACRO.VERSION})`) + '\n',
242| )
243| } else {
244| writeToStdout(
245| chalk.green(
246| `Successfully updated from ${MACRO.VERSION} to version ${result.latestVersion}`,
247| ) + '\n',
248| )
249| await regenerateCompletionCache()
250| }
251| await gracefulShutdown(0)
252| } catch (error) {
253| process.stderr.write('Error: Failed to install native update\n')
254| process.stderr.write(String(error) + '\n')
255| process.stderr.write('Try running "claude doctor" for diagnostics\n')
256| await gracefulShutdown(1)
257| }
258| }
npm local/global and installMethod correction
The non-native path:
- when config.installMethod !== 'native', calls removeInstalledSymlink()
- getLatestVersion(channel) fails → detailed stderr output (network, npm registry, proxy, internal package) → exit 1
- latestVersion === MACRO.VERSION → green "up to date" message, exit 0
- picks installOrUpdateClaudePackage (local) or installGlobalPackage (global) based on diagnostic.installationType
- InstallStatus values: success / no_permissions / install_failed / in_progress
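The four InstallStatus values map to exit codes and a first output line as follows. The status strings and messages come from the source excerpt; the Outcome type and outcomeFor helper are hypothetical and leave out the follow-up hints that the real code prints.

```typescript
type InstallStatus = 'success' | 'no_permissions' | 'install_failed' | 'in_progress'

interface Outcome {
  exitCode: 0 | 1
  firstLine: string
}

// Only 'success' exits with 0; every other status is reported on stderr and exits with 1.
function outcomeFor(status: InstallStatus, from: string, to: string): Outcome {
  switch (status) {
    case 'success':
      return { exitCode: 0, firstLine: `Successfully updated from ${from} to version ${to}` }
    case 'no_permissions':
      return { exitCode: 1, firstLine: 'Error: Insufficient permissions to install update' }
    case 'install_failed':
      return { exitCode: 1, firstLine: 'Error: Failed to install update' }
    case 'in_progress':
      return { exitCode: 1, firstLine: 'Error: Another instance is currently performing an update' }
  }
}
```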
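The config mismatch block warns when normalizedRunningType !== configExpects (and configExpects is not 'unknown'), then calls saveGlobalConfig to align the config with reality (npm-local → local, and so on).
The multipleInstallations warning helps the user understand when the claude found by which claude on PATH is not the instance that is currently running.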
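The mismatch check reduces to a normalization step plus two guards. The sketch below reuses the typeMapping table from the source; reconcileInstallMethod is a hypothetical helper that returns the value to save, or null when the config should be left alone.

```typescript
// Same mapping as the source: diagnostic installation type to config installMethod.
const typeMapping: Record<string, string> = {
  'npm-local': 'local',
  'npm-global': 'global',
  native: 'native',
  development: 'development',
  unknown: 'unknown',
}

// Returns the installMethod to write back, or null if no correction is needed.
function reconcileInstallMethod(runningType: string, configExpects: string): string | null {
  // Guards from the source: nothing recorded yet, or a package-manager install.
  if (configExpects === 'not set' || runningType === 'package-manager') return null
  const normalized = typeMapping[runningType] || runningType
  if (normalized !== configExpects && configExpects !== 'unknown') return normalized
  return null
}
```

For example, a config that records global while the running installation is npm-local yields local, which is the case the hands-on exercise at the end of this chapter asks you to reproduce.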
Source reference: src/cli/update.ts · lines 168–211 (of 423)
168| // Check for config/reality mismatch (skip for package-manager installs)
169| if (
170| config.installMethod &&
171| diagnostic.configInstallMethod !== 'not set' &&
172| diagnostic.installationType !== 'package-manager'
173| ) {
174| const runningType = diagnostic.installationType
175| const configExpects = diagnostic.configInstallMethod
176|
177| // Map installation types for comparison
178| const typeMapping: Record<string, string> = {
179| 'npm-local': 'local',
180| 'npm-global': 'global',
181| native: 'native',
182| development: 'development',
183| unknown: 'unknown',
184| }
185|
186| const normalizedRunningType = typeMapping[runningType] || runningType
187|
188| if (
189| normalizedRunningType !== configExpects &&
190| configExpects !== 'unknown'
191| ) {
192| writeToStdout('\n')
193| writeToStdout(chalk.yellow('Warning: Configuration mismatch') + '\n')
194| writeToStdout(`Config expects: ${configExpects} installation\n`)
195| writeToStdout(`Currently running: ${runningType}\n`)
196| writeToStdout(
197| chalk.yellow(
198| `Updating the ${runningType} installation you are currently using`,
199| ) + '\n',
200| )
201|
202| // Update config to match reality
203| saveGlobalConfig(current => ({
204| ...current,
205| installMethod: normalizedRunningType as InstallMethod,
206| }))
207| writeToStdout(
208| `Config updated to reflect current installation method: ${normalizedRunningType}\n`,
209| )
210| }
211| }
Source reference: src/cli/update.ts · lines 260–314 (of 423)
260| // Fallback to existing JS/npm-based update logic
261| // Remove native installer symlink since we're not using native installation
262| // But only if user hasn't migrated to native installation
263| if (config.installMethod !== 'native') {
264| await removeInstalledSymlink()
265| }
266|
267| logForDebugging('update: Checking npm registry for latest version')
268| logForDebugging(`update: Package URL: ${MACRO.PACKAGE_URL}`)
269| const npmTag = channel === 'stable' ? 'stable' : 'latest'
270| const npmCommand = `npm view ${MACRO.PACKAGE_URL}@${npmTag} version`
271| logForDebugging(`update: Running: ${npmCommand}`)
272| const latestVersion = await getLatestVersion(channel)
273| logForDebugging(
274| `update: Latest version from npm: ${latestVersion || 'FAILED'}`,
275| )
276|
277| if (!latestVersion) {
278| logForDebugging('update: Failed to get latest version from npm registry')
279| process.stderr.write(chalk.red('Failed to check for updates') + '\n')
280| process.stderr.write('Unable to fetch latest version from npm registry\n')
281| process.stderr.write('\n')
282| process.stderr.write('Possible causes:\n')
283| process.stderr.write(' • Network connectivity issues\n')
284| process.stderr.write(' • npm registry is unreachable\n')
285| process.stderr.write(' • Corporate proxy/firewall blocking npm\n')
286| if (MACRO.PACKAGE_URL && !MACRO.PACKAGE_URL.startsWith('@anthropic')) {
287| process.stderr.write(
288| ' • Internal/development build not published to npm\n',
289| )
290| }
291| process.stderr.write('\n')
292| process.stderr.write('Try:\n')
293| process.stderr.write(' • Check your internet connection\n')
294| process.stderr.write(' • Run with --debug flag for more details\n')
295| const packageName =
296| MACRO.PACKAGE_URL ||
297| (process.env.USER_TYPE === 'ant'
298| ? '@anthropic-ai/claude-cli'
299| : '@anthropic-ai/claude-code')
300| process.stderr.write(
301| ` • Manually check: npm view ${packageName} version\n`,
302| )
303|
304| process.stderr.write(' • Check if you need to login: npm whoami\n')
305| await gracefulShutdown(1)
306| }
307|
308| // Check if versions match exactly, including any build metadata (like SHA)
309| if (latestVersion === MACRO.VERSION) {
310| writeToStdout(
311| chalk.green(`Claude Code is up to date (${MACRO.VERSION})`) + '\n',
312| )
313| await gracefulShutdown(0)
314| }
Source reference: src/cli/update.ts · lines 321–421 (of 423)
321| // Determine update method based on what's actually running
322| let useLocalUpdate = false
323| let updateMethodName = ''
324|
325| switch (diagnostic.installationType) {
326| case 'npm-local':
327| useLocalUpdate = true
328| updateMethodName = 'local'
329| break
330| case 'npm-global':
331| useLocalUpdate = false
332| updateMethodName = 'global'
333| break
334| case 'unknown': {
335| // Fallback to detection if we can't determine installation type
336| const isLocal = await localInstallationExists()
337| useLocalUpdate = isLocal
338| updateMethodName = isLocal ? 'local' : 'global'
339| writeToStdout(
340| chalk.yellow('Warning: Could not determine installation type') + '\n',
341| )
342| writeToStdout(
343| `Attempting ${updateMethodName} update based on file detection...\n`,
344| )
345| break
346| }
347| default:
348| process.stderr.write(
349| `Error: Cannot update ${diagnostic.installationType} installation\n`,
350| )
351| await gracefulShutdown(1)
352| }
353|
354| writeToStdout(`Using ${updateMethodName} installation update method...\n`)
355|
356| logForDebugging(`update: Update method determined: ${updateMethodName}`)
357| logForDebugging(`update: useLocalUpdate: ${useLocalUpdate}`)
358|
359| let status: InstallStatus
360|
361| if (useLocalUpdate) {
362| logForDebugging(
363| 'update: Calling installOrUpdateClaudePackage() for local update',
364| )
365| status = await installOrUpdateClaudePackage(channel)
366| } else {
367| logForDebugging('update: Calling installGlobalPackage() for global update')
368| status = await installGlobalPackage()
369| }
370|
371| logForDebugging(`update: Installation status: ${status}`)
372|
373| switch (status) {
374| case 'success':
375| writeToStdout(
376| chalk.green(
377| `Successfully updated from ${MACRO.VERSION} to version ${latestVersion}`,
378| ) + '\n',
379| )
380| await regenerateCompletionCache()
381| break
382| case 'no_permissions':
383| process.stderr.write(
384| 'Error: Insufficient permissions to install update\n',
385| )
386| if (useLocalUpdate) {
387| process.stderr.write('Try manually updating with:\n')
388| process.stderr.write(
389| ` cd ~/.claude/local && npm update ${MACRO.PACKAGE_URL}\n`,
390| )
391| } else {
392| process.stderr.write('Try running with sudo or fix npm permissions\n')
393| process.stderr.write(
394| 'Or consider using native installation with: claude install\n',
395| )
396| }
397| await gracefulShutdown(1)
398| break
399| case 'install_failed':
400| process.stderr.write('Error: Failed to install update\n')
401| if (useLocalUpdate) {
402| process.stderr.write('Try manually updating with:\n')
403| process.stderr.write(
404| ` cd ~/.claude/local && npm update ${MACRO.PACKAGE_URL}\n`,
405| )
406| } else {
407| process.stderr.write(
408| 'Or consider using native installation with: claude install\n',
409| )
410| }
411| await gracefulShutdown(1)
412| break
413| case 'in_progress':
414| process.stderr.write(
415| 'Error: Another instance is currently performing an update\n',
416| )
417| process.stderr.write('Please wait and try again later\n')
418| await gracefulShutdown(1)
419| break
420| }
421| await gracefulShutdown(0)
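transportUtils: an operations entry point alongside update
getTransportForUrl is not on the update path, but it is the master switch for how a remote worker connects:
| Priority | Condition | Implementation |
|---|---|---|
| 1 | CLAUDE_CODE_USE_CCR_V2 is set | SSETransport on /worker/events/stream |
| 2 | ws/wss URL and CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 is set | HybridTransport |
| 3 | ws/wss URL (default) | WebSocketTransport |
Under CCR v2 the URL is rewritten from wss to https (or ws to http) and /worker/events/stream is appended to the pathname; sessionId and refreshHeaders are passed to every transport for re-authentication on reconnect. Without CCR v2, any protocol other than ws/wss throws an "Unsupported protocol" error.
When debugging "the worker cannot connect", check the transport environment variables as well as the CLI version reported by update, not the CLI version alone.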
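The selection priority and the CCR v2 URL rewrite can be exercised without constructing real transports. In this sketch, selectTransport returns a label and a URL string instead of a Transport instance, the environment is passed in as a parameter, and isTruthy is a stand-in for isEnvTruthy whose accepted values are an assumption.

```typescript
type TransportKind = 'sse' | 'hybrid' | 'websocket'

// Stand-in for isEnvTruthy; the exact set of accepted values is assumed.
function isTruthy(value: string | undefined): boolean {
  return ['1', 'true', 'yes', 'on'].includes((value ?? '').toLowerCase())
}

function selectTransport(
  url: URL,
  env: Record<string, string | undefined>,
): { kind: TransportKind; url: string } {
  if (isTruthy(env['CLAUDE_CODE_USE_CCR_V2'])) {
    // v2: derive the SSE stream URL from the session URL.
    const sseUrl = new URL(url.href)
    if (sseUrl.protocol === 'wss:') sseUrl.protocol = 'https:'
    else if (sseUrl.protocol === 'ws:') sseUrl.protocol = 'http:'
    sseUrl.pathname = sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'
    return { kind: 'sse', url: sseUrl.href }
  }
  if (url.protocol === 'ws:' || url.protocol === 'wss:') {
    const hybrid = isTruthy(env['CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2'])
    return { kind: hybrid ? 'hybrid' : 'websocket', url: url.href }
  }
  throw new Error(`Unsupported protocol: ${url.protocol}`)
}
```

Because CLAUDE_CODE_USE_CCR_V2 is checked first, it takes effect even when the Hybrid flag is also set, which is worth remembering when both variables appear in a worker environment.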
Source reference: src/cli/transports/transportUtils.ts · lines 8–45 (of 46)
8| /**
9| * Helper function to get the appropriate transport for a URL.
10| *
11| * Transport selection priority:
12| * 1. SSETransport (SSE reads + POST writes) when CLAUDE_CODE_USE_CCR_V2 is set
13| * 2. HybridTransport (WS reads + POST writes) when CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2 is set
14| * 3. WebSocketTransport (WS reads + WS writes) — default
15| */
16| export function getTransportForUrl(
17| url: URL,
18| headers: Record<string, string> = {},
19| sessionId?: string,
20| refreshHeaders?: () => Record<string, string>,
21| ): Transport {
22| if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
23| // v2: SSE for reads, HTTP POST for writes
24| // --sdk-url is the session URL (.../sessions/{id});
25| // derive the SSE stream URL by appending /worker/events/stream
26| const sseUrl = new URL(url.href)
27| if (sseUrl.protocol === 'wss:') {
28| sseUrl.protocol = 'https:'
29| } else if (sseUrl.protocol === 'ws:') {
30| sseUrl.protocol = 'http:'
31| }
32| sseUrl.pathname =
33| sseUrl.pathname.replace(/\/$/, '') + '/worker/events/stream'
34| return new SSETransport(sseUrl, headers, sessionId, refreshHeaders)
35| }
36|
37| if (url.protocol === 'ws:' || url.protocol === 'wss:') {
38| if (isEnvTruthy(process.env.CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2)) {
39| return new HybridTransport(url, headers, sessionId, refreshHeaders)
40| }
41| return new WebSocketTransport(url, headers, sessionId, refreshHeaders)
42| } else {
43| throw new Error(`Unsupported protocol: ${url.protocol}`)
44| }
45| }
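WorkerStateUploader implementation details
CCRClient constructs a WorkerStateUploader whose config.send performs the HTTP PUT /worker:
enqueue(patch):
- no-op if closed
- pending = pending ? coalescePatches(pending, patch) : patch
- void drain()
drain:
- returns if inflight is set, the uploader is closed, or there is no pending patch
- payload = pending; pending = null
- sendWithRetry loops until send returns ok or the uploader is closed
- after each failed attempt and backoff sleep, any newly arrived pending patch is absorbed into current
coalescePatches applies a one-level RFC 7396 merge to the external_metadata and internal_metadata objects; for every other top-level key the last value wins.
The source comments note that no backpressure is needed because the uploader is naturally bounded at 2 slots. That makes it suitable for high-frequency state and metadata reports without unbounded memory growth.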
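A worked example makes the merge rules easier to see. The function below follows the documented behavior of coalescePatches; its else branch (plain replacement) is an assumption based on the doc comment "overlay replaces base", because the source excerpt ends before that branch. The patch contents (state, branch, pr, title) are made up for illustration.

```typescript
type Patch = Record<string, unknown>

function coalescePatches(base: Patch, overlay: Patch): Patch {
  const merged: Patch = { ...base }
  for (const [key, value] of Object.entries(overlay)) {
    const existing = merged[key]
    if (
      (key === 'external_metadata' || key === 'internal_metadata') &&
      existing &&
      typeof existing === 'object' &&
      typeof value === 'object' &&
      value !== null
    ) {
      // One-level merge: overlay keys win, nulls are kept for the server to act on.
      merged[key] = { ...(existing as Patch), ...(value as Patch) }
    } else {
      // Assumed from the doc comment: other keys are replaced by the overlay value.
      merged[key] = value
    }
  }
  return merged
}

// Two patches queued back to back collapse into a single PUT body.
const coalesced = coalescePatches(
  { state: 'running', external_metadata: { branch: 'main', pr: 12 } },
  { state: 'idle', external_metadata: { pr: null, title: 'fix' } },
)
console.log(JSON.stringify(coalesced))
```

The null for pr survives the merge rather than deleting the key locally, so the server still receives the delete instruction even though an earlier patch had set a value.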
Source reference: src/cli/transports/WorkerStateUploader.ts · lines 29–86 (of 132)
29| export class WorkerStateUploader {
30| private inflight: Promise<void> | null = null
31| private pending: Record<string, unknown> | null = null
32| private closed = false
33| private readonly config: WorkerStateUploaderConfig
34|
35| constructor(config: WorkerStateUploaderConfig) {
36| this.config = config
37| }
38|
39| /**
40| * Enqueue a patch to PUT /worker. Coalesces with any existing pending
41| * patch. Fire-and-forget — callers don't need to await.
42| */
43| enqueue(patch: Record<string, unknown>): void {
44| if (this.closed) return
45| this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
46| void this.drain()
47| }
48|
49| close(): void {
50| this.closed = true
51| this.pending = null
52| }
53|
54| private async drain(): Promise<void> {
55| if (this.inflight || this.closed) return
56| if (!this.pending) return
57|
58| const payload = this.pending
59| this.pending = null
60|
61| this.inflight = this.sendWithRetry(payload).then(() => {
62| this.inflight = null
63| if (this.pending && !this.closed) {
64| void this.drain()
65| }
66| })
67| }
68|
69| /** Retries indefinitely with exponential backoff until success or close(). */
70| private async sendWithRetry(payload: Record<string, unknown>): Promise<void> {
71| let current = payload
72| let failures = 0
73| while (!this.closed) {
74| const ok = await this.config.send(current)
75| if (ok) return
76|
77| failures++
78| await sleep(this.retryDelay(failures))
79|
80| // Absorb any patches that arrived during the retry
81| if (this.pending && !this.closed) {
82| current = coalescePatches(current, this.pending)
83| this.pending = null
84| }
85| }
86| }
Source reference: src/cli/transports/WorkerStateUploader.ts · lines 98–125 (of 132)
98| /**
99| * Coalesce two patches for PUT /worker.
100| *
101| * Top-level keys: overlay replaces base (last value wins).
102| * Metadata keys (external_metadata, internal_metadata): RFC 7396 merge
103| * one level deep — overlay keys are added/overwritten, null values
104| * preserved for server-side delete.
105| */
106| function coalescePatches(
107| base: Record<string, unknown>,
108| overlay: Record<string, unknown>,
109| ): Record<string, unknown> {
110| const merged = { ...base }
111|
112| for (const [key, value] of Object.entries(overlay)) {
113| if (
114| (key === 'external_metadata' || key === 'internal_metadata') &&
115| merged[key] &&
116| typeof merged[key] === 'object' &&
117| typeof value === 'object' &&
118| value !== null
119| ) {
120| // RFC 7396 merge — overlay keys win, nulls preserved for server
121| merged[key] = {
122| ...(merged[key] as Record<string, unknown>),
123| ...(value as Record<string, unknown>),
124| }
125| } else {
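SerialBatchEventUploader: queue, retry, and flush
Core API:
- enqueue(events): applies backpressure by awaiting while pending.length + items.length > maxQueueSize
- flush(): resolves immediately if pending is empty and no drain is running; otherwise waits for drain to empty the queue
- close(): drops pending events and wakes every backpressure and flush waiter
- droppedBatchCount: number of batches dropped because maxConsecutiveFailures was reached
The drain loop:
- takeBatch() is bounded by maxBatchSize / maxBatchBytes
- send fails → the batch is concatenated back onto the front of pending → sleep(retryDelay)
- RetryableError.retryAfterMs feeds into the delay calculation
RemoteIO.flushInternalEvents delegates to CCRClient, which calls uploader.flush(); headless callers should invoke it at turn boundaries so that transcript internal events do not linger in the queue.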
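The failure policy of the drain loop (re-queue at the front, or drop after maxConsecutiveFailures) can be simulated synchronously. This is a simplified model, not the real class: send returns false instead of throwing, batches are cut by count only, backoff sleeps are omitted, and maxAttempts is a safety cap added for the simulation.

```typescript
interface DrainResult<T> {
  delivered: T[][]
  dropped: number
}

function simulateDrain<T>(
  queue: T[],
  batchSize: number,
  send: (batch: T[]) => boolean,
  maxConsecutiveFailures?: number,
  maxAttempts = 100,
): DrainResult<T> {
  let pending = [...queue]
  let failures = 0
  const result: DrainResult<T> = { delivered: [], dropped: 0 }
  for (let attempt = 0; pending.length > 0 && attempt < maxAttempts; attempt++) {
    const batch = pending.slice(0, batchSize)
    pending = pending.slice(batchSize)
    if (send(batch)) {
      failures = 0
      result.delivered.push(batch)
      continue
    }
    failures++
    if (maxConsecutiveFailures !== undefined && failures >= maxConsecutiveFailures) {
      // Give up on this batch and move on, like droppedBatches++ in the source.
      result.dropped++
      failures = 0
      continue
    }
    // Re-queue the failed batch at the front so event order is preserved.
    pending = batch.concat(pending)
  }
  return result
}
```

With maxConsecutiveFailures unset the loop keeps retrying the same batch, which is why the CCR configuration can stall behind a permanently failing server while replBridge, which sets the limit, eventually drops the batch and continues.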
Source reference: src/cli/transports/SerialBatchEventUploader.ts · lines 64–119 (of 276)
64| export class SerialBatchEventUploader<T> {
65| private pending: T[] = []
66| private pendingAtClose = 0
67| private draining = false
68| private closed = false
69| private backpressureResolvers: Array<() => void> = []
70| private sleepResolve: (() => void) | null = null
71| private flushResolvers: Array<() => void> = []
72| private droppedBatches = 0
73| private readonly config: SerialBatchEventUploaderConfig<T>
74|
75| constructor(config: SerialBatchEventUploaderConfig<T>) {
76| this.config = config
77| }
78|
79| /**
80| * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
81| * can snapshot before flush() and compare after to detect silent drops
82| * (flush() resolves normally even when batches were dropped).
83| */
84| get droppedBatchCount(): number {
85| return this.droppedBatches
86| }
87|
88| /**
89| * Pending queue depth. After close(), returns the count at close time —
90| * close() clears the queue but shutdown diagnostics may read this after.
91| */
92| get pendingCount(): number {
93| return this.closed ? this.pendingAtClose : this.pending.length
94| }
95|
96| /**
97| * Add events to the pending buffer. Returns immediately if space is
98| * available. Blocks (awaits) if the buffer is full — caller pauses
99| * until drain frees space.
100| */
101| async enqueue(events: T | T[]): Promise<void> {
102| if (this.closed) return
103| const items = Array.isArray(events) ? events : [events]
104| if (items.length === 0) return
105|
106| // Backpressure: wait until there's space
107| while (
108| this.pending.length + items.length > this.config.maxQueueSize &&
109| !this.closed
110| ) {
111| await new Promise<void>(resolve => {
112| this.backpressureResolvers.push(resolve)
113| })
114| }
115|
116| if (this.closed) return
117| this.pending.push(...items)
118| void this.drain()
119| }
Source reference: src/cli/transports/SerialBatchEventUploader.ts · lines 125–150 (of 276)
125| flush(): Promise<void> {
126| if (this.pending.length === 0 && !this.draining) {
127| return Promise.resolve()
128| }
129| void this.drain()
130| return new Promise<void>(resolve => {
131| this.flushResolvers.push(resolve)
132| })
133| }
134|
135| /**
136| * Drop pending events and stop processing.
137| * Resolves any blocked enqueue() and flush() callers.
138| */
139| close(): void {
140| if (this.closed) return
141| this.closed = true
142| this.pendingAtClose = this.pending.length
143| this.pending = []
144| this.sleepResolve?.()
145| this.sleepResolve = null
146| for (const resolve of this.backpressureResolvers) resolve()
147| this.backpressureResolvers = []
148| for (const resolve of this.flushResolvers) resolve()
149| this.flushResolvers = []
150| }
Source reference: src/cli/transports/SerialBatchEventUploader.ts · lines 156–193 (of 276)
156| private async drain(): Promise<void> {
157| if (this.draining || this.closed) return
158| this.draining = true
159| let failures = 0
160|
161| try {
162| while (this.pending.length > 0 && !this.closed) {
163| const batch = this.takeBatch()
164| if (batch.length === 0) continue
165|
166| try {
167| await this.config.send(batch)
168| failures = 0
169| } catch (err) {
170| failures++
171| if (
172| this.config.maxConsecutiveFailures !== undefined &&
173| failures >= this.config.maxConsecutiveFailures
174| ) {
175| this.droppedBatches++
176| this.config.onBatchDropped?.(batch.length, failures)
177| failures = 0
178| this.releaseBackpressure()
179| continue
180| }
181| // Re-queue the failed batch at the front. Use concat (single
182| // allocation) instead of unshift(...batch) which shifts every
183| // pending item batch.length times. Only hit on failure path.
184| this.pending = batch.concat(this.pending)
185| const retryAfterMs =
186| err instanceof RetryableError ? err.retryAfterMs : undefined
187| await this.sleep(this.retryDelay(failures, retryAfterMs))
188| continue
189| }
190|
191| // Release backpressure waiters if space opened up
192| this.releaseBackpressure()
193| }
Toolchain that update depends on
The imports of update.ts show the cross-cutting dependencies of CLI operations:
| Module | Purpose |
|---|---|
| utils/autoUpdater.js | getLatestVersion, installGlobalPackage |
| utils/localInstaller.js | installOrUpdateClaudePackage |
| utils/nativeInstaller/ | installLatestNative, getPackageManager |
| utils/doctorDiagnostic.js | getDoctorDiagnostic |
| utils/completionCache.js | regenerateCompletionCache after an update |
| services/analytics | logEvent |
When changing update behavior, also check whether claude doctor and claude install share the same diagnostic logic, so that their messages stay consistent.
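How CCRClient uses the two uploaders
CCRClient (summary; see the cli-transports chapter for details):
- SerialBatchEventUploader: client_event POSTs; stream_event items are merged and then enqueued
- WorkerStateUploader: reportState, reportMetadata, and heartbeat state patches, coalesced into PUT /worker
If initialize fails, RemoteIO's init.catch calls gracefulShutdown(1, other).
Keep the division of labor in mind: update (local) never calls a transport; the uploaders (remote) are only active when CLAUDE_CODE_USE_CCR_V2 is set or on the Hybrid write path.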
Source reference: src/cli/remoteIO.ts · lines 217–223 (of 256)
217| override flushInternalEvents(): Promise<void> {
218| return this.ccrClient?.flushInternalEvents() ?? Promise.resolve()
219| }
220|
221| override get internalEventsPending(): number {
222| return this.ccrClient?.internalEventsPending ?? 0
223| }
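Hands-on exercises
- Run claude update --debug and follow the logForDebugging lines to see which branch is taken
- Simulate config.installMethod=global while actually running an npm-local install, and observe the mismatch correction
- Read how HybridTransport batches stream_event items into SerialBatchEventUploader
- Draw the WorkerStateUploader state machine: transitions between inflight, pending, and closed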
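Chapter summary and further reading
update manages the local binary; the uploader primitives manage the remote data plane. This chapter is best read alongside the cli-transports chapter.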