第五章:Middleware(中间件模式)
本章目标:理解 Middleware 模式,实现 Tool 错误处理和 ChatModel 重试机制。
为什么需要 Middleware
第四章我们为 Agent 添加了 Tool 能力,让 Agent 能够访问文件系统。但在实际应用场景中,Tool 报错或 ChatModel 报错是常见的现象,例如:
- Tool 报错:文件不存在、参数错误、权限不足等
- ChatModel 报错:API 限流(429)、网络超时、服务不可用等
问题一:Tool 错误会中断整个流程
当 Tool 执行失败时,错误会直接传播到 Agent,导致整个对话中断:
[tool call] read_file(file_path: "nonexistent.txt")
Error: open nonexistent.txt: no such file or directory
// 对话中断,用户需要重新开始
问题二:模型调用可能因限流失败
当模型 API 返回 429(Too Many Requests)错误时,整个对话也会中断:
Error: rate limit exceeded (429)
// 对话中断
期望的行为
这些报错信息往往不希望直接终止 Agent 流程,而是希望把报错信息给到模型,由模型自动纠错进行下一轮。例如:
[tool call] read_file(file_path: "nonexistent.txt")
[tool result] [tool error] open nonexistent.txt: no such file or directory
[assistant] 抱歉,文件不存在。让我先列出当前目录的文件...
[tool call] glob(pattern: "*")
Middleware 的定位
Middleware 模式可以扩展 Tool 和 ChatModel 的行为,非常适合解决这个问题:
- Middleware 是 Agent 的拦截器:在调用前后插入自定义逻辑
- Middleware 可处理错误:将错误转换为模型可理解的格式
- Middleware 可实现重试:自动重试失败的操作
- Middleware 可组合:多个 Middleware 可以串联使用
简单类比:
- Agent = “业务逻辑”
- Middleware = “AOP 切面”(日志、重试、错误处理等横切关注点)
代码位置
- 入口代码:cmd/ch05/main.go
前置条件
与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。同时,需要与第四章一样设置 PROJECT_ROOT:
export PROJECT_ROOT=/path/to/eino # Eino 核心库根目录
运行
在 examples/quickstart/chatwitheino 目录下执行:
# 设置项目根目录
export PROJECT_ROOT=/path/to/your/project
go run ./cmd/ch05
输出示例:
you> 列出当前目录的文件
[assistant] 我来帮你列出文件...
[tool call] list_files(directory: ".")
you> 读取一个不存在的文件
[assistant] 尝试读取文件...
[tool call] read_file(file_path: "nonexistent.txt")
[tool result] [tool error] open nonexistent.txt: no such file or directory
[assistant] 抱歉,文件不存在...
关键概念
Middleware 接口
ChatModelAgentMiddleware 是 Agent 的中间件接口:
type ChatModelAgentMiddleware interface {
// BeforeAgent is called before each agent run, allowing modification of
// the agent's instruction and tools configuration.
BeforeAgent(ctx context.Context, runCtx *ChatModelAgentContext) (context.Context, *ChatModelAgentContext, error)
// BeforeModelRewriteState is called before each model invocation.
// The returned state is persisted to the agent's internal state and passed to the model.
BeforeModelRewriteState(ctx context.Context, state *ChatModelAgentState, mc *ModelContext) (context.Context, *ChatModelAgentState, error)
// AfterModelRewriteState is called after each model invocation.
// The input state includes the model's response as the last message.
AfterModelRewriteState(ctx context.Context, state *ChatModelAgentState, mc *ModelContext) (context.Context, *ChatModelAgentState, error)
// WrapInvokableToolCall wraps a tool's synchronous execution with custom behavior.
// This method is only called for tools that implement InvokableTool.
WrapInvokableToolCall(ctx context.Context, endpoint InvokableToolCallEndpoint, tCtx *ToolContext) (InvokableToolCallEndpoint, error)
// WrapStreamableToolCall wraps a tool's streaming execution with custom behavior.
// This method is only called for tools that implement StreamableTool.
WrapStreamableToolCall(ctx context.Context, endpoint StreamableToolCallEndpoint, tCtx *ToolContext) (StreamableToolCallEndpoint, error)
// WrapEnhancedInvokableToolCall wraps an enhanced tool's synchronous execution.
// This method is only called for tools that implement EnhancedInvokableTool.
WrapEnhancedInvokableToolCall(ctx context.Context, endpoint EnhancedInvokableToolCallEndpoint, tCtx *ToolContext) (EnhancedInvokableToolCallEndpoint, error)
// WrapEnhancedStreamableToolCall wraps an enhanced tool's streaming execution.
// This method is only called for tools that implement EnhancedStreamableTool.
WrapEnhancedStreamableToolCall(ctx context.Context, endpoint EnhancedStreamableToolCallEndpoint, tCtx *ToolContext) (EnhancedStreamableToolCallEndpoint, error)
// WrapModel wraps a chat model with custom behavior.
// This method is called at request time when the model is about to be invoked.
WrapModel(ctx context.Context, m model.BaseChatModel, mc *ModelContext) (model.BaseChatModel, error)
}
设计理念:
- 装饰器模式:每个 Middleware 包装原始调用,可以修改输入、输出或错误
- 洋葱模型:请求从外向内穿过 Middleware,响应从内向外返回
- 可组合:多个 Middleware 按顺序执行
Middleware 执行顺序
Handlers(即 Middlewares)按数组正序包装,形成洋葱模型:
Handlers: []adk.ChatModelAgentMiddleware{
&middlewareA{}, // 最外层:最先 Wrap,最先拦截请求,但 WrapModel 最后生效
&middlewareB{}, // 中间层
&middlewareC{}, // 最内层:最后 Wrap
}
对于 Tool 调用的执行顺序:
请求 → A.Wrap → B.Wrap → C.Wrap → 实际 Tool 执行 → C返回 → B返回 → A返回 → 响应
实用建议: 将 safeToolMiddleware(错误捕获)放在最内层(数组末尾),确保其他 Middleware 抛出的中断错误能正确向外传播。
SafeToolMiddleware
SafeToolMiddleware 将 Tool 错误转换为字符串,让模型能够理解并处理:
type safeToolMiddleware struct {
*adk.BaseChatModelAgentMiddleware
}
func (m *safeToolMiddleware) WrapInvokableToolCall(
_ context.Context,
endpoint adk.InvokableToolCallEndpoint,
_ *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
result, err := endpoint(ctx, args, opts...)
if err != nil {
// 将错误转换为字符串,而不是返回错误
return fmt.Sprintf("[tool error] %v", err), nil
}
return result, nil
}, nil
}
效果:
[tool call] read_file(file_path: "nonexistent.txt")
[tool result] [tool error] open nonexistent.txt: no such file or directory
[assistant] 抱歉,文件不存在,请检查文件路径...
// 对话继续,模型可以根据错误信息调整策略
ModelRetryConfig
ModelRetryConfig 配置 ChatModel 的自动重试:
type ModelRetryConfig struct {
MaxRetries int // 最大重试次数
IsRetryAble func(ctx context.Context, err error) bool // 判断是否可重试
}
使用方式(以 DeepAgent 为例):
agent, err := deep.New(ctx, &deep.Config{
// ...
ModelRetryConfig: &adk.ModelRetryConfig{
MaxRetries: 5,
IsRetryAble: func(_ context.Context, err error) bool {
// 429 限流错误可重试
return strings.Contains(err.Error(), "429") ||
strings.Contains(err.Error(), "Too Many Requests") ||
strings.Contains(err.Error(), "qpm limit")
},
},
})
重试策略:
- 指数退避:每次重试间隔递增
- 可配置条件:通过
IsRetryAble判断哪些错误可重试 - 自动恢复:无需用户干预
Middleware 的实现
1. 实现 SafeToolMiddleware
type safeToolMiddleware struct {
*adk.BaseChatModelAgentMiddleware
}
func (m *safeToolMiddleware) WrapInvokableToolCall(
_ context.Context,
endpoint adk.InvokableToolCallEndpoint,
_ *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
result, err := endpoint(ctx, args, opts...)
if err != nil {
// 中断错误不转换,需要继续传播
if _, ok := compose.IsInterruptRerunError(err); ok {
return "", err
}
// 其他错误转换为字符串
return fmt.Sprintf("[tool error] %v", err), nil
}
return result, nil
}, nil
}
2. 实现流式 Tool 错误处理
func (m *safeToolMiddleware) WrapStreamableToolCall(
_ context.Context,
endpoint adk.StreamableToolCallEndpoint,
_ *adk.ToolContext,
) (adk.StreamableToolCallEndpoint, error) {
return func(ctx context.Context, args string, opts ...tool.Option) (*schema.StreamReader[string], error) {
sr, err := endpoint(ctx, args, opts...)
if err != nil {
if _, ok := compose.IsInterruptRerunError(err); ok {
return nil, err
}
// 返回包含错误信息的单帧流
return singleChunkReader(fmt.Sprintf("[tool error] %v", err)), nil
}
// 包装流,捕获流中的错误
return safeWrapReader(sr), nil
}, nil
}
3. 配置 Agent 使用 Middleware
本章继续使用第四章引入的 DeepAgent,在其 Handlers 字段中注册 Middleware:
agent, err := deep.New(ctx, &deep.Config{
Name: "Ch05MiddlewareAgent",
Description: "ChatWithDoc agent with safe tool middleware and retry.",
ChatModel: cm,
Instruction: agentInstruction,
Backend: backend,
StreamingShell: backend,
MaxIteration: 50,
Handlers: []adk.ChatModelAgentMiddleware{
&safeToolMiddleware{}, // 将 Tool 错误转换为字符串
},
ModelRetryConfig: &adk.ModelRetryConfig{
MaxRetries: 5,
IsRetryAble: func(_ context.Context, err error) bool {
return strings.Contains(err.Error(), "429") ||
strings.Contains(err.Error(), "Too Many Requests")
},
},
})
注意:Handlers 字段(在配置中)和 “Middleware”(在文档中讨论的概念)是同一回事——Handlers 是配置字段名,而 ChatModelAgentMiddleware 是接口名。
**关键代码片段(注意:这是简化后的代码片段,不能直接运行,完整代码请参考 cmd/ch05/main.go):
// SafeToolMiddleware 捕获 Tool 错误并转换为字符串
type safeToolMiddleware struct {
*adk.BaseChatModelAgentMiddleware
}
func (m *safeToolMiddleware) WrapInvokableToolCall(
_ context.Context,
endpoint adk.InvokableToolCallEndpoint,
_ *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
result, err := endpoint(ctx, args, opts...)
if err != nil {
if _, ok := compose.IsInterruptRerunError(err); ok {
return "", err
}
return fmt.Sprintf("[tool error] %v", err), nil
}
return result, nil
}, nil
}
// 配置 DeepAgent(与第四章一样,新增 Handlers 和 ModelRetryConfig)
agent, _ := deep.New(ctx, &deep.Config{
ChatModel: cm,
Backend: backend,
StreamingShell: backend,
MaxIteration: 50,
Handlers: []adk.ChatModelAgentMiddleware{
&safeToolMiddleware{},
},
ModelRetryConfig: &adk.ModelRetryConfig{
MaxRetries: 5,
IsRetryAble: func(_ context.Context, err error) bool {
return strings.Contains(err.Error(), "429")
},
},
})
Middleware 执行流程
┌─────────────────────────────────────────┐
│ 用户:读取不存在的文件 │
└─────────────────────────────────────────┘
↓
┌──────────────────────┐
│ Agent 分析意图 │
│ 决定调用 read_file │
└──────────────────────┘
↓
┌──────────────────────┐
│ SafeToolMiddleware │
│ 拦截 Tool 调用 │
└──────────────────────┘
↓
┌──────────────────────┐
│ 执行 read_file │
│ 返回错误 │
└──────────────────────┘
↓
┌──────────────────────┐
│ SafeToolMiddleware │
│ 将错误转换为字符串 │
└──────────────────────┘
↓
┌──────────────────────┐
│ 返回 Tool Result │
│ "[tool error] ..." │
└──────────────────────┘
↓
┌──────────────────────┐
│ Agent 生成回复 │
│ "抱歉,文件不存在..." │
└──────────────────────┘
本章小结
- Middleware:Agent 的拦截器,可以在调用前后插入自定义逻辑
- SafeToolMiddleware:将 Tool 错误转换为字符串,让模型能够理解并处理
- ModelRetryConfig:配置 ChatModel 的自动重试,处理限流等临时错误
- 装饰器模式:Middleware 包装原始调用,可以修改输入、输出或错误
- 洋葱模型:请求从外向内穿过 Middleware,响应从内向外返回
扩展思考
Eino 内置 Middleware:
| Middleware | 功能说明 |
| reduction | 工具输出缩减,当工具返回内容过长时自动截断并卸载到文件系统,防止上下文溢出 |
| summarization | 对话历史自动摘要,当 token 数量超过阈值时自动生成摘要压缩历史 |
| skill | 技能加载中间件,让 Agent 能够动态加载和执行预定义的技能 |
Middleware 链示例:
import (
"github.com/cloudwego/eino/adk/middlewares/reduction"
"github.com/cloudwego/eino/adk/middlewares/summarization"
"github.com/cloudwego/eino/adk/middlewares/skill"
)
// 创建 reduction middleware:管理工具输出长度
reductionMW, _ := reduction.New(ctx, &reduction.Config{
Backend: filesystemBackend, // 存储后端
MaxLengthForTrunc: 50000, // 单次工具输出最大长度
MaxTokensForClear: 30000, // 触发清理的 token 阈值
})
// 创建 summarization middleware:自动压缩对话历史
summarizationMW, _ := summarization.New(ctx, &summarization.Config{
Model: chatModel, // 用于生成摘要的模型
Trigger: &summarization.TriggerCondition{
ContextTokens: 190000, // 触发摘要的 token 阈值
},
})
// 组合多个 middleware(概念示例,使用 DeepAgent 时将 adk.NewChatModelAgent 替换为 deep.New)
agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Handlers: []adk.ChatModelAgentMiddleware{ // 注意:配置字段名为 Handlers,概念上与 Middlewares 等价
summarizationMW, // 最外层:对话历史摘要
reductionMW, // 中间层:工具输出缩减
},
})