为什么 Agent 上了生产,先崩的不是模型而是那层管道

Agent Gateway:接入、路由、持久化、可靠投递、并发,让 Agent 真正活在生产环境。

Module B · 第 2 讲

立靶:把模型直连用户,会怎么死

先看一段几乎所有人第一版都会写出来的代码——一个 Agent 的最小循环:

while True:
    user_input = input()
    messages.append({"role": "user", "content": user_input})
    response = client.messages.create(messages=messages)
    print(response.content[0].text)

在你自己的终端里,它跑得很好。你问一句,它答一句,看起来「Agent 已经做好了」。但凡你想把它交给真实用户,下面这些事会一件接一件地发生:

  • 进程一重启,记忆全没了。 历史只活在内存的 messages 列表里。服务器重启、代码热更、甚至一个未捕获的异常,整段对话凭空蒸发。
  • 只认得一个入口。 这段代码绑死在 input() 上。今天用户在某 IM 群里 at 你,明天换成另一个协作平台,每接一个平台都要把整个循环重抄一遍。
  • 网络抖一下,消息就丢了。 超时、对端 503、发送那一刻断网——这条消息既没送达,也没人重试,就这么没了。
  • 多个人同时说话,互相打架。 两个用户共用一份 messages,上下文交叉污染,回复张冠李戴。
  • 上下文越堆越长,直到一次 400。 对话不删不减,迟早撞上模型的上下文上限,然后是一个生硬的报错。
  • 一个 API Key 被限速,全线瘫痪。 没有任何冗余,供应商一限流,你的 Agent 直接哑火。

这些问题有一个共同点:它们都不是「模型不够聪明」的问题,而是「Agent 没有底盘」的问题。 模型再强,也需要一层基础设施把它和真实世界连起来、扛住生产环境的各种意外。这层东西,就是 Gateway。

一句话定位:Harness 工程解决「Agent 能不能做到」;Gateway 工程解决「Agent 做到了能不能送达、崩了能不能恢复、大了能不能扛住」。Harness 是发动机,Gateway 是车身底盘。

框架:Gateway 的五个核心职责

把上面那一串「会怎么死」反过来,就是 Gateway 必须扛起的五件事。它们不是五个独立功能,而是一个 Agent 要「活在生产环境」必须同时具备的五种能力。

Gateway 底盘夹在平台入口与 Agent Loop 之间,扛起多通道接入/路由/持久化/可靠投递/并发五职责
Gateway 底盘夹在平台入口与 Agent Loop 之间,扛起多通道接入/路由/持久化/可靠投递/并发五职责
职责解决什么问题关键设计
多通道接入多个平台入口格式各异,不想为每个平台重写循环归一化成统一消息格式,每个平台只实现 receive() + send()
消息路由一条消息进来,该交给哪个 Agent / 哪套配置按「精确到粗略」分层匹配,从特定用户一路兜底到全局默认
会话持久化进程重启后历史不能丢,上下文不能无限膨胀历史追加落盘(JSONL),崩溃安全;超长时分层截断 + 压缩
可靠投递网络超时、对端故障时消息不丢、能重试先写盘再投递(Write-Ahead Queue),指数退避重试
并发控制多任务、多用户同时进行时互不干扰,用户优先按工作类型分独立队列(Named Lanes),各自串行,用户消息抢先

不变量:那个永远不改的循环

Gateway 工程的第一条纪律是:那个 Agent Loop 是不变量,后续所有功能都叠加在它之上,不去改它。

while True:
    user_input = channel.receive()
    messages.append({"role": "user", "content": user_input})
    response = client.messages.create(messages=messages)
    if response.stop_reason == "end_turn":
        channel.send(response.content[0].text)
    elif response.stop_reason == "tool_use":
        ...  # 工具派发逻辑

它和立靶那版只有一个差别:input() / print() 换成了 channel.receive() / channel.send()。一个抽象,就把「绑死单一入口」的问题撬开了。还有一条容易踩的坑:工具执行的结果必须以 user 角色的消息放回对话,这是 API 的硬性规定,放错角色模型就读不到。

多通道接入:一个数据类抹平所有平台

不管消息从哪个平台来,统一收敛成一个结构:

@dataclass
class InboundMessage:
    text: str
    sender_id: str
    channel: str   # 哪个平台来的
    scope: str     # 私聊还是群聊

每接一个新平台,只需实现它的 receive()send()。循环本身、路由、持久化全都不用动。

消息路由:五层优先级,从精确到兜底

一条消息进来,到底用哪套配置?按优先级从高到低逐层匹配,命中即止:

平台 + 类型 + 发送者   →  精确匹配(给特定用户的专属配置)
平台 + 类型            →  群组级
平台                   →  平台级
类型                   →  类型级(所有私聊 / 所有群聊)
*                      →  全局默认(兜底)

绝大多数消息走全局默认,只在需要时才为某平台、某群、某 VIP 覆写配置。

会话持久化:落盘 + 三层溢出保护

持久化用 JSONL:每来一条消息追加一行。追加是崩溃安全的——写一半挂了,最多丢最后一行。上下文快撑爆时,不是直接报错,而是三层渐进降级:

Attempt 0: 正常调用
Attempt 1: 截断过大的工具结果(往往是某次工具吐了一大坨)
Attempt 2: 用 LLM 把最旧的 50% 历史压缩成摘要,替换原文

先动代价最小的,不够再动代价大的——典型的「按代价排序、逐级加码」降级思路。

可靠投递:先写盘,再投递

这是整套 Gateway 里最体现「生产意识」的一环:

tmp.write_text(json.dumps(entry))  # 1. 先写临时文件
os.fsync(...)                      # 2. 强制刷盘,确保真落到磁盘
os.replace(tmp, final)             # 3. 原子重命名,入队成功

这三步叫 Write-Ahead Queue。哪怕投递前一刻进程崩了,这条消息已经稳稳躺在磁盘上,重启后能被捞起来重投。投递失败则指数退避重试(约 5s → 25s → 2min → 10min),再叠加 ±20% 抖动,避免多个失败任务在同一时刻齐刷刷重试。完整的容错是三层叠起来的:投递队列重试(网络/对端临时故障)→ 上下文截断/压缩(上下文超限)→ API Key 轮换(一个被限速,自动换下一个)。

并发控制:分车道,用户永远优先

按「工作类型」分成几条独立车道(Named Lanes),每条车道内部串行:

lanes = {
    "main":      LaneQueue(max_concurrency=1),  # 用户消息,阻塞等结果
    "heartbeat": LaneQueue(max_concurrency=1),  # 后台心跳,不阻塞用户
    "cron":      LaneQueue(max_concurrency=1),  # 定时任务
}

分车道的意义在于隔离:定时任务跑得再久也卡不住正在等回复的真人,用户那条车道永远优先。还有一个关键细节:generation(代次)机制防僵尸任务——进程重启后,旧任务发现自己的代次号和当前的对不上,就知道「我已经是上一辈子的任务了」,安静退出。

case:一个会主动找你说话的 Agent

光是「被动应答」还不算真正活在生产环境。让 Agent 主动行动:它有一个后台心跳线程,每秒醒一次,看看「有没有什么事该主动做了」。

def heartbeat_thread():
    while True:
        if not lock.acquire(blocking=False):  # 用户正在对话,让路
            sleep(1); continue
        response = run_agent_turn(trigger="heartbeat")
        if "HEARTBEAT_OK" not in response:     # 有实质内容才推送
            output_queue.put(response)

这段不到二十行的代码,藏着三个 Gateway 设计的影子:

  1. 并发隔离在起作用。 心跳第一件事是尝试拿锁,拿不到就立刻让路——这就是 Named Lanes 里「用户优先」的具体体现。
  2. 主动性靠一个约定收口。 Agent 觉得「没事」就回一句 HEARTBEAT_OK,Gateway 看到这个标记就静默丢弃,只有真有实质内容时才推送。一个字符串约定,挡住了 99% 的噪音。
  3. 定时任务用配置文件驱动。 用一个 CRON.json 支持「某时刻执行一次」「每隔多久执行」「cron 表达式」三种触发。

把它放到真实业务想象:一个运营助理 Agent,白天有人问就答(main 车道),没人问时靠心跳检查「有没有到点该推的日报」(heartbeat 车道),整点被定时任务唤起跑巡检(cron 车道)。三条车道各行其道,真人来了立刻让路。这才像一个「住在生产环境里、而不是躺在你终端里」的 Agent。

可操作做法:给你的 Agent 装上底盘

按「痛感出现的先后」加,基本不会错:

  1. 先把循环和入口解耦。 把所有 input()/print() 换成 channel.receive()/channel.send()。成本最低,收益最大。
  2. 把对话历史落盘成 JSONL。 先解决「会失忆」这个最伤用户信任的问题。
  3. 加上下文溢出保护。 按「截断臃肿工具结果 → 压缩最旧历史」分层降级。
  4. 接第二个平台时,才动手抽象通道。 不要过早抽象。
  5. 消息路由按需分层。 一开始只有全局默认就够,出现特殊配置需求时再往上加规则。
  6. 投递改成先写盘再发(WAQ)。 当消息丢失开始真实伤害用户时再升级。
  7. 备多个 API Key 做轮换。 把「一个被限速」从故障变成一次自动切换。
  8. 需要并发时,按工作类型分车道,而不是裸开线程。 重启后用 generation 机制清理僵尸任务。

顺序提醒:上面是按「痛感出现的先后」排的,不是让你一次性全做。过早做并发、过早抽象多通道,是另一种形式的过度工程。哪里先疼,先治哪里。

收口

Harness 是发动机,Gateway 是车身底盘——模型负责跑得多快,Gateway 负责崩了能恢复、断了能重投、挤了能让路。把 Agent 交给真实用户之前,先问一句:它有底盘吗?