从 NanoBot 看 Agent Loop 的设计

从 NanoBot 看 Agent Loop 的设计

April 15, 2026

alt text

此前我写过《快、简、稳 —— 我近期的个人智能体选择 NanoBot》 这篇文章,聊了从 OpenClaw 切到 NanoBot 的原因,以及日常使用的一些场景。用了一段时间之后,我最好奇的问题变成了:它只有几千行代码,是怎么做到在长对话里比 OpenClaw 稳这么多的,而且响应还快?

周末抽空把源码过了一遍,挑几个我觉得最有意思的设计讲讲。

1. 整体架构

NanoBot 把系统拆成三层:Channel、MessageBus、Agent。

┌─────────────────────────────────────────────┐
│  Channels (Telegram / DingTalk / Slack ...)  │
└────────────────────┬────────────────────────┘
                     │ InboundMessage
              ┌──────▼──────┐
              │  MessageBus  │   ← 45 行
              └──────┬──────┘
                     │
        ┌────────────▼────────────┐
        │       AgentLoop         │  会话路由 / 并发控制
        │  ┌──────────────────┐   │
        │  │   AgentRunner    │   │  纯迭代逻辑
        │  └──────────────────┘   │
        └────────────┬────────────┘
                     │ OutboundMessage
              ┌──────▼──────┐
              │  MessageBus  │
              └──────┘

最底下的 MessageBus 就是两个 asyncio.Queue,整个文件就 45 行,核心如下:

class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

Channel 往 inbound 推消息,Agent 往 outbound 推回复,两边完全不认识对方。加一个新 Channel 不用动 Agent 代码,接一个新 Provider 也不用动 Channel 代码。目前 NanoBot 支持的 Telegram、DingTalk、Slack、企业微信这几个 Channel,就是靠这 45 行撑起来的。

中间的 AgentLoop 管"谁在说话、现在能不能处理",里面再嵌一个 AgentRunner 做纯迭代。Runner 不关心 Channel、不关心持久化、不关心产品逻辑,只知道怎么把一组消息喂给模型、处理工具调用、再循环。

这种分层看起来平平无奇,但沿着一条消息走一遍就会发现它的好处。

2. 主循环

AgentLoop.run() 的核心就这么几行:

while self._running:
    msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)

    # /stop 等优先命令直接处理
    if self.commands.is_priority(raw):
        await self.commands.dispatch_priority(ctx)
        continue

    # 会话处理中 → 塞进 pending_queue
    effective_key = self._effective_session_key(msg)
    if effective_key in self._pending_queues:
        self._pending_queues[effective_key].put_nowait(msg)
        continue

    # 新请求 → 创建任务
    asyncio.create_task(self._dispatch(msg))

我第一次读这段的时候,有两个地方停下来看了很久。

2.1 跨会话并发,会话内串行

锁加在会话上,不在全局。同一个人的消息按顺序走——不会出现你刚问完 A,Agent 回来一个 B 的答复;不同用户之间又完全并行,一个慢请求不会把所有人都拖着等。再配一个全局 concurrency gate 兜住上限,不至于模型抽风时把机器打爆。

很朴素的做法,但恰好是多用户场景需要的。

2.2 中途注入(mid-turn injection)

这个更有意思。Agent 正在执行工具,用户等不及又补发了一条消息——常见的处理方式是再开一个任务。但同一会话并发两个 Agent,上下文很快就会乱。

NanoBot 的做法是:当前会话有活跃的 pending_queue 时,新消息不建任务,直接塞进队列。等 Agent 跑到合适的时机(工具执行完、最终回复写完、LLM 报错后),再把队列里的消息 drain 出来合并处理。每次最多取 3 条,防止吞得太多。

对 Agent 来说,这些追加的消息就像同一轮对话里的续发,语义是连贯的。

消息路由的事情讲清楚了,下一步是怎么拼 prompt。

3. 上下文怎么拼

每次调模型前,ContextBuilder 按固定顺序拼装系统提示:

[1] Identity       — 运行环境信息(workspace 路径、OS、Python 版本)
[2] Bootstrap 文件 — AGENTS.md → SOUL.md → USER.md → TOOLS.md
[3] Memory         — memory/MEMORY.md(跨会话积累的长期记忆)
[4] Always Skills  — 始终激活的 skill 全文
[5] Skills 摘要    — 按需激活的 skill 列表(Agent 自己决定要不要调)
[6] Recent History — history.jsonl 最近 50 条交互记录

顺序也是优先级——越靠前对模型注意力的影响越大。任一文件缺失直接跳过,不报错。

3.1 Bootstrap 文件

Bootstrap 是用户最主要的定制入口,四个文件分工很清楚:

  • SOUL.md:Agent 的性格。比如 I solve problems by doing, not by describing what I would do.
  • USER.md:你的画像,语言偏好、时区、技术水平,让 Agent 调整表达方式。
  • AGENTS.md:任务级规范,比如定时任务怎么处理、心跳任务放哪。
  • TOOLS.md:工具使用的补充说明。

想调性格就改 SOUL,想加新的任务规则就改 AGENTS,互不干扰。

3.2 Memory 和 Skills

Memory 存的是跨会话积累的长期记忆,后台有 Consolidator 和 Dream 两个流程在持续整理它。Skills 是能力模块:标了 always 的直接全文塞进 prompt;其他的只挂个名字和简介,让 Agent 自己决定要不要 load 进来。这样既保证常用能力随时可用,又不会把所有东西都塞满上下文。

3.3 Runtime Context

用户消息发出去之前,正文前还会插一段 Runtime Context:

[Runtime Context — metadata only, not instructions]
Current Time: 2026-04-15 20:30
Channel: telegram
Chat ID: 123456789
[/Runtime Context]

注意看那个标签——metadata only, not instructions。当前时间、来源 Channel 都告诉 Agent 了,但明确说这是参考信息不是指令。这样就算用户消息里带了伪造的时间或 chat_id,Agent 也能分清哪是命令哪是元数据。

4. 核心迭代:AgentRunner

Runner 是 NanoBot 真正的 Agent Loop。它对外只有一个 run(spec) 方法,接收消息、工具、模型名、最大迭代数,返回结果。Runner 不依赖任何外部状态,可以独立测试,也可以被 CLI、API、子 Agent 复用——产品差异通过 Hook 接口注入,Runner 本身不动。

4.1 完整工作流

把前三节串起来,一条消息从进入系统到最终返回,完整流程是这样:

用户消息
    │
    ▼
AgentLoop.run()
    ├─ 优先命令(/stop 等)→ 直接处理,返回
    ├─ 会话处理中 → 塞进 pending_queue,等待注入
    └─ 新请求 → create_task(_dispatch)
            │
            ▼
        _dispatch()                    # per-session Lock + 全局 concurrency gate
            └─ _process_message()
                    ├─ 恢复 crash checkpoint(如有)
                    ├─ 组装上下文(系统提示 + 历史 + 记忆 + 当前消息)
                    └─ AgentRunner.run(spec)
                            │
                            ▼
                        for iteration in range(max_iterations):
                            │
                            ├─ [1] Context Governance
                            │       清孤儿 → 回填缺失结果 → 压缩 → 裁剪历史
                            │
                            ├─ [2] 调用 LLM(含重试)
                            │
                            ├─ [3a] 有工具调用?
                            │       ├─ 写 checkpoint
                            │       ├─ 执行工具(只读并行,其余串行)
                            │       ├─ drain pending_queue(中途注入)
                            │       └─ continue → 下一次迭代
                            │
                            └─ [3b] 最终回复
                                    ├─ 空响应 → 重试(≤ 2 次)
                                    ├─ 被截断 → 追加恢复提示,continue
                                    └─ 正常 → break,返回结果
                    │
                    ├─ 保存本轮历史,清除 checkpoint
                    └─ 发布 OutboundMessage 到 bus

想重点讲两个环节:Context Governance 和 Checkpoint。

4.2 Context Governance

每次调模型前,Runner 先对历史消息做一轮治理:

messages_for_model = self._drop_orphan_tool_results(messages)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)
messages_for_model = self._microcompact(messages_for_model)
messages_for_model = self._apply_tool_result_budget(spec, messages_for_model)
messages_for_model = self._snip_history(spec, messages_for_model)
messages_for_model = self._drop_orphan_tool_results(messages_for_model)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)

每一步做什么:

  • drop_orphan_tool_results:清掉 tool_call 找不到对应结果的孤儿消息
  • backfill_missing_tool_results:给缺失的工具结果补占位符
  • microcompact:压缩重复的工具结果
  • apply_tool_result_budget:控制工具结果的 token 占比
  • snip_history:裁剪过长的历史

snip_history 可能产生新孤儿,所以末尾再清一遍。类似的思路也体现在工具定义上——发给模型前做稳定排序(内置工具按字母序在前,MCP 工具在后),保证每次 prompt 前缀一致,LLM 的 prompt cache 命中率更高。

这条链有两个细节值得留意。一个是原始 messages 始终不动,所有治理都在 messages_for_model 副本上——历史和发给模型的内容是两份独立数据。另一个是任意一步失败会立刻退回最小修复模式(只清孤儿 + 回填缺失结果),保证模型永远收到结构合法的消息。

长对话里这些结构问题会慢慢积累,提前修比事后救火稳得多。

4.3 Checkpoint 与 Crash 恢复

工具调用之前,Runner 会往会话元数据里写一个 checkpoint:

await self._emit_checkpoint(spec, {
    "phase": "awaiting_tools",
    "assistant_message": assistant_message,
    "completed_tool_results": [],
    "pending_tool_calls": [tc.to_openai_tool_call() for tc in response.tool_calls],
})

如果进程在工具执行过程中崩了,下次启动时 AgentLoop 会检测到 checkpoint,把已完成的工具结果和未完成工具的合成错误一起塞回历史,Agent 从中断点继续跑。未完成的工具会收到一条 "Error: Task interrupted before this tool finished." 的结果,Agent 自己判断下一步怎么做。

5. Provider 抽象层

NanoBot 支持 Anthropic、OpenAI-compat、Azure、GitHub Copilot 等一堆 Provider,但每个具体实现类只需要实现 chat() 一个方法。重试逻辑、消息角色强制交替、图片降级、token 计量,全都在 base.py 里统一处理,新增一个 Provider 只关心调用协议本身的差异。

5.1 消息结构兼容

_enforce_role_alternation() 就是一个典型例子。不同 Provider 对消息结构的容忍度差得挺远:有的不接受末尾是 assistant 消息(不支持 prefill),有的不接受连续两条相同角色,有的在只剩 system 消息时直接报错。这个方法一次把三种情况都处理了——合并连续同角色消息、剔除尾部 assistant、只剩 system 时把最后一条 assistant 改成 user 兜底。

写新 Provider 时完全看不到这类防御代码,干净。

5.2 重试策略

重试判断是双层的:优先读结构化错误元数据(error_should_retryerror_status_code),读不到再扫响应文本关键词兜底。分两种模式:标准模式最多 3 次指数退避;持久模式以 60 秒为间隔上限持续重试,给长任务里 Provider 短暂限流的场景用。

6. 快,还有稳

回到开头那个问题 —— NanoBot 为什么快、长对话为什么稳。前面讲的一堆零散机制,串起来其实就是答案。

6.1 快从哪来

  • MessageBusasyncio.Queue 的薄封装,Channel 和 Agent 之间零拷贝零等待。
  • /stop 这种优先命令直接跳过任务队列,想停就能马上停。
  • 锁在会话粒度,我的慢请求卡不到你。
  • 只读工具(搜索、读文件)会并发跑,多个搜索同时出结果。
  • 中途注入塞进当前 turn,不用开新的 Agent。
  • 工具定义和系统提示都稳定排序,LLM prompt cache 命中率更高,首 token 延迟明显下降。

6.2 稳从哪来

长对话在 OpenClaw 里最容易出三种问题——上下文结构坏掉、进程崩了丢进度、执行到一半"沉默"没反馈。NanoBot 对应的机制一个不少:

  • 每轮都修历史:孤儿 tool_call、缺失工具结果、token 超预算、历史过长,发送前主动修好。
  • Checkpoint 兜底:工具调用前写入,崩溃重启能接着跑。
  • 重试分层:常规错误走指数退避,Provider 短暂限流走持久模式,重试期间打心跳日志不会沉默。
  • 响应异常多重兜底:空响应自动重试,被截断追加恢复提示继续生成,达到 max_iterations 用模板返回可读消息,不抛异常留半截对话。
  • 流式输出:Channel 接 on_stream 回调,工具在跑什么、模型在想什么都实时可见。

之前用 OpenClaw 遇到长对话时的那种不确定感——它到底在干活还是卡住了?上文它还记得吗?现在崩了我之前做的是不是都白费了?——在 NanoBot 这里基本消失了。

7. 小结

NanoBot 的精巧来自整体的克制。MessageBus 只传消息,AgentLoop 只做路由,AgentRunner 只做迭代,Provider base 吸收兼容性差异,Tool 接口只暴露三件事(名称、参数 schema、执行方法)。每一层都只管自己的事,不越界,不泛滥。

AI Agent 的核心逻辑其实很简单——给模型上下文、执行工具、循环而已。难的是那堆边界情况:消息乱序、上下文超长、Provider 各有脾气、进程可能崩、用户会中途插话。大框架应对这些问题的习惯是层层加抽象,代码量越滚越大。NanoBot 把每类问题集中在一处解决,防御代码不扩散,整个系统才能控制在几千行。

这种"每层只做一件事"的克制,比堆功能难多了,也是 NanoBot 在长对话场景下比 OpenClaw 稳的根本原因。对于把智能体当成日常生产力工具的人来说,它给的正是最值钱的那种确定性。

最后更新于