第八章:Graph Tool(复杂工作流)
本章目标:理解 Graph Tool 的概念,实现大文件的并行 chunk 召回,引入 compose 包构建复杂工作流。
代码位置
- 入口代码:cmd/ch08/main.go
- RAG 实现:rag/rag.go
前置条件
与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。
运行
在 examples/quickstart/chatwitheino 目录下执行:
# 设置项目根目录
export PROJECT_ROOT=/path/to/your/project
go run ./cmd/ch08
输出示例:
you> 请帮我分析 RFC6455 文档中关于 WebSocket 握手的部分
[assistant] 我来帮你分析文档...
[tool call] answer_from_document(file_path: "rfc6455.txt", question: "WebSocket 握手过程")
[tool result] 找到 3 个相关片段,正在生成答案...
[assistant] 根据 RFC6455 文档,WebSocket 握手过程如下...
从简单 Tool 到 Graph Tool:为什么需要复杂工作流
第四章我们创建了简单的 Tool,每个 Tool 执行单一任务。但实际场景中,很多任务需要多个步骤协同完成。
简单 Tool 的局限:
- 单一职责:每个 Tool 只做一件事
- 无法并行:多个独立任务无法同时执行
- 难以复用:复杂逻辑难以拆分和组合
重要说明:本章只是展示 compose/graph/workflow 能力的一角。
从更大的视角看,Eino 的 compose 包提供了非常通用、确定性的编排能力:你可以把任何需要"确定性业务流程"的系统,用 compose 的 Graph/Chain/Workflow 组织成可执行的流水线,并且它能够原生编排 Eino 的所有 component(如 ChatModel、Prompt、Tools、Retriever、Embedding、Indexer 等),同时具备完整的 callback 体系,以及 interrupt/resume + checkpoint 支持。
Graph Tool 的定位:
- Graph Tool 是 compose 工作流的 Tool 化封装:把
compose.Graph / compose.Chain / compose.Workflow这类可编译的编排产物,包装成一个 Agent 可调用的 Tool - 支持并行/分支/组合:由 compose 提供(并行、分支、字段映射、子图等),Graph Tool 只是把它们暴露为 Tool 入口
- 支持状态管理与持久化:节点间传递数据、以及通过 checkpoint 保存/恢复运行状态
- 可中断恢复:既支持工作流内部的中断(节点里触发 interrupt),也支持工具层面的中断包装(嵌套 interrupt 场景)
简单类比:
- 简单 Tool = “单步操作”(读取文件)
- Graph Tool = “流水线”(读取 → 分块 → 评分 → 筛选 → 生成答案)
关键概念
compose.Workflow
compose.Workflow 是 Eino 中构建工作流的核心组件:
wf := compose.NewWorkflow[Input, Output]()
// 添加节点
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).AddInput("chunk")
wf.AddLambdaNode("answer", answerFunc).AddInput("score")
// 连接到结束节点
wf.End().AddInput("answer")
核心概念:
- Node:工作流中的处理单元
- Edge:节点间的数据流向
- START:工作流的入口
- END:工作流的出口
BatchNode
BatchNode 用于并行处理多个任务:
scorer := batch.NewBatchNode(&batch.NodeConfig[Task, Result]{
Name: "ChunkScorer",
InnerTask: scoreOneChunk, // 单个任务的处理函数
MaxConcurrency: 5, // 最大并发数
})
工作原理:
- 接收任务列表作为输入
- 并行执行每个任务(受 MaxConcurrency 限制)
- 收集所有结果返回
FieldMapping
FieldMapping 用于跨节点传递数据:
wf.AddLambdaNode("answer", answerFunc).
AddInputWithOptions("filter", // 从 filter 节点获取数据
[]*compose.FieldMapping{compose.ToField("TopK")},
compose.WithNoDirectDependency()).
AddInputWithOptions(compose.START, // 从 START 节点获取数据
[]*compose.FieldMapping{compose.MapFields("Question", "Question")},
compose.WithNoDirectDependency())
为什么需要 FieldMapping?
- 非相邻节点间传递数据
- 多个数据源合并到同一节点
- 数据字段重命名
Graph Tool 的实现
1. 定义输入输出结构
type Input struct {
FilePath string `json:"file_path" jsonschema:"description=Absolute path to the document"`
Question string `json:"question" jsonschema:"description=The question to answer"`
}
type Output struct {
Answer string `json:"answer"`
Sources []string `json:"sources"`
}
2. 构建工作流
func buildWorkflow(cm model.BaseChatModel) *compose.Workflow[Input, Output] {
wf := compose.NewWorkflow[Input, Output]()
// load: 读取文件
wf.AddLambdaNode("load", compose.InvokableLambda(
func(ctx context.Context, in Input) ([]*schema.Document, error) {
data, err := os.ReadFile(in.FilePath)
if err != nil {
return nil, err
}
return []*schema.Document{{Content: string(data)}}, nil
},
)).AddInput(compose.START)
// chunk: 分块
wf.AddLambdaNode("chunk", compose.InvokableLambda(
func(ctx context.Context, docs []*schema.Document) ([]*schema.Document, error) {
var out []*schema.Document
for _, d := range docs {
out = append(out, splitIntoChunks(d.Content, 800)...)
}
return out, nil
},
)).AddInput("load")
// score: 并行评分
scorer := batch.NewBatchNode(&batch.NodeConfig[scoreTask, scoredChunk]{
Name: "ChunkScorer",
InnerTask: newScoreWorkflow(cm),
MaxConcurrency: 5,
})
wf.AddLambdaNode("score", compose.InvokableLambda(
func(ctx context.Context, in scoreIn) ([]scoredChunk, error) {
tasks := make([]scoreTask, len(in.Chunks))
for i, c := range in.Chunks {
tasks[i] = scoreTask{Text: c.Content, Question: in.Question}
}
return scorer.Invoke(ctx, tasks)
},
)).
AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())
// filter: 筛选 top-k
wf.AddLambdaNode("filter", compose.InvokableLambda(
func(ctx context.Context, scored []scoredChunk) ([]scoredChunk, error) {
sort.Slice(scored, func(i, j int) bool {
return scored[i].Score > scored[j].Score
})
// 返回 top-3
if len(scored) > 3 {
scored = scored[:3]
}
return scored, nil
},
)).AddInput("score")
// answer: 生成答案
wf.AddLambdaNode("answer", compose.InvokableLambda(
func(ctx context.Context, in synthIn) (Output, error) {
return synthesize(ctx, cm, in)
},
)).
AddInputWithOptions("filter", []*compose.FieldMapping{compose.ToField("TopK")}, compose.WithNoDirectDependency()).
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())
wf.End().AddInput("answer")
return wf
}
3. 封装为 Tool
func BuildTool(ctx context.Context, cm model.BaseChatModel) (tool.BaseTool, error) {
wf := buildWorkflow(cm)
return graphtool.NewInvokableGraphTool[Input, Output](
wf,
"answer_from_document",
"Search a large document for relevant content and synthesize an answer.",
)
}
**关键代码片段(注意:这是简化后的代码片段,不能直接运行,完整代码请参考 rag/rag.go):
// 构建工作流
wf := compose.NewWorkflow[Input, Output]()
// 添加节点
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).
AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())
// 封装为 Tool
return graphtool.NewInvokableGraphTool[Input, Output](wf, "answer_from_document", "...")
Graph Tool 执行流程
┌─────────────────────────────────────────┐
│ 输入:file_path, question │
└─────────────────────────────────────────┘
↓
┌──────────────────────┐
│ load: 读取文件 │
│ 输出: []*Document │
└──────────────────────┘
↓
┌──────────────────────┐
│ chunk: 分块 │
│ 输出: []*Document │
└──────────────────────┘
↓
┌──────────────────────┐
│ score: 并行评分 │
│ (MaxConcurrency=5) │
│ 输出: []scoredChunk │
└──────────────────────┘
↓
┌──────────────────────┐
│ filter: 筛选 top-k │
│ 输出: []scoredChunk │
└──────────────────────┘
↓
┌──────────────────────┐
│ answer: 生成答案 │
│ 输出: Output │
└──────────────────────┘
↓
┌──────────────────────┐
│ 返回结果 │
│ {answer, sources} │
└──────────────────────┘
本章小结
- Graph Tool:将复杂工作流封装为 Tool,支持多步骤协同
- compose.Workflow:构建工作流的核心组件
- BatchNode:并行处理多个任务
- FieldMapping:跨节点传递数据
- 可中断恢复:Graph Tool 支持 Checkpoint 机制
扩展思考
其他 Graph Tool 应用:
- 多文档 RAG:并行处理多个文档
- 多模型协作:不同模型处理不同任务
- 复杂决策树:根据条件选择不同分支
性能优化:
- 调整 MaxConcurrency 控制并发
- 使用缓存避免重复计算
- 流式输出提升用户体验