第八章:Graph Tool(复杂工作流)

本章目标:理解 Graph Tool 的概念,实现大文件的并行 chunk 召回,引入 compose 包构建复杂工作流。

代码位置

前置条件

与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。

运行

examples/quickstart/chatwitheino 目录下执行:

# 设置项目根目录
export PROJECT_ROOT=/path/to/your/project

go run ./cmd/ch08

输出示例:

you> 请帮我分析 RFC6455 文档中关于 WebSocket 握手的部分
[assistant] 我来帮你分析文档...
[tool call] answer_from_document(file_path: "rfc6455.txt", question: "WebSocket 握手过程")
[tool result] 找到 3 个相关片段,正在生成答案...
[assistant] 根据 RFC6455 文档,WebSocket 握手过程如下...

从简单 Tool 到 Graph Tool:为什么需要复杂工作流

第四章我们创建了简单的 Tool,每个 Tool 执行单一任务。但实际场景中,很多任务需要多个步骤协同完成。

简单 Tool 的局限:

  • 单一职责:每个 Tool 只做一件事
  • 无法并行:多个独立任务无法同时执行
  • 难以复用:复杂逻辑难以拆分和组合

重要说明:本章只是展示 compose/graph/workflow 能力的一角。

从更大的视角看,Eino 的 compose 包提供了非常通用、确定性的编排能力:你可以把任何需要"确定性业务流程"的系统,用 compose 的 Graph/Chain/Workflow 组织成可执行的流水线,并且它能够原生编排 Eino 的所有 component(如 ChatModel、Prompt、Tools、Retriever、Embedding、Indexer 等),同时具备完整的 callback 体系,以及 interrupt/resume + checkpoint 支持。

Graph Tool 的定位:

  • Graph Tool 是 compose 工作流的 Tool 化封装:把 compose.Graph / compose.Chain / compose.Workflow 这类可编译的编排产物,包装成一个 Agent 可调用的 Tool
  • 支持并行/分支/组合:由 compose 提供(并行、分支、字段映射、子图等),Graph Tool 只是把它们暴露为 Tool 入口
  • 支持状态管理与持久化:节点间传递数据、以及通过 checkpoint 保存/恢复运行状态
  • 可中断恢复:既支持工作流内部的中断(节点里触发 interrupt),也支持工具层面的中断包装(嵌套 interrupt 场景)

简单类比:

  • 简单 Tool = “单步操作”(读取文件)
  • Graph Tool = “流水线”(读取 → 分块 → 评分 → 筛选 → 生成答案)

关键概念

compose.Workflow

compose.Workflow 是 Eino 中构建工作流的核心组件:

wf := compose.NewWorkflow[Input, Output]()

// 添加节点
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).AddInput("chunk")
wf.AddLambdaNode("answer", answerFunc).AddInput("score")

// 连接到结束节点
wf.End().AddInput("answer")

核心概念:

  • Node:工作流中的处理单元
  • Edge:节点间的数据流向
  • START:工作流的入口
  • END:工作流的出口

BatchNode

BatchNode 用于并行处理多个任务:

scorer := batch.NewBatchNode(&batch.NodeConfig[Task, Result]{
    Name:           "ChunkScorer",
    InnerTask:      scoreOneChunk,  // 单个任务的处理函数
    MaxConcurrency: 5,              // 最大并发数
})

工作原理:

  1. 接收任务列表作为输入
  2. 并行执行每个任务(受 MaxConcurrency 限制)
  3. 收集所有结果返回

FieldMapping

FieldMapping 用于跨节点传递数据:

wf.AddLambdaNode("answer", answerFunc).
    AddInputWithOptions("filter",  // 从 filter 节点获取数据
        []*compose.FieldMapping{compose.ToField("TopK")},
        compose.WithNoDirectDependency()).
    AddInputWithOptions(compose.START,  // 从 START 节点获取数据
        []*compose.FieldMapping{compose.MapFields("Question", "Question")},
        compose.WithNoDirectDependency())

为什么需要 FieldMapping?

  • 非相邻节点间传递数据
  • 多个数据源合并到同一节点
  • 数据字段重命名

Graph Tool 的实现

1. 定义输入输出结构

type Input struct {
    FilePath string `json:"file_path" jsonschema:"description=Absolute path to the document"`
    Question string `json:"question"  jsonschema:"description=The question to answer"`
}

type Output struct {
    Answer  string   `json:"answer"`
    Sources []string `json:"sources"`
}

2. 构建工作流

func buildWorkflow(cm model.BaseChatModel) *compose.Workflow[Input, Output] {
    wf := compose.NewWorkflow[Input, Output]()

    // load: 读取文件
    wf.AddLambdaNode("load", compose.InvokableLambda(
        func(ctx context.Context, in Input) ([]*schema.Document, error) {
            data, err := os.ReadFile(in.FilePath)
            if err != nil {
                return nil, err
            }
            return []*schema.Document{{Content: string(data)}}, nil
        },
    )).AddInput(compose.START)

    // chunk: 分块
    wf.AddLambdaNode("chunk", compose.InvokableLambda(
        func(ctx context.Context, docs []*schema.Document) ([]*schema.Document, error) {
            var out []*schema.Document
            for _, d := range docs {
                out = append(out, splitIntoChunks(d.Content, 800)...)
            }
            return out, nil
        },
    )).AddInput("load")

    // score: 并行评分
    scorer := batch.NewBatchNode(&batch.NodeConfig[scoreTask, scoredChunk]{
        Name:           "ChunkScorer",
        InnerTask:      newScoreWorkflow(cm),
        MaxConcurrency: 5,
    })

    wf.AddLambdaNode("score", compose.InvokableLambda(
        func(ctx context.Context, in scoreIn) ([]scoredChunk, error) {
            tasks := make([]scoreTask, len(in.Chunks))
            for i, c := range in.Chunks {
                tasks[i] = scoreTask{Text: c.Content, Question: in.Question}
            }
            return scorer.Invoke(ctx, tasks)
        },
    )).
        AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
        AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

    // filter: 筛选 top-k
    wf.AddLambdaNode("filter", compose.InvokableLambda(
        func(ctx context.Context, scored []scoredChunk) ([]scoredChunk, error) {
            sort.Slice(scored, func(i, j int) bool {
                return scored[i].Score > scored[j].Score
            })
            // 返回 top-3
            if len(scored) > 3 {
                scored = scored[:3]
            }
            return scored, nil
        },
    )).AddInput("score")

    // answer: 生成答案
    wf.AddLambdaNode("answer", compose.InvokableLambda(
        func(ctx context.Context, in synthIn) (Output, error) {
            return synthesize(ctx, cm, in)
        },
    )).
        AddInputWithOptions("filter", []*compose.FieldMapping{compose.ToField("TopK")}, compose.WithNoDirectDependency()).
        AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

    wf.End().AddInput("answer")

    return wf
}

3. 封装为 Tool

func BuildTool(ctx context.Context, cm model.BaseChatModel) (tool.BaseTool, error) {
    wf := buildWorkflow(cm)
    return graphtool.NewInvokableGraphTool[Input, Output](
        wf,
        "answer_from_document",
        "Search a large document for relevant content and synthesize an answer.",
    )
}

**关键代码片段(注意:这是简化后的代码片段,不能直接运行,完整代码请参考 rag/rag.go):

// 构建工作流
wf := compose.NewWorkflow[Input, Output]()

// 添加节点
wf.AddLambdaNode("load", loadFunc).AddInput(compose.START)
wf.AddLambdaNode("chunk", chunkFunc).AddInput("load")
wf.AddLambdaNode("score", scoreFunc).
    AddInputWithOptions("chunk", []*compose.FieldMapping{compose.ToField("Chunks")}, compose.WithNoDirectDependency()).
    AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Question", "Question")}, compose.WithNoDirectDependency())

// 封装为 Tool
return graphtool.NewInvokableGraphTool[Input, Output](wf, "answer_from_document", "...")

Graph Tool 执行流程

┌─────────────────────────────────────────┐
│  输入:file_path, question               │
└─────────────────────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  load: 读取文件       │
        │  输出: []*Document    │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  chunk: 分块          │
        │  输出: []*Document    │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  score: 并行评分      │
        │  (MaxConcurrency=5)  │
        │  输出: []scoredChunk  │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  filter: 筛选 top-k   │
        │  输出: []scoredChunk  │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  answer: 生成答案     │
        │  输出: Output         │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  返回结果             │
        │  {answer, sources}    │
        └──────────────────────┘

本章小结

  • Graph Tool:将复杂工作流封装为 Tool,支持多步骤协同
  • compose.Workflow:构建工作流的核心组件
  • BatchNode:并行处理多个任务
  • FieldMapping:跨节点传递数据
  • 可中断恢复:Graph Tool 支持 Checkpoint 机制

扩展思考

其他 Graph Tool 应用:

  • 多文档 RAG:并行处理多个文档
  • 多模型协作:不同模型处理不同任务
  • 复杂决策树:根据条件选择不同分支

性能优化:

  • 调整 MaxConcurrency 控制并发
  • 使用缓存避免重复计算
  • 流式输出提升用户体验