Skip to Content
Architecture消息流转

消息流转

完整请求生命周期

用户的一次输入到 AI 响应完成,经历以下阶段:

用户输入文本 processUserInput() ← 输入预处理 │ - 解析斜杠命令 (/init, /review, ...) │ - 处理 @文件引用 (@file.ts, @dir/) │ - 附件处理 (图片、PDF) │ - 粘贴图片检测 query() ← 核心查询函数 (src/query.ts) ├──→ 构建 Messages ← context.ts │ - System Prompt (基础指令) │ - CLAUDE.md 内容 (项目级 + 用户级) │ - Git 上下文 (分支、状态、远程) │ - 历史对话 (含压缩摘要) │ - 工具定义列表 ├──→ 调用 API ← services/api/ │ - Anthropic Messages API │ - 流式响应 (SSE) │ - Token 用量追踪 ├──→ 处理响应 │ ├── 文本输出 → 直接渲染到终端 │ ├── thinking 块 → 折叠显示推理过程 │ └── tool_use → 进入工具循环 └──→ 工具循环 ├──→ 匹配工具 (toolMatchesName) ├──→ 验证输入 (Zod Schema) ├──→ 权限检查 (canUseTool) ├── ├── 允许 → 执行 ├── ├── 拒绝 → 返回拒绝消息 ├── └── 需要确认 → 弹出确认对话框 ├──→ 执行工具 (tool.execute) ├──→ 渲染结果 (renderToolResult) ├──→ 返回 tool_result 到消息列表 └──→ 继续 API 调用(直到无 tool_use)

processUserInput 详解

// src/processUserInput.ts(简化) async function processUserInput(raw: string): Promise<ProcessedInput> { // 1. 斜杠命令检查 if (raw.startsWith('/')) { const command = parseSlashCommand(raw) return { type: 'command', command } } // 2. @文件引用处理 const { text, attachments } = await processAtMentions(raw) // 3. 粘贴图片检测 if (hasClipboardImage()) { attachments.push(await saveClipboardImage()) } return { type: 'query', text, attachments } }

@文件引用

@file.ts 语法将文件内容直接注入对话:

// 支持的引用格式 @path/to/file.ts // 单个文件 @src/components/ // 整个目录(递归) @package.json // 项目配置

处理流程:

  1. 解析 @ 标记
  2. 读取文件/目录内容
  3. 添加为 text 类型的 ContentBlock
  4. 保留原始 @ 引用供 AI 理解上下文

query() 核心函数

query() 是整个系统的核心调度函数:

// src/query.ts(简化) async function query(input: ProcessedInput, options: QueryOptions) { // 1. 构建消息历史 const messages = buildMessages(input, options) // 2. 设置工具集 const tools = getTools(options.toolContext) // 3. 开始流式调用 const stream = await streamMessage({ messages, system: buildSystemPrompt(options), tools, model: options.model, maxTokens: options.maxTokens, thinking: options.thinkingConfig, }) // 4. 处理流式事件 for await (const event of stream) { switch (event.type) { case 'content_block_start': handleContentBlockStart(event) break case 'content_block_delta': handleContentBlockDelta(event) break case 'content_block_stop': handleContentBlockStop(event) break case 'message_delta': handleMessageDelta(event) break case 'message_stop': // 检查是否有 tool_use 需要执行 await handleToolUses(messages, event) break } } }

Tool Use 循环

Tool Use 是 Anthropic API 的核心机制,Claude Code 基于此构建:

// Tool Use 循环伪代码 async function toolUseLoop(messages: Message[]) { while (true) { const response = await callAPI(messages) messages.push(response) const toolUses = response.content.filter( block => block.type === 'tool_use' ) if (toolUses.length === 0) break // 并行执行工具 const results = await Promise.all( toolUses.map(toolUse => executeTool(toolUse)) ) // 添加工具结果 messages.push({ role: 'user', content: results }) } }

工具执行详细流程

tool_use 事件 toolMatchesName() ← 查找工具 ├── 精确匹配 (tool.name === name) └── 别名匹配 (tool.aliases?.includes(name)) inputSchema.safeParse() ← Zod v4 验证输入 ├── 验证失败 → 返回错误 tool_result canUseTool() ← 权限检查 ├── 允许 (权限模式/历史确认) ├── 拒绝 → 返回拒绝 tool_result └── 需要确认 → 等待用户响应 ├── 允许一次 ├── 允许本次会话 └── 拒绝 tool.execute() ← 执行工具 ├── 成功 → ToolResult └── 失败 → ToolResult (is_error: true) renderToolResult() ← 渲染到终端 返回 tool_result ← 继续循环

并发与取消

AbortController

每个查询都有一个 AbortController,支持取消正在进行的操作:

const abortController = new AbortController() // 用户按 Ctrl+C abortController.abort() // 工具执行中检查 if (context.abortController.signal.aborted) { throw new AbortedError() }

工具并发

同一轮中多个 tool_use 可以并行执行:

// 并行执行所有工具调用 const results = await Promise.allSettled( toolUses.map(toolUse => executeToolWithTimeout(toolUse, timeout)) ) // 处理结果 results.forEach((result, i) => { if (result.status === 'fulfilled') { messages.push(result.value) } else { messages.push({ type: 'tool_result', tool_use_id: toolUses[i].id, content: `Error: ${result.reason}`, is_error: true, }) } })

流式输出处理

Claude Code 使用 Ink (React for CLI) 渲染流式输出:

SSE 事件流 ├── thinking_delta → 折叠显示推理过程 ├── text_delta → 实时渲染文本 │ ├── Markdown 解析 │ ├── 代码高亮 │ └── 终端宽度适配 ├── tool_use content_block_start │ └── 显示工具名称和参数 ├── input_json_delta │ └── 实时显示工具参数 └── message_delta └── 更新 token 计数和状态

错误处理与重试

// API 错误分类与重试策略 function handleAPIError(error: unknown) { const categorized = categorizeRetryableAPIError(error) switch (categorized.category) { case 'rate_limit': // 指数退避重试 return retryWithBackoff(query, { maxRetries: 3 }) case 'overloaded': // 短暂等待后重试 return retryWithBackoff(query, { maxRetries: 2, delay: 5000 }) case 'timeout': // 减少上下文后重试 return retryWithCompactContext(query) case 'auth': // 提示重新认证 return reauthenticate() default: // 显示错误信息 return showError(error) } }
Last updated on