消息流转
完整请求生命周期
用户的一次输入到 AI 响应完成,经历以下阶段:
用户输入文本
│
▼
processUserInput() ← 输入预处理
│ - 解析斜杠命令 (/init, /review, ...)
│ - 处理 @文件引用 (@file.ts, @dir/)
│ - 附件处理 (图片、PDF)
│ - 粘贴图片检测
▼
query() ← 核心查询函数 (src/query.ts)
│
├──→ 构建 Messages ← context.ts
│ - System Prompt (基础指令)
│ - CLAUDE.md 内容 (项目级 + 用户级)
│ - Git 上下文 (分支、状态、远程)
│ - 历史对话 (含压缩摘要)
│ - 工具定义列表
│
├──→ 调用 API ← services/api/
│ - Anthropic Messages API
│ - 流式响应 (SSE)
│ - Token 用量追踪
│
├──→ 处理响应
│ ├── 文本输出 → 直接渲染到终端
│ ├── thinking 块 → 折叠显示推理过程
│ └── tool_use → 进入工具循环
│
└──→ 工具循环
│
├──→ 匹配工具 (toolMatchesName)
├──→ 验证输入 (Zod Schema)
├──→ 权限检查 (canUseTool)
├── ├── 允许 → 执行
├── ├── 拒绝 → 返回拒绝消息
├── └── 需要确认 → 弹出确认对话框
├──→ 执行工具 (tool.execute)
├──→ 渲染结果 (renderToolResult)
├──→ 返回 tool_result 到消息列表
└──→ 继续 API 调用(直到无 tool_use)processUserInput 详解
// src/processUserInput.ts(简化)
async function processUserInput(raw: string): Promise<ProcessedInput> {
// 1. 斜杠命令检查
if (raw.startsWith('/')) {
const command = parseSlashCommand(raw)
return { type: 'command', command }
}
// 2. @文件引用处理
const { text, attachments } = await processAtMentions(raw)
// 3. 粘贴图片检测
if (hasClipboardImage()) {
attachments.push(await saveClipboardImage())
}
return { type: 'query', text, attachments }
}@文件引用
@file.ts 语法将文件内容直接注入对话:
// 支持的引用格式
@path/to/file.ts // 单个文件
@src/components/ // 整个目录(递归)
@package.json // 项目配置处理流程:
- 解析
@标记 - 读取文件/目录内容
- 添加为
text类型的 ContentBlock - 保留原始
@引用供 AI 理解上下文
query() 核心函数
query() 是整个系统的核心调度函数:
// src/query.ts(简化)
async function query(input: ProcessedInput, options: QueryOptions) {
// 1. 构建消息历史
const messages = buildMessages(input, options)
// 2. 设置工具集
const tools = getTools(options.toolContext)
// 3. 开始流式调用
const stream = await streamMessage({
messages,
system: buildSystemPrompt(options),
tools,
model: options.model,
maxTokens: options.maxTokens,
thinking: options.thinkingConfig,
})
// 4. 处理流式事件
for await (const event of stream) {
switch (event.type) {
case 'content_block_start':
handleContentBlockStart(event)
break
case 'content_block_delta':
handleContentBlockDelta(event)
break
case 'content_block_stop':
handleContentBlockStop(event)
break
case 'message_delta':
handleMessageDelta(event)
break
case 'message_stop':
// 检查是否有 tool_use 需要执行
await handleToolUses(messages, event)
break
}
}
}Tool Use 循环
Tool Use 是 Anthropic API 的核心机制,Claude Code 基于此构建:
// Tool Use 循环伪代码
async function toolUseLoop(messages: Message[]) {
while (true) {
const response = await callAPI(messages)
messages.push(response)
const toolUses = response.content.filter(
block => block.type === 'tool_use'
)
if (toolUses.length === 0) break
// 并行执行工具
const results = await Promise.all(
toolUses.map(toolUse => executeTool(toolUse))
)
// 添加工具结果
messages.push({
role: 'user',
content: results
})
}
}工具执行详细流程
tool_use 事件
│
▼
toolMatchesName() ← 查找工具
│
├── 精确匹配 (tool.name === name)
└── 别名匹配 (tool.aliases?.includes(name))
│
▼
inputSchema.safeParse() ← Zod v4 验证输入
│
├── 验证失败 → 返回错误 tool_result
│
▼
canUseTool() ← 权限检查
│
├── 允许 (权限模式/历史确认)
├── 拒绝 → 返回拒绝 tool_result
└── 需要确认 → 等待用户响应
│
├── 允许一次
├── 允许本次会话
└── 拒绝
│
▼
tool.execute() ← 执行工具
│
├── 成功 → ToolResult
└── 失败 → ToolResult (is_error: true)
│
▼
renderToolResult() ← 渲染到终端
│
▼
返回 tool_result ← 继续循环并发与取消
AbortController
每个查询都有一个 AbortController,支持取消正在进行的操作:
const abortController = new AbortController()
// 用户按 Ctrl+C
abortController.abort()
// 工具执行中检查
if (context.abortController.signal.aborted) {
throw new AbortedError()
}工具并发
同一轮中多个 tool_use 可以并行执行:
// 并行执行所有工具调用
const results = await Promise.allSettled(
toolUses.map(toolUse => executeToolWithTimeout(toolUse, timeout))
)
// 处理结果
results.forEach((result, i) => {
if (result.status === 'fulfilled') {
messages.push(result.value)
} else {
messages.push({
type: 'tool_result',
tool_use_id: toolUses[i].id,
content: `Error: ${result.reason}`,
is_error: true,
})
}
})流式输出处理
Claude Code 使用 Ink (React for CLI) 渲染流式输出:
SSE 事件流
│
├── thinking_delta → 折叠显示推理过程
│
├── text_delta → 实时渲染文本
│ ├── Markdown 解析
│ ├── 代码高亮
│ └── 终端宽度适配
│
├── tool_use content_block_start
│ └── 显示工具名称和参数
│
├── input_json_delta
│ └── 实时显示工具参数
│
└── message_delta
└── 更新 token 计数和状态错误处理与重试
// API 错误分类与重试策略
function handleAPIError(error: unknown) {
const categorized = categorizeRetryableAPIError(error)
switch (categorized.category) {
case 'rate_limit':
// 指数退避重试
return retryWithBackoff(query, { maxRetries: 3 })
case 'overloaded':
// 短暂等待后重试
return retryWithBackoff(query, { maxRetries: 2, delay: 5000 })
case 'timeout':
// 减少上下文后重试
return retryWithCompactContext(query)
case 'auth':
// 提示重新认证
return reauthenticate()
default:
// 显示错误信息
return showError(error)
}
}Last updated on