第七章:Interrupt/Resume(中断与恢复)

本章目标:理解 Interrupt/Resume 机制,实现 Tool 审批流程,让用户在敏感操作前进行确认。

代码位置

前置条件

与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。同时,需要与第四章一样设置 PROJECT_ROOT

export PROJECT_ROOT=/path/to/eino  # Eino 核心库根目录(不设置则默认使用当前目录)

运行

examples/quickstart/chatwitheino 目录下执行:

# 设置项目根目录
export PROJECT_ROOT=/path/to/your/project

go run ./cmd/ch07

输出示例:

you> 请执行命令 echo hello

⚠️  Approval Required ⚠️
Tool: execute
Arguments: {"command":"echo hello"}

Approve this action? (y/n): y
[tool result] hello

hello

从自动执行到人工审批:为什么需要 Interrupt

前几章我们实现的 Agent 会自动执行所有 Tool 调用,但在某些场景下这是危险的:

自动执行的风险:

  • 删除文件:误删重要数据
  • 发送邮件:发送错误内容
  • 执行命令:执行危险操作
  • 修改配置:破坏系统设置

Interrupt 的定位:

  • Interrupt 是 Agent 的暂停机制:在关键操作前暂停,等待用户确认
  • Interrupt 可携带信息:向用户展示即将执行的操作
  • Interrupt 可恢复:用户确认后继续执行,拒绝后返回错误

简单类比:

  • 自动执行 = “自动驾驶”(完全信任系统)
  • Interrupt = “人工接管”(关键决策由人来做)

关键概念

Interrupt 机制

Interrupt 是 Eino 中实现人机协作的核心机制。

核心思想:在执行关键操作前暂停,等待用户确认后继续。

一个需要审批的 Tool 的执行被分成两个阶段

  1. 第一次调用(触发中断):Tool 保存当前参数,然后返回一个中断信号。Runner 暂停执行,向调用侧返回 Interrupt 事件。
  2. 用户审批后恢复(Resume):Runner 重新调用 Tool,此时 Tool 检测到"已中断过",直接读取用户的审批结果并执行(或拒绝)。

简化版伪代码:

func myTool(ctx, args):
    if 第一次调用:
        保存 args
        return 中断信号  // Runner 暂停,展示审批提示
    else:  // Resume 后的第二次调用
        if 用户批准:
            return 执行操作(保存的 args)
        else:
            return "操作被用户拒绝"

完整代码及关键字段说明:

// 在 Tool 中触发中断
func myTool(ctx context.Context, args string) (string, error) {
    // wasInterrupted: 是否是 Resume 后的第二次调用(第一次为 false,Resume 后为 true)
    // storedArgs: 第一次调用时通过 StatefulInterrupt 保存的参数,Resume 后可取回
    wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)

    if !wasInterrupted {
        // 第一次调用:触发中断,同时保存 args 供 Resume 后使用
        return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
            ToolName:        "my_tool",
            ArgumentsInJSON: args,
        }, args)  // 第三个参数是要保存的状态(Resume 后通过 storedArgs 取回)
    }

    // Resume 后的第二次调用:读取用户审批结果
    // isTarget: 本次 Resume 是否针对当前 Tool(一次 Resume 只针对一个 Tool)
    // hasData:  Resume 时是否携带了审批结果数据
    // data:     用户传入的审批结果
    isTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
    if isTarget && hasData {
        if data.Approved {
            return doSomething(storedArgs)  // 使用保存的参数执行实际操作
        }
        return "Operation rejected by user", nil
    }

    // 其他情况(isTarget=false 意味着本次 Resume 目标不是当前 Tool):重新中断
    return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
        ToolName:        "my_tool",
        ArgumentsInJSON: storedArgs,
    }, storedArgs)
}

ApprovalMiddleware

ApprovalMiddleware 是一个通用的审批中间件,可以拦截特定 Tool 的调用:

type approvalMiddleware struct {
    *adk.BaseChatModelAgentMiddleware
}

func (m *approvalMiddleware) WrapInvokableToolCall(
    _ context.Context,
    endpoint adk.InvokableToolCallEndpoint,
    tCtx *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
    // 只拦截需要审批的 Tool
    if tCtx.Name != "execute" {
        return endpoint, nil
    }
    
    return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
        wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
        
        if !wasInterrupted {
            return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: args,
            }, args)
        }
        
        isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
        if isTarget && hasData {
            if data.Approved {
                return endpoint(ctx, storedArgs, opts...)
            }
            if data.DisapproveReason != nil {
                return fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason), nil
            }
            return fmt.Sprintf("tool '%s' disapproved", tCtx.Name), nil
        }
        
        // 重新中断
        return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
            ToolName:        tCtx.Name,
            ArgumentsInJSON: storedArgs,
        }, storedArgs)
    }, nil
}

func (m *approvalMiddleware) WrapStreamableToolCall(
    _ context.Context,
    endpoint adk.StreamableToolCallEndpoint,
    tCtx *adk.ToolContext,
) (adk.StreamableToolCallEndpoint, error) {
    // 如果 agent 配置了 StreamingShell,则 execute 会走流式调用,需要实现该方法才能拦截到
    if tCtx.Name != "execute" {
        return endpoint, nil
    }
    return func(ctx context.Context, args string, opts ...tool.Option) (*schema.StreamReader[string], error) {
        wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
        if !wasInterrupted {
            return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: args,
            }, args)
        }

        isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
        if isTarget && hasData {
            if data.Approved {
                return endpoint(ctx, storedArgs, opts...)
            }
            if data.DisapproveReason != nil {
                return singleChunkReader(fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason)), nil
            }
            return singleChunkReader(fmt.Sprintf("tool '%s' disapproved", tCtx.Name)), nil
        }

        isTarget, _, _ = tool.GetResumeContext[any](ctx)
        if !isTarget {
            return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: storedArgs,
            }, storedArgs)
        }

        return endpoint(ctx, storedArgs, opts...)
    }, nil
}

CheckPointStore

CheckPointStore 是实现中断恢复的关键组件:

type CheckPointStore interface {
    // 保存检查点
    Put(ctx context.Context, key string, checkpoint *Checkpoint) error
    
    // 获取检查点
    Get(ctx context.Context, key string) (*Checkpoint, error)
}

为什么需要 CheckPointStore?

  • 中断时保存状态:Tool 参数、执行位置等
  • 恢复时加载状态:从中断点继续执行
  • 支持跨进程恢复:进程重启后仍可恢复

Interrupt/Resume 的实现

1. 配置 Runner 使用 CheckPointStore

runner := adk.NewRunner(ctx, adk.RunnerConfig{
    Agent:           agent,
    EnableStreaming: true,
    CheckPointStore: adkstore.NewInMemoryStore(),  // 内存存储
})

2. 配置 Agent 使用 ApprovalMiddleware

agent, err := deep.New(ctx, &deep.Config{
    // ... 其他配置
    Handlers: []adk.ChatModelAgentMiddleware{
        &approvalMiddleware{},  // 添加审批中间件
        &safeToolMiddleware{},  // 将 Tool 错误转换为字符串(中断类错误会继续向上抛出)
    },
})

3. 处理中断事件

checkPointID := sessionID

events := runner.Run(ctx, history, adk.WithCheckPointID(checkPointID))
content, interruptInfo, err := printAndCollectAssistantFromEvents(events)
if err != nil {
    return err
}

if interruptInfo != nil {
    // 注意:建议使用同一个 stdin reader 同时读取「用户输入」与「审批 y/n」
    // 避免审批输入被当成下一轮 you> 的消息
    content, err = handleInterrupt(ctx, runner, checkPointID, interruptInfo, reader)
    if err != nil {
        return err
    }
}

_ = session.Append(schema.AssistantMessage(content, nil))

Interrupt/Resume 执行流程

┌─────────────────────────────────────────┐
│  用户:执行命令 echo hello               │
└─────────────────────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Agent 分析意图       │
        │  决定调用 execute     │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  ApprovalMiddleware  │
        │  拦截 Tool 调用       │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  触发 Interrupt       │
        │  保存状态到 Store     │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  返回 Interrupt 事件  │
        │  等待用户审批         │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  用户输入 y/n         │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  runner.ResumeWith... │
        │  恢复执行             │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  执行 execute         │
        │  或返回拒绝信息       │
        └──────────────────────┘

本章小结

  • Interrupt:Agent 的暂停机制,在关键操作前暂停等待确认
  • Resume:恢复执行,用户确认后继续或拒绝后返回错误
  • ApprovalMiddleware:通用审批中间件,拦截特定 Tool 调用
  • CheckPointStore:保存中断状态,支持跨进程恢复
  • 人机协作:关键决策由人来确认,提高安全性

扩展思考

其他 Interrupt 场景:

  • 多选项审批:用户选择多个选项之一
  • 参数补全:用户提供缺失的参数
  • 条件分支:用户决定执行路径

审批策略:

  • 白名单:只审批敏感操作
  • 黑名单:审批所有操作,除了安全的
  • 动态规则:根据参数内容决定是否审批