第六章:Callback 与 Trace(可观测性)

本章目标:理解 Callback 机制,集成 CozeLoop 实现链路追踪和可观测性。

代码位置

前置条件

与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark)。同时,需要与第四章一样设置 PROJECT_ROOT

export PROJECT_ROOT=/path/to/eino  # Eino 核心库根目录(不设置则默认使用当前目录)

可选:配置 CozeLoop 实现链路追踪:

export COZELOOP_WORKSPACE_ID=your_workspace_id
export COZELOOP_API_TOKEN=your_token

运行

examples/quickstart/chatwitheino 目录下执行:

# 设置项目根目录
export PROJECT_ROOT=/path/to/your/project

# 可选:配置 CozeLoop
export COZELOOP_WORKSPACE_ID=your_workspace_id
export COZELOOP_API_TOKEN=your_token

go run ./cmd/ch06

输出示例:

[trace] starting session: 083d16da-6b13-4fe6-afb0-c45d8f490ce1
you> 你好
[trace] chat_model_generate: model=gpt-4.1-mini tokens=150
[trace] tool_call: name=list_files duration=23ms
[assistant] 你好!有什么我可以帮助你的吗?

从黑盒到白盒:为什么需要 Callback

前几章我们实现的 Agent 是一个"黑盒":输入问题,输出答案,但中间发生了什么我们并不清楚。

黑盒的问题:

  • 不知道模型调用了多少次
  • 不知道 Tool 执行了多长时间
  • 不知道 Token 消耗了多少
  • 出问题时难以定位原因

Callback 的定位:

  • Callback 是 Eino 的旁路机制:从 component 到 compose(下文详谈)到 adk,一以贯之
  • Callback 在固定点位触发:组件生命周期的 5 个关键时机
  • Callback 可抽取实时信息:输入、输出、错误、流式数据等
  • Callback 用途广泛:观测、日志、指标、追踪、调试、审计等

简单类比:

  • Agent = “业务逻辑”(主路)
  • Callback = “旁路钩子”(在固定点位抽取信息)

关键概念

Handler 接口

Handler 是 Eino 中定义回调处理器的核心接口:

type Handler interface {
    // 非流式输入(组件开始处理前)
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    
    // 非流式输出(组件成功返回后)
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
    
    // 错误(组件返回错误时)
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
    
    // 流式输入(组件接收流式输入时)
    OnStartWithStreamInput(ctx context.Context, info *RunInfo, 
        input *schema.StreamReader[CallbackInput]) context.Context
    
    // 流式输出(组件返回流式输出时)
    OnEndWithStreamOutput(ctx context.Context, info *RunInfo, 
        output *schema.StreamReader[CallbackOutput]) context.Context
}

设计理念:

  • 旁路机制:不干扰主流程,在固定点位抽取信息
  • 全流程覆盖:从 component 到 compose 到 adk,所有组件都支持
  • 状态传递:同一 Handler 的 OnStart→OnEnd 可通过 context 传递状态
  • 性能优化:实现 TimingChecker 接口可跳过不需要的时机

RunInfo 结构:

type RunInfo struct {
    Name      string        // 业务名称(节点名或用户指定)
    Type      string        // 实现类型(如 "OpenAI")
    Component string        // 组件类型(如 "ChatModel")
}

重要提示:

  • 流式回调必须关闭 StreamReader,否则会导致 goroutine 泄漏
  • 不要修改 Input/Output,它们被所有下游共享
  • RunInfo 可能为 nil,使用前需要检查

CozeLoop

CozeLoop 是字节跳动开源的 AI 应用可观测性平台,提供了:

  • 链路追踪:完整的调用链路可视化
  • 指标监控:延迟、Token 消耗、错误率等
  • 日志聚合:集中管理所有日志
  • 调试支持:在线查看和调试

集成方式:

import (
    clc "github.com/cloudwego/eino-ext/callbacks/cozeloop"
    "github.com/cloudwego/eino/callbacks"
    "github.com/coze-dev/cozeloop-go"
)

// 创建 CozeLoop 客户端
client, err := cozeloop.NewClient(
    cozeloop.WithAPIToken(apiToken),
    cozeloop.WithWorkspaceID(workspaceID),
)

// 注册为全局 Callback
callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))

Callback 的触发时机

Callback 在组件生命周期的 5 个关键时机触发。下表中 Timing* 是 Eino 内部常量名(用于 TimingChecker 接口),对应的 Handler 接口方法是右侧所示:

时机常量对应 Handler 方法触发点输入/输出
TimingOnStart
OnStart
组件开始处理前CallbackInput
TimingOnEnd
OnEnd
组件成功返回后CallbackOutput
TimingOnError
OnError
组件返回错误时error
TimingOnStartWithStreamInput
OnStartWithStreamInput
组件接收流式输入时StreamReader[CallbackInput]
TimingOnEndWithStreamOutput
OnEndWithStreamOutput
组件返回流式输出时StreamReader[CallbackOutput]

示例:ChatModel 调用流程

┌─────────────────────────────────────────┐
│  ChatModel.Generate(ctx, messages)      │
└─────────────────────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  OnStart             │  ← 输入: CallbackInput (messages)
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  模型处理             │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  OnEnd               │  ← 输出: CallbackOutput (response)
        └──────────────────────┘

示例:流式输出流程

┌─────────────────────────────────────────┐
│  ChatModel.Stream(ctx, messages)        │
└─────────────────────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  OnStart             │  ← 输入: CallbackInput (messages)
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  模型处理(流式)     │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  OnEndWithStreamOutput │  ← 输出: StreamReader[CallbackOutput]
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  逐个 chunk 返回      │
        └──────────────────────┘

注意:

  • 流式错误(stream 中途出错)不会触发 OnError,而是在 StreamReader 中返回
  • 同一 Handler 的 OnStart→OnEnd 可通过 context 传递状态
  • 不同 Handler 之间没有执行顺序保证

Callback 的实现

1. 实现自定义 Callback Handler

完整实现 Handler 接口需要实现所有 5 个方法,较为繁琐。Eino 提供了 callbacks.HandlerHelper 帮助类来简化实现:

import "github.com/cloudwego/eino/callbacks"

// 使用 NewHandlerHelper 注册感兴趣的回调
handler := callbacks.NewHandlerHelper().
    OnStart(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
        log.Printf("[trace] %s/%s start", info.Component, info.Name)
        return ctx
    }).
    OnEnd(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
        log.Printf("[trace] %s/%s end", info.Component, info.Name)
        return ctx
    }).
    OnError(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
        log.Printf("[trace] %s/%s error: %v", info.Component, info.Name, err)
        return ctx
    }).
    Handler()

// 注册为全局 Callback
callbacks.AppendGlobalHandlers(handler)

注意RunInfo 可能为 nil(如顶层调用没有 RunInfo),使用前请检查。

2. 集成 CozeLoop

func setupCozeLoop(ctx context.Context) (*cozeloop.Client, error) {
    apiToken := os.Getenv("COZELOOP_API_TOKEN")
    workspaceID := os.Getenv("COZELOOP_WORKSPACE_ID")
    
    if apiToken == "" || workspaceID == "" {
        return nil, nil  // 未配置则跳过
    }
    
    client, err := cozeloop.NewClient(
        cozeloop.WithAPIToken(apiToken),
        cozeloop.WithWorkspaceID(workspaceID),
    )
    if err != nil {
        return nil, err
    }
    
    // 注册为全局 Callback
    callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))
    
    return client, nil
}

3. 在 main 中使用

func main() {
    ctx := context.Background()
    
    // 设置 CozeLoop(可选)
    client, err := setupCozeLoop(ctx)
    if err != nil {
        log.Printf("cozeloop setup failed: %v", err)
    }
    if client != nil {
        defer func() {
            time.Sleep(5 * time.Second)  // 等待数据上报
            client.Close(ctx)
        }()
    }
    
    // 创建 Agent 并运行...
}

**关键代码片段(注意:这是简化后的代码片段,不能直接运行,完整代码请参考 cmd/ch06/main.go):

// 设置 CozeLoop 追踪
cozeloopApiToken := os.Getenv("COZELOOP_API_TOKEN")
cozeloopWorkspaceID := os.Getenv("COZELOOP_WORKSPACE_ID")
if cozeloopApiToken != "" && cozeloopWorkspaceID != "" {
    client, err := cozeloop.NewClient(
        cozeloop.WithAPIToken(cozeloopApiToken),
        cozeloop.WithWorkspaceID(cozeloopWorkspaceID),
    )
    if err != nil {
        log.Fatalf("cozeloop.NewClient failed: %v", err)
    }
    defer func() {
        time.Sleep(5 * time.Second)
        client.Close(ctx)
    }()
    callbacks.AppendGlobalHandlers(clc.NewLoopHandler(client))
}

可观测性的价值

1. 性能分析

通过 Callback 收集的数据,可以分析:

  • 模型调用延迟分布
  • Tool 执行时间排行
  • Token 消耗趋势

2. 错误追踪

当 Agent 出现问题时:

  • 查看完整的调用链路
  • 定位是哪个环节出错
  • 分析错误原因

3. 成本优化

通过 Token 消耗数据:

  • 识别高消耗的对话
  • 优化 Prompt 减少 Token
  • 选择更经济的模型

本章小结

  • Callback:Eino 的观测钩子,在关键节点触发回调
  • CozeLoop:字节跳动的 AI 应用可观测性平台
  • 全局注册:通过 callbacks.AppendGlobalHandlers 注册全局 Callback
  • 非侵入式:业务代码不需要修改,Callback 自动触发
  • 可观测性价值:性能分析、错误追踪、成本优化

扩展思考

其他 Callback 实现:

  • OpenTelemetry Callback:对接标准可观测性协议
  • 自定义日志 Callback:记录到本地文件
  • 指标 Callback:对接 Prometheus 等监控系统

高级用法:

  • 在 Callback 中实现采样(只记录部分请求)
  • 在 Callback 中实现限流(根据 Token 消耗)
  • 在 Callback 中实现告警(错误率过高时通知)