5.7 adk
#
一、什么是ADK
#
Agent:一个独立的、可执行的AI任务单元,通过调用ChatModel的理解能力和预定义Tool的工具执行能力,能够自主学习完成复杂的任务。主要功能:
- 推理:分析数据、识别模式,根据逻辑和可用信息推导出结论
- 行动:执行任务
- 观察:自助收集上下文信息
- 规划:确定必要的步骤,选择最佳行动方法
- 协作:与其他AI Agent/人 进行协作
Eino ADK(Agent Development Kit)****是一个专为 Go 语言设计的 Agent 和 Multi-Agent 开发框架,设计上参考了
Google-ADK 中对 Agent 与协作机制的定义。该工具库提供了统一的抽象接口、灵活的组合模式和强大的协作机制,设计哲学是"简单的事情简单做,复杂的事情也能做",让开发者能够专注于业务逻辑的实现,而不必担心底层的技术复杂性(如跨Agent的context传播、事件流分发和转换、任务控制权转移、中断与恢复、callback通用能力),能像搭建乐高积木一样构建复杂的AI Agent系统:
- 少写胶水:统一接口与事件流,复杂任务拆解更自然。
- 快速编排:预设范式 + 工作流,分分钟搭好管线。
- 更可控:可中断、可恢复、可审计,Agent 协作过程“看得见”。
ADK整体模块构成:

封装关系:

二、预定义组件
#
2.1 ChatModelAgent 带React能力
#
最重要的预构建组件,封装了与大语言模型的交互逻辑,实现了经典的 ReAct(Reason-Act-Observe)模式。ChatModlAgent的行为是 非确定性的,通过LLM来动态决定 call tool/transfer another agent。运行过程为:
- 调用 LLM(Reason)
- LLM 返回工具调用请求(Action)
- ChatModelAgent 执行工具(Act)
- 将工具结果返回给 LLM(Observation),结合之前的上下文**循环(loop)**生成,直到模型判断不需要调用 Tool 后结束。

ReAct 模式的核心是**“思考 → 行动 → 观察 → 再思考”**的闭环,解决传统 Agent “盲目行动”(如一次性搜集全部信息导致的信息过载)或“推理与行动脱节”(如凭空靠直觉决策)的痛点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // eino/adk/chatmodel.go
type ChatModelAgent struct {
name string
description string
instruction string
model model.ToolCallingChatModel
toolsConfig ToolsConfig
genModelInput GenModelInput
outputKey string
maxIterations int
subAgents []Agent // subAgents
parentAgent Agent
disallowTransferToParent bool
exit tool.BaseTool
// runner
once sync.Once // sync.Once
run runFunc
frozen uint32
}
|
example:使用 ADK 快速构建具有 ReAct 能力的 ChatModelAgent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| import github.com/cloudwego/eino/adk
// 创建一个包含多个工具的 ReAct ChatModelAgent
chatAgent := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "intelligent_assistant",
Description: "An intelligent assistant capable of using multiple tools to solve complex problems",
Instruction: "You are a professional assistant who can use the provided tools to help users solve problems",
Model: openaiModel,
ToolsConfig: adk.ToolsConfig{
Tools: []tool.BaseTool{
searchTool,
calculatorTool,
weatherTool,
},
}
})
|
备注:在adk上线前,Flow/集成工具目录下提供了基于compose.Graph的 ReAct Agent 和 Host Multi Agent。(推荐使用新的基于adk的统一定义的版本)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| // eino-framework/eino/adk
adk/
├── 1. 核心接口定义
│ ├── interface.go # 定义Agent、Message等核心接口和数据结构
│ └── instruction.go # 指令相关的接口定义
│
├── 2. 基础工具和基础设施
│ ├── utils.go #** 异步迭代器、生成器**等核心工具函数
│ ├── call_option.go # 调用选项和配置管理
│ └── runctx.go # 运行时上下文管理
│
├── 3. 核心Agent实现
│ └── react.go # ReAct Agent,实现推理和行动循环
│ ├── chatmodel.go # ChatModel Agent,**基于上述react.go的ReactAgent,处理AI对话和工具调用**
│ ├── agent_tool.go # 代理工具集成,支持工具调用功能
│ ├── flow.go # 流程Agent,管理代理间的消息流转
│ ├── workflow.go # Workflow(工作流-精确流水线)Agent,支持 顺序、并发、循环 控制 子Agent 可预测的 确定性执行流程
│ └── Custom Agent # 通过接口实现自己的 Agent,允许定义高度定制的复杂 Agent
│
├── 4. 执行和运行管理
│ ├── runner.go # 代理运行器,管理代理的生命周期和执行
│ └── interrupt.go # 中断处理,支持代理执行的中断和恢复
│
├── 5. 预构建组件
│ └── prebuilt/ # 预构建的代理和工具组件
│ ├──--- supervisor.go # 监督者模式实现:监督者Agent控制所有通信流程和任务委托,并根据当前上下文和任务需求决定调用哪个Agent。
│ ├──--- plan_execute.go # 计划-执行-反思 模式:Plan Agent 生成含多个步骤的计划,Execute Agent 根据用户 query 和计划来完成任务。Execute 后会再次调用 Plan,决定完成任务 / 重新进行规划。
│
└── 6. 测试文件
├── *_test.go # 各模块的单元测试
└── ...
|
https://www.cloudwego.io/zh/docs/eino/core_modules/eino_adk/agent_implementation/chat_model/
2.2 WorkflowAgent: 精密的agent流水线
#
区别于基于 LLM自主决策 的Transfer(不确定的执行流),Workflow Agents模式 采用**预设决策(代码定义的执行流,可预测、可控制)**的方式来运行子Agent。
可基于 Sequential Agent(顺序)、Parallel Agent(并发)、Loop Agent(循环)三种基础的 Workflow Agent执行模式 进行组合嵌套,构建各种复杂的执行流程。
1. Sequential Agent(顺序)
#
- 线性执行:最基础的Workflow Agent,严格按照SubAgents数组的顺序,依次执行一次注册的Agents后结束。
- History 传递:每个 Agent 的执行结果都会被添加到 History 中,后续 Agent 可以访问前面 Agent 的执行历史,形成一个线性的执行链。
- 支持 提前退出:如果任何一个子 Agent 产生退出 / 中断动作,整个 Sequential 流程会立即终止。
可能的应用场景:
- 数据 ETL:
ExtractAgent(从 MySQL 抽取订单数据)→ TransformAgent(清洗空值、格式化日期)→ LoadAgent(加载到数据仓库) - CI / CD 流水线:
CodeCloneAgent(从代码仓库拉取代码)→UnitTestAgent(运行单元测试,用例失败时返回错误与分析报告)→CompileAgent(编译代码)→DeployAgent(部署到目标环境)

example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import github.com/cloudwego/eino/adk
// 依次执行 制定研究计划 -> 搜索资料 -> 撰写报告
sequential := adk.NewSequentialAgent(ctx, &adk.SequentialAgentConfig{
Name: "research_pipeline",
Description: "",
SubAgents: []adk.Agent{
planAgent, // 制定研究计划
searchAgent, // 搜索资料
writeAgent, // 撰写报告
},
})
func NewPlanAgent() adk.Agent {
a, err := adk.NewChatModelAgent(...) // 底层使用的ChatModelAgent
return a
}
|
2. Loop Agent(循环)
#
配置中注册的 Agents基于 SequentialAgent 实现,循环的顺序依次执行配置中注册的 Agents,直到达到最大迭代次数 或 某个子 Agent 产生 ExitAction。
- 循环执行:重复执行 SubAgents 序列,每次循环都是一个完整的 Sequential Agent 执行过程。
- History 累积:每次迭代的结果都会累积到 History 中,后续迭代可以访问所有历史信息。
- 条件退出:支持通过输出包含
ExitAction 的事件或达到最大迭代次数来终止循环,配置 MaxIterations=0 时表示无限循环。
特别适用于需要 迭代优化、反复处理或持续监控****的场景;可能的应用场景有:
- 数据同步:
CheckUpdateAgent(检查源库增量)→ IncrementalSyncAgent(同步增量数据)→ VerifySyncAgent(验证一致性) - 压力测试:
StartClientAgent(启动测试客户端)→ SendRequestsAgent(发送请求)→ CollectMetricsAgent(收集性能指标)

example:
1
2
3
4
5
6
7
8
9
10
11
12
| import github.com/cloudwego/eino/adk
// 循环执行 5 次,每次顺序为:分析当前状态 -> 提出改进方案 -> 验证改进效果
loop := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{
Name: "iterative_optimization",
SubAgents: []adk.Agent{
analyzeAgent, // 分析当前状态
improveAgent, // 提出改进方案
validateAgent, // 验证改进效果
},
MaxIterations: 5,
})
|
3. Parallel Agent(并发)
#
基于相同的输入上下文,并发执行配置中注册的 所有Agents,并等待全部完成后结束。
- 并发执行:所有子 Agent 同时启动,在独立的 goroutine 中并行执行。Parallel 内部默认包含异常处理机制:
- Panic 恢复:每个 goroutine 都有独立的 panic 恢复机制
- 错误隔离:单个子 Agent 的错误不会影响其他子 Agent 的执行
- 中断处理:支持子 Agent 的中断和恢复机制
- 共享输入:所有子 Agent 接收调用 Pararllel Agent 相同的初始输入和上下文。
- 等待与结果聚合:内部使用 sync.WaitGroup 等待所有子 Agent 执行完成,收集所有子 Agent 的执行结果并按接收顺序输出到
AsyncIterator 中。
特别适用于可以独立并行处理的任务,能够显著提高执行效率;
可能的应用场景:
- 多源数据采集:
MySQLCollector(采集用户表)+ PostgreSQLCollector(采集订单表)+ MongoDBCollector(采集商品评论) - 多渠道推送:
WeChatPushAgent(推送到微信公众号)+ SMSPushAgent(发送短信)+ AppPushAgent(推送到 APP)

example:
1
2
3
4
5
6
7
8
9
10
11
12
| import github.com/cloudwego/eino/adk
// 并发执行 情感分析 + 关键词提取 + 内容摘要
parallel := adk.NewParallelAgent(ctx, &adk.ParallelAgentConfig{
Name: "multi_analysis",
Description: "",
SubAgents: []adk.Agent{
sentimentAgent, // 情感分析
keywordAgent, // 关键词提取
summaryAgent, // 内容摘要
},
})
|
2.3 prebuilt的MultiAgent范式
#
1. Plan-Execute模式(结构化解决问题)
#
是 ADK 提供的基于「规划-执行-反思」范式的 Multi-Agent 协作模式(参考论文 Plan-and-Solve Prompting),旨在解决复杂任务的分步拆解、执行与动态调整问题。
通过** Planner(规划器)、Executor(执行器)和 Replanner(重规划器)** 三个核心Agent的协同工作,实现任务的结构化规划、工具调用执行、进度评估与动态replanning,最终达成用户目标。
其中:
- 规划者(Planner):根据用户目标,生成一个包含详细步骤且结构化的初始任务计划
- 执行者(Executor):执行当前计划中的首个步骤,调用外部工具完成具体任务。基于
ChatModelAgent 实现,配置工具集(如搜索、计算、数据库访问等)- 从 Session 中获取当前
Plan 和已执行步骤 - 提取计划中的第一个未执行步骤作为目标
- 调用工具执行该步骤,将结果存储于 Session
- 反思者(Replanner):评估执行进度,决定是修正计划继续交由 Executor 运行,或是结束任务
实现方式:组合SequentialAgent 和 LoopAgent
- 外层
SequentialAgent:先执行 Planner 生成初始计划,再进入执行-重规划循环 - 内层
LoopAgent:循环执行 Executor 和 Replanner,直至任务完成或达到最大迭代次数



example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| import github.com/cloudwego/eino/adk/prebuilt/planexecute
// Plan-Execute 模式的科研助手
researchAssistant := planexecute.New(ctx, &planexecute.Config{
Planner: adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "research_planner",
Instruction: "制定详细的研究计划,包括文献调研、数据收集、分析方法等",
Model: gpt4Model,
}),
Executor: adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "research_executor",
ToolsConfig: adk.ToolsConfig{
Tools: []tool.BaseTool{
scholarSearchTool,
dataAnalysisTool,
citationTool,
},
},
}),
Replanner: replannerAgent,
})
|
Plan-Execute 模式有如下特点:
- 明确的分层架构:通过将任务拆解为规划、执行和反思重规划三个阶段,形成层次分明的认知流程,体现了** “先思考再行动,再根据反馈调整” 的闭环认知策略,在各类场景中都能达到较好的效果**。
- 动态迭代优化:Replanner 根据执行结果和当前进度,实时判断任务是否完成或需调整计划,支持动态重规划。该机制有效解决了传统单次规划难以应对环境变化和任务不确定性的瓶颈,提升了系统的鲁棒性和灵活性。
- 职责分明且松耦合:Plan-Execute 模式由多个智能体协同工作,支持独立开发、测试和替换。模块化设计方便扩展和维护,符合工程最佳实践。
- 具备良好扩展性:不依赖特定的语言模型、工具或 Agent,方便集成多样化外部资源,满足不同应用场景需求。
非常适合需要多步骤推理、动态调整和工具集成的复杂任务场景;
可能的应用场景有:
- 复杂研究分析:通过规划分解研究问题,执行多轮数据检索与计算,动态调整研究方向和假设,提升分析深度和准确性。
- 自动化工作流管理:将复杂业务流程拆解为结构化步骤,结合多种工具(如数据库查询、API 调用、计算引擎)逐步执行,并根据执行结果动态优化流程。
- 多步骤问题解决:适用于需要分步推理和多工具协作的场景,如法律咨询、技术诊断、策略制定等,确保每一步执行都有反馈和调整。
- 智能助理任务执行:支持智能助理根据用户目标规划任务步骤,调用外部工具完成具体操作,并根据重规划思考结合用户反馈调整后续计划,提升任务完成的完整性和准确性。
Eino中的Multi-Agent自定义架构要如何设计与实现?
使用Eino框架实现DeerFlow系统
2. Supervisor模式(中心化协调模式)
#
是 ADK 提供的一种中心化 Multi-Agent 协作模式,旨在为集中决策与分发执行的通用场景提供解决方案。由一个 Supervisor Agent(监督者) 和多个 SubAgent (子 Agent)组成,其中:
- Supervisor Agent:作为协作核心, 负责任务的分配(如基于规则或 LLM 决策)、子 Agent 完成后的结果汇总与下一步决策。
- **SubAgents:**专注于执行具体任务。子 Agent 完成后,
WithDeterministicTransferTo 触发 Transfer 事件,将任务转让回 Supervisor,确保在完成后自动将任务控制权交回 Supervisor。
Supervisor 模式有如下特点:
- 中心化控制:Supervisor 统一管理子 Agent,可根据输入与子 Agent 执行结果动态调整任务分配。
- 确定性回调:子 Agent 执行完毕后会将运行结果返回到 Supervisor Agent,避免协作流程中断。
- 松耦合扩展:子 Agent 可独立开发、测试和替换,方便拓展与维护。只需确保实现 Agent 接口并绑定到 Supervisor,即可接入协作流程。
非常适合于动态协调多个专业 Agent 完成复杂任务的场景;
可能的应用场景有:
- 科研项目管理:Supervisor 分配 调研、实验、报告撰写 任务给不同子 Agent。
- 客户服务流程:Supervisor 根据用户问题类型,分配给技术支持、售后、销售等子 Agent。


example:
1
2
3
4
5
6
7
8
9
10
11
12
| import github.com/cloudwego/eino/adk/prebuilt/supervisor
// 科研项目管理:创建一个监督者模式的 multi-agent
// 包含 research(调研),experimentation(实验),report(报告)三个子 Agent
supervisor, err := supervisor.New(ctx, &supervisor.Config{
SupervisorAgent: supervisorAgent,
SubAgents: []adk.Agent{
researchAgent,
experimentationAgent,
reportAgent,
},
})
|
完整example:
multiagent/integration-project-manager: supervisor agent(推荐)
WithDeterministicTransferTo
是 Eino ADK 提供的 Agent 增强工具,用于为 Agent 注入任务转让(Transfer)能力 。它允许开发者为目标 Agent 预设固定的任务转让路径,当该 Agent 完成任务(未被中断)时,会自动生成 Transfer 事件,将任务流转到预设的目标 Agent。
是构建 Supervisor Agent 协作模式的基础,确保子 Agent 在执行完毕后能可靠地将任务控制权交回监督者(Supervisor),形成“分配-执行-反馈”的闭环协作流程。
1
2
3
4
5
6
7
8
| // 包装方法
func AgentWithDeterministicTransferTo(_ context.Context, config *DeterministicTransferConfig) Agent
// 配置详情
type DeterministicTransferConfig struct {
Agent Agent // 被增强的目标 Agent
ToAgentNames []string // 任务完成后转让的目标 Agent 名称列表
}
|
2.4 Human-in-the-Loop
#
三、基础设计
#
统一的 Agent 抽象
#
ADK 的核心是一个简洁而强大的 Agent 接口,无论是简单的问答机器人,还是复杂的多步骤任务处理系统,都可以通过这个统一的接口加以实现。
1
2
3
4
5
6
7
| // github.com/cloudwego/eino/adk/interface.go
type Agent interface {
Name(ctx context.Context) string // **明确的身份**
Description(ctx context.Context) string // **清晰的职责**
Run(ctx context.Context, input *AgentInput) ***AsyncIterator[*AgentEvent]** // **标准化的执行方式**。返回一个**异步迭代器****(生产与消费之间没有同步控制)**。调用者可以通过这个 **AgenEvent Iterator****(**迭代器) 持续接收 Agent 产生的事件
// 执行任务时可通过 **Context 中的 Session 暂存数据**
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // **AgentInput:**
type AgentInput struct {
Messages []Message // 同ChatModel,用户指令、对话历史、背景知识、样例数据等任何你希望传递给 Agent 的数据
EnableStreaming boo. // 向 Agent的**建议其输出模式(并非一个强制性约束):控制那些同时支持流式和非流式输出的组件的行为(如ChatModel),不会影响仅支持一种输出方式的组件**
}
input := &adk.AgentInput{
Messages: []adk.Message{
schema.UserMessage("What's the capital of France?"),
schema.AssistantMessage("The capital of France is Paris.", nil),
schema.UserMessage("How far is it from London? "),
},
}
// **AgentRunOption**有一个 DesignateAgent 方法,调用该方法可以在调用多 Agent 系统时指定 Option 生效的 Agent。****func (m *MyAgent) Run(ctx context.Context, input *adk.AgentInput, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
o := &options{}
o = adk.GetImplSpecificOptions(o, opts...) // adk.GetImplSpecificOptions
// run code...
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // AsyncIterator声明:
// github.com/cloudwego/eino/adk/utils.go
type AsyncIterator[T any] struct { // 泛型结构体,迭代任何类型的数据。
...
}
func (ai *AsyncIterator[T]) Next() (T, bool) { // 阻塞式,每次调用 Next() ,程序会暂停执行,直到Agent 产生了一个新的 AgentEvent 或 Agent 主动关闭了迭代器(通常是Agent运行结束, 第二个返回值返回false)
...
}
// AsyncIterator 可以由 NewAsyncIteratorPair 创建:
func NewAsyncIteratorPair[T any]() (*AsyncIterator[T], *AsyncGenerator[T]) // 返回的 AsyncGenerator 用来生产数据
// AsyncIterator使用:常在 for 循环中处理
iter := agent.Run(xxx) // get AsyncIterator from Agent.Run
for {
event, ok := iter.Next()
if !ok {
break
}
// handle event
}
|
Agent.Run 通常会** 在新的Goroutine中运行Agent,从而立刻返回AsyncIterator供调用者监听(异步任务****):产生新的AgentEvent**时写入到 Generator 中,供 Agent 调用方在 Iterator 中消费:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import "github.com/cloudwego/eino/adk"
func (m *Agent) Run(ctx context.Context, input *adk.AgentInput, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
// handle input
iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
go func() { // goroutine
defer func() {
// recover code
gen.Close()
}()
// agent run code
// gen.Send(event)
}()
return iter
}
// AgentWithOptions: 支持Eino ADK Agent 做一些通用配置
// github.com/cloudwego/eino/adk/flow.go
func AgentWithOptions(ctx context.Context, agent Agent, opts ...AgentOption) Agent
|
异步事件驱动架构AsyncIterator[*AgentEvent]
#
ADK 采用了异步事件流设计,通过 AsyncIterator[*AgentEvent] 实现非阻塞的事件处理(unbuffed chan),并通过 Runner 框架运行 Agent:
- 实时响应:
AgentEvent 包含 Agent 执行过程中特定节点输出(Agent 回复、工具处理结果等等),用户可以立即看到 Agent 的思考过程和中间结果。 - 追踪执行过程:
AgentEvent 额外携带状态修改动作与运行轨迹,便于开发调试和理解 Agent 行为。 - 自动流程控制:框架通过
Runner 自动处理中断、跳转、退出行为,无需用户额外干预。
Agent在其运行过程中产生的核心事件数据结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| // github.com/cloudwego/eino/adk/interface.go
type AgentEvent struct {
AgentName string
RunPath []string // 当前 Agent 的完整调用链路(入口Agent到当前产生事件的所有AgentName)
Output *AgentOutput
Action *AgentAction
Err error
}
type AgentOutput struct {
MessageOutput *MessageVariant // Message 输出
CustomizedOutput any // 其他类型的输出
}
type MessageVariant struct { // **核心结构**
IsStreaming bool // 标志位,true -> MessageStream, false -> Message
Message Message
MessageStream MessageStream
// message role: Assistant or Tool
Role schema.RoleType // 消息的角色(常用的元数据)
// only used when Role is Tool
ToolName string // 如果消息角色是 Tool ,这个字段会直接提供工具的名称(常用的元数据)
}
// 控制多 Agent 协作,比如立刻退出、中断、跳转等
type AgentAction struct {
Exit bool // true -> Multi-Agent 会立刻退出
Interrupted *InterruptInfo
TransferToAgent *TransferToAgentAction // 跳转到目标 Agent 运行
CustomizedAction any
}
|
Eino ADK 支持处于同一个系统内的 Agent 之间以多种方式进行协作(交换数据或触发运行):
- 共享 Session:单次运行过程中持续存在的 KV 存储,用于支持跨 Agent 的状态管理和数据共享。
1
2
3
4
5
6
7
8
| // 获取全部 SessionValues
func GetSessionValues(ctx context.Context) map[string]any
// 指定 key 获取 SessionValues 中的一个值,key 不存在时第二个返回值为 false,否则为 true
func GetSessionValue(ctx context.Context, key string) (any, bool)
// 添加 SessionValues。原SetSessionValue更名
func AddSessionValue(ctx context.Context, key string, value any)
// 批量添加 SessionValues
func AddSessionValues(ctx context.Context, kvs map[string]any)
|
- 移交运行(Transfer):携带本 Agent 输出结果上下文,将任务移交至子 Agent 继续处理。适用于智能体功能可以清晰的划分边界与层级的场景,常结合 ChatModelAgent 使用,通过 LLM 的生成结果进行动态路由。结构上,以此方式进行协作的两个 Agent 称为父子 Agent:

example:
1
2
3
4
5
| // 设置父子 Agent 关系
func SetSubAgents(ctx context.Context, agent Agent, subAgents []Agent) (Agent, error)
// 指定目标 Agent 名称,构造 Transfer Event
func NewTransferToAgentAction(destAgentName string) *AgentAction
|

- 每一个 Agent 产生的 AgentEvent 都会被保存到 History 中,在调用一个新 Agent 时(Workflow/ Transfer),History 中的 AgentEvent 会被转换并拼接到 AgentInput 中。
- 默认情况下,其他 Agent 的 Assistant Message 或 Tool Message,被转换为 User Message, 避免了当前Agent的chatModel的的上下文混乱。
- 只有当 Event 的 RunPath “属于”当前 Agent 的 RunPath 时( RunPathA 与 RunPathB 相同或者 RunPathA 是 RunPathB 的前缀),该 Event 才会参与构建 Agent 的 AgentInput。(过滤掉无关的AgentInput)
- 通过 WithHistoryRewriter 可以自定义 Agent 从 History 中生成 AgentInput 的方式:
1
2
3
| // github.com/cloudwego/eino/adk/flow.go
type HistoryRewriter func(ctx context.Context, entries []*HistoryEntry) ([]Message, error)
func WithHistoryRewriter(h HistoryRewriter) AgentOption
|
Transfer的含义是将任务移交运行给子Agent(SubAgents),而不是委托或者分配(区别于ToolCall)。Agent 运行时产生带有包含 TransferAction 的 AgentEvent 后,Eino ADK 会调用 Action 指定的 Agent。
运行前需要先调用 SetSubAgents 将可能的子 Agent 注册到 Eino ADK 中:
1
2
| // github.com/cloudwego/eino/adk/flow.go
func SetSubAgents(ctx context.Context, agent Agent, subAgents []Agent) (Agent, error)
|
运行时动态地注册父子 Agent:
如果 Agent 实现了 OnSubAgents 接口,SetSubAgents 中会调用相应的方法向 Agent 注册。
1
2
3
4
5
6
7
| // github.com/cloudwego/eino/adk/interface.go
type OnSubAgents interface {
OnSetSubAgents(ctx context.Context, subAgents []Agent) error
OnSetAsSubAgent(ctx context.Context, parent Agent) error
OnDisallowTransferToParent(ctx context.Context) error
}
|
TransferAction 可以使用 NewTransferToAgentAction 快速创建:
1
2
| import "github.com/cloudwego/eino/adk"
event := adk.NewTransferToAgentAction("dest agent name")
|
- 显式调用(ToolCall):将 Agent 视为工具进行调用。适用于 Agent 运行仅需要明确清晰的参数而非完整运行上下文的场景,常结合 ChatModelAgent,作为工具运行后将结果返回给 ChatModel 继续处理。除此之外,ToolCall 同样支持调用符合工具接口构造的、不含 Agent 的普通工具。

example:
1
2
| // 将 Agent 转换为 Tool
func NewAgentTool(_ context.Context, agent Agent, options ...AgentToolOption) tool.BaseTool
|
Runner抽象与 Interrupted Action、Checkpoint、Resume
#
Runner: Eino ADK 中负责执行 Agent 的核心引擎。
主要作用是**管理和控制 Agent 的整个生命周期****:**如处理多 Agent 协作,保存传递上下文等,interrupt、callback 等切面能力也均依赖 Runner 实现。
任何 Agent 都应通过 Runner 来运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // github.com/cloudwego/eino/adk/runners.go
// 声明
type Runner struct {
a Agent
enableStreaming bool
store compose.CheckPointStore
}
// 调用
runner := adk.NewRunner(ctx, adk.RunnerConfig{
EnableStreaming: true, // you can disable streaming here
Agent: a,
CheckPointStore: newInMemoryStore(),
})
|
1
2
3
4
5
6
| func (r *Runner) Run(ctx context.Context, messages []Message, opts ...AgentRunOption) *AsyncIterator[*AgentEvent]
// Query 是为了方便单次查询而提供的Run的语法糖
func (r *Runner) Query(ctx context.Context,query string, opts ...AgentRunOption) *AsyncIterator[*AgentEvent] {
return r.Run(ctx, []Message{schema.UserMessage(query)}, opts...)
}
|
Runner 提供运行时中断与恢复的功能:
允许正在运行中的 Agent 主动中断并保存其当前状态,并在未来从中断点恢复执行。
使用场景:长时间等待、可暂停或需要外部输入(Human in the loop)等。多轮对话( 多次的runner.Query() )?
- Interrupted Action:由 Agent 抛出
Interrupt Action 的 Event 中断事件,主动通知Agent Runner 中断运行(拦截)。并允许携带额外信息供调用方阅读与使用。 - Checkpoint:Agent
Runner 拦截事件后,通过初始化时注册的 CheckPointStore 保存当前运行状态。Runner 在终止运行后会将当前运行状态(原始输入、对话历史等)以及 Agent 抛出的 InterruptInfo 以 CheckPointID 为 key 持久化到 CheckPointStore 中。 - Resume:运行条件重新 ready 后,由 Agent
Runner 从断点通过 Resume 方法携带恢复运行所需要的新信息,从断点处恢复运行。
example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| // 1. 创建支持断点恢复的 Runner
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: complexAgent,
CheckPointStore: memoryStore, // 内存状态存储
})
// 2. 开始执行
iter := runner.Query(ctx, "recommend a book to me", adk.WithCheckPointID("1"))
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
if event.Action != nil {
// 3. 由 Agent 内部抛出 Interrupt 事件
if event.Action.Interrupted != nil {
ii, _ := json.MarshalIndent(event.Action.Interrupted.Data, "", "\t")
fmt.Printf("action: interrupted\n")
fmt.Printf("interrupt snapshot: %v", string(ii))
}
}
}
// 4. 从 stdin 接收用户输入
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("\nyour input here: ")
scanner.Scan()
fmt.Println()
nInput := scanner.Text()
// 5. 携带用户输入信息,从断点恢复执行
iter, err := runner.Resume(ctx, "1", adk.WithToolOptions([]tool.Option{subagents.WithNewInput(nInput)}))
|
序列化:
为了保存 interface 中数据的原本类型,Eino ADK 使用 gob(
https://pkg.go.dev/encoding/gob)序列化运行状态。
Eino 会**自动注册框架内置的类型,**在使用自定义类型时需要提前使用 gob.Register 或 gob.RegisterName 注册类型(更推荐后者,前者使用路径加类型名作为默认名字,因此类型的位置和名字均不能发生变更)。
inMemoryStore:
compose.CheckPointStore interface的一个实现。
1
2
3
4
5
| // **compose.CheckPointStore**
type CheckPointStore interface {
Get(ctx context.Context, checkPointID string) ([]byte, bool, error)
Set(ctx context.Context, checkPointID string, checkPoint []byte) error
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type inMemoryStore struct {
mem map[string][]byte
}
func (i *inMemoryStore) Set(ctx context.Context, key string, value []byte) error {
i.mem[key] = value
return nil
}
func (i *inMemoryStore) Get(ctx context.Context, key string) ([]byte, bool, error) {
v, ok := i.mem[key]
return v, ok, nil
}
func newInMemoryStore() compose.CheckPointStore {
return &inMemoryStore{
mem: map[string][]byte{},
}
}
|
Resume:
调用 Runner 的 Resume 接口,传入中断时的 CheckPointID 可以恢复运行:
1
2
3
4
| iter, err := runner.Resume(ctx, "1", adk.WithToolOptions([]tool.Option{subagents.WithNewInput(nInput)}))
// github.com/cloudwego/eino/adk/runner.go
func (r *Runner) Resume(ctx context.Context, checkPointID string, **opts ...AgentRunOption**) (*AsyncIterator[*AgentEvent], error)
|
恢复 Agent 运行需要发生中断的 Agent 实现了 ResumableAgent 接口, Runner 从 CheckPointerStore 读取运行状态并恢复运行,
其中** InterruptInfo 和上次运行配置的 EnableStreaming 会作为输入提供给 Agent**:
Resume如果向 Agent 传入新信息,可以定义 AgentRunOption,并在调用 Runner.Resume 时传入。
1
2
3
4
5
6
7
8
9
10
11
12
| // github.com/cloudwego/eino/adk/interface.go
type ResumableAgent interface {
Agent
Resume(ctx context.Context, info *ResumeInfo, **opts ...AgentRunOption**) *AsyncIterator[*AgentEvent]
}
// github.com/cloudwego/eino/adk/interrupt.go
type ResumeInfo struct {
EnableStreaming bool
*InterruptInfo
}
|
四、adk example
#
4.1 helloworld
#
helloworld ChatModelAgent
#
7行代码:实现简单对话式ChatModelAgent
1
2
3
4
5
6
7
8
9
| model, err := ark.NewChatModel(...)
agent, err := adk.NewChatModelAgent(...)
runner := adk.NewRunner(...)
iter := runner.Query(ctx, "Hello, please introduce yourself. use chinese to answer")
for {
event, ok := iter.Next()
msg, err := event.Output.MessageOutput.GetMessage()
}
|
ChatModelAgent
#
1
2
3
4
5
6
7
| // 核心一行代码
runner := adk.NewRunner{
...,
CheckPointStore: newInMemoryStore(), // map[string][]byte
})
iter := runner.Query(ctx, "recommend a book to me", adk.WithCheckPointID("1"))
iter, err := runner.Resume(ctx, "1", adk.WithToolOptions([]tool.Option{subagents.WithNewInput(nInput)}))
|
12行代码:使用 ChatModelAgent 带interrupt中断和恢复、本地function tool。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| model, err := ark.NewChatModel(...)
bookSearchTool, err := utils.InferTool(..., func(ctx, input) (output, err) {...})
newAskForClarificationTool, err := utils.InferOptionableTool(...,func(..., opts) (output, err) {...}
agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{..., ToolsConfig: adk.ToolsConfig{...}}
runner := adk.NewRunner{
...,
CheckPointStore: newInMemoryStore(), // map[string][]byte
})
iter := runner.Query(ctx, "recommend a book to me", adk.WithCheckPointID("1"))
// 交互循环
for {
event, ok := iter.Next()
prints.Event(event)
}
scanner := bufio.NewScanner(os.Stdin)
scanner.Scan()
nInput := scanner.Text()
iter, err := runner.Resume(ctx, "1", adk.WithToolOptions([]tool.Option{subagents.WithNewInput(nInput)}))
for {
event, ok := iter.Next()
prints.Event(event)
}
|
custom Agent
#
7行代码:实现符合ADK定义的自定义Agent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| type MyAgent struct {}
func (m *MyAgent) Name() {...}
func (m *MyAgent) Description() {...}
func (m *MyAgent) Run(...) *adk.AsyncIterator[*adk.AgentEvent] {
iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
go func() {
defer func() {
e := recover()
gen.Close()
}()
// agent run code
gen.Send(&adk.AgentEvent{
Output: &adk.AgentOutput{
MessageOutput: &adk.MessageVariant{
IsStreaming: false,
Message: &schema.Message{
Role: schema.Assistant,
Content: "hello world",
},
Role: schema.Assistant,
},
},
})
}()
}
|
4.2 workflow:Loop agent + Parallel agent + Sequential agent
#
1
2
3
4
5
6
7
| // 核心一行代码
a, err := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2}...}
a, err := adk.NewParallelAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2,a3}...}
a, err := adk.NewSequentialAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2}...}
ctx, endSpanFn := startSpanFn(ctx, "layered-supervisor", query)
endSpanFn(ctx, lastMessage)
|
Loop agent(循环agent):14行代码,loop agent:1个main agent + 1个critique****agent, + cozeloop trace
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // cozeloop trace: eino-ext/callbacks/cozeloop coze-dev/cozeloop-go
traceCloseFn, startSpanFn := trace.AppendCozeLoopCallbackIfConfigured(ctx)
defer traceCloseFn(ctx)
AppendCozeLoopCallbackIfConfigured()
cm, err := ark.NewChatModel()
a1, err := adk.NewChatModelAgent()
a2, err := adk.NewChatModelAgent()
a, err := adk.NewLoopAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2}...}
query := "briefly introduce what a multimodal embedding model is."
ctx, endSpanFn := startSpanFn(ctx, "layered-supervisor", query)
runner := adk.NewRunner()
iter := runner.Query(ctx, query)
for {
event, ok := iter.Next()
prints.Event(event)
}
endSpanFn(ctx, lastMessage)
|
Parallel agent(并行agent):15行代码,Parallel agent:1个Stock数据收集 agent + 1个News数据收集agent + 1个社交媒体数据收集agent, + cozeloop trace
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // cozeloop trace: eino-ext/callbacks/cozeloop coze-dev/cozeloop-go
traceCloseFn, startSpanFn := trace.AppendCozeLoopCallbackIfConfigured(ctx)
defer traceCloseFn(ctx)
AppendCozeLoopCallbackIfConfigured()
cm, err := ark.NewChatModel()
a1, err := adk.NewChatModelAgent() // NewStockDataCollectionAgent
a2, err := adk.NewChatModelAgent() // NewNewsDataCollectionAgent
a3, err := adk.NewChatModelAgent() // NewSocialMediaInfoCollectionAgent
a, err := adk.NewParallelAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2,a3}...}
query := "give me today's market research"
ctx, endSpanFn := startSpanFn(ctx, "layered-supervisor", query)
runner := adk.NewRunner()
iter := runner.Query(ctx, query)
for {
event, ok := iter.Next()
prints.Event(event)
}
endSpanFn(ctx, lastMessage)
|
Sequential (连续的)agent:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // cozeloop trace: eino-ext/callbacks/cozeloop coze-dev/cozeloop-go
traceCloseFn, startSpanFn := trace.AppendCozeLoopCallbackIfConfigured(ctx)
defer traceCloseFn(ctx)
AppendCozeLoopCallbackIfConfigured()
cm, err := ark.NewChatModel()
a1, err := adk.NewChatModelAgent() // NewPlanAgent
a2, err := adk.NewChatModelAgent() // NewWriterAgent
a, err := adk.NewSequentialAgent(ctx, &adk.LoopAgentConfig{... SubAgents:{[]adk.Agent{a1,a2}...}
query := "give me today's market research"
ctx, endSpanFn := startSpanFn(ctx, "layered-supervisor", query)
runner := adk.NewRunner()
iter := runner.Query(ctx, query)
for {
event, ok := iter.Next()
prints.Event(event)
}
endSpanFn(ctx, lastMessage)
|
4.3 跨agent传递
#
session:跨agent传递 data and state(状态)
#
1
2
3
| // 核心一行代码
adk.AddSessionValue(ctx, "user-name", in.Name) // a1
userName, _ := adk.GetSessionValue(ctx, "user-name") // a2
|
9行代码:AddSessionValue、GetSessionValue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| adk.AddSessionValue(ctx, "user-name", in.Name) // a1
userName, _ := adk.GetSessionValue(ctx, "user-name") // a2
toolA, err := utils.InferTool("tool_a", "set user name", toolAFn)
toolB, err := utils.InferTool("tool_b", "set user age", toolBFn)
a, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
...
ToolsConfig{toolA,toolB},
Model: model.NewChatModel()
}
r := adk.NewRunner(Agent: a)
iter := r.Query(ctx, "my name is Alice, my age is 18")
for {
event, ok := iter.Next()
prints.Event(event)
}
|
transfer移交运行
#
1
2
| // 核心一行代码
a, err := adk.SetSubAgents(ctx, routerAgent, []adk.Agent{chatAgent, weatherAgent})
|
12行代码:通过SetSubAgents的的transfer_to_agent 实现控制权的动态选择与转移。
Agent 职责单一 模块化,可独立开发测试,子 Agent 专注各自能力;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| weatherTool, err := utils.InferTool(...)
a1, err := adk.NewChatModelAgent(... Tools:weatherTool) // weatherAgent
a2, err := adk.NewChatModelAgent() // chatAgent
a3, err := adk.NewChatModelAgent() // routerAgent
a, err := adk.SetSubAgents(routerAgent, []adk.Agent{chatAgent, weatherAgent}) // SetSubAgents会在 RouterAgent 中注入 transfer_to_agent
runner := adk.NewRunner(a)
iter := runner.Query(ctx, "What's the weather in Beijing?") // transfer(转移)到 WeatherAgent
for {
event, ok := iter.Next()
prints.Event(event)
}
iter = runner.Query(ctx, "Book me a flight from New York to London tomorrow.") // 无匹配 Agent,RouterAgent 直接回复无法处理
for {
event, ok := iter.Next()
prints.Event(event)
}
|
4.4 plan-execute-replan范式
#
multiagent/plan-execute-replan
#
1
2
3
4
5
6
7
8
9
10
| // 核心一行代码
entryAgent, err := planexecute.New(ctx, &planexecute.Config{
Planner: planAgent,
Executor: executeAgent,
Replanner: replanAgent,
MaxIterations: 20,
})
ctx, endSpanFn := startSpanFn(ctx, "plan-execute-replan", query)
endSpanFn(ctx, lastMessage)
|
计划-执行-重新计划 agent:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| traceCloseFn, startSpanFn := trace.AppendCozeLoopCallbackIfConfigured(ctx)
defer traceCloseFn(ctx)
planexecute.NewPlanner(cm) // eino/adk/prebuilt/planexecute
planAgent, err := agent.NewPlanner(ctx)
planexecute.NewExecutor(cm)
executeAgent, err := agent.NewExecutor(ctx)
planexecute.NewReplanner(cm)
replanAgent, err := agent.NewReplanAgent(ctx)
entryAgent, err := planexecute.New(ctx, &planexecute.Config{
Planner: planAgent,
Executor: executeAgent,
Replanner: replanAgent,
MaxIterations: 20,
})
r := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: entryAgent,
})
query := `Plan a 3-day trip to Beijing in Next Month. I need flights from New York, hotel recommendations, and must-see attractions.
Today is 2025-09-09.`
ctx, endSpanFn := startSpanFn(ctx, "plan-execute-replan", query)
iter := r.Query(ctx, query)
for {
event, ok := iter.Next()
prints.Event(event)
}
endSpanFn(ctx, lastMessage)
|
multiagent/integration-excel-agent
#
文档:
https://mp.weixin.qq.com/s/787AJLf2czPn4L-FAnB9zA
跑起来:
- 根据readme.md,配置ARK_VISION_MODEL、ARK_VISION_API_KEY、EXCEL_AGENT_PYTHON_EXECUTABLE_PATH=python3
- 在.env同级目录安装venv虚拟环境,并安装好readme的依赖
- 手动加载env和source venv/bin/activate,最后以sudo身份读取env并启动。
核心代码-第一层:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| p, err := planner.NewPlanner(ctx, operator)
e, err := executor.NewExecutor(ctx, operator)
rp, err := replanner.NewReplanner(ctx, operator)
planExecuteAgent, err := planexecute.New(ctx, &planexecute.Config{
Planner: p,
Executor: e,
Replanner: rp,
MaxIterations: 20,
})
reportAgent, err := report.NewReportAgent(ctx, operator)
agent, err := adk.NewSequentialAgent(ctx, &adk.SequentialAgentConfig{
Name: "SequentialAgent",
Description: "sequential agent",
SubAgents: []adk.Agent{
planExecuteAgent, reportAgent,
},
})
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: agent,
EnableStreaming: true,
})
prints.Event(event)
// Operator → Tool → LLM
operator := &LocalOperator{} // commandline.Operator 接口的实现,提供底层文件与命令执行能力。**不直接作为tool,用于被多个Tool复用。**
|
核心代码-第二层-NewPlanner:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| plannerPromptTemplate = prompt.FromMessages(schema.Jinja2,
schema.SystemMessage(`你是一位专门处理 Excel 数据处理任务的专家规划器。你的目标是理解用户需求并将其分解为清晰的、分步骤的计划。
**1. 理解目标:**
**2. 交付物:**
**3. 计划分解原则:**
**4. 输出格式(Few-shot 示例):**
以下是一个好的计划示例:
用户请求:"请计算附件'sales_data.xlsx'文件中每个产品类别的平均销售额并生成报告。"
{
"steps": [
{
"instruction": "将'sales_data.xlsx'文件读取到 pandas DataFrame 中。"
},
{
"instruction": "按'Product Category'对 DataFrame 进行分组,并计算每个组的'Sales'列的平均值。"
},
{
"instruction": "总结每个产品类别的平均销售额,并以表格形式呈现结果。"
}
]
}
**5. 限制:**
- 不要在计划中直接生成代码
- 确保计划是逻辑合理且可实现的
- 最后一步应该始终是生成报告或提供最终结果
`),
schema.UserMessage(`
User Query: {{ user_query }}
当前时间:{{ current_time }}
文件预览(如果文件扩展名为 xlsx,预览将提供前 20 行的具体内容,否则仅提供文件路径):{{ file_preview }}
`),
)
)
cm := utils.NewChatModel(ctx,
utils.WithMaxTokens(4096), // 计划很简洁,同时也节省token
utils.WithTemperature(0), // 几乎确定性,相同输入产生相同输出
utils.WithTopP(0), // 只考虑最可能的 token
utils.WithDisableThinking(true), // 规划任务需要直接输出,不需要中间推理。节省token,提高效率。
utils.WithResponseFormatJsonSchema(&openai.ChatCompletionResponseFormatJSONSchema{
Name: generic.PlanToolInfo.Name,
Description: generic.PlanToolInfo.Desc,
JSONSchema: sc, // 上述json schema
Strict: true,
}),
)
a, err := planexecute.NewPlanner(ctx, &planexecute.PlannerConfig{
ChatModelWithFormattedOutput: cm, // 确保输出符合 JSON Schema
**GenInputFn**: func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) { // prompt,使用上述prompt模板
pf, _ := params.GetTypedContextParams[string](ctx, params.UserAllPreviewFilesSessionKey)
msgs, err := plannerPromptTemplate.Format(ctx, map[string]any{
"user_query": utils.FormatInput(userInput),
"current_time": utils.GetCurrentTime(),
"file_preview": pf,
})
return msgs, nil
}, // 将用户输入转换为模型输入
NewPlan: func(...) { return &generic.Plan{} }, // 创建空的计划对象
})
// Wrapper的逻辑在:agents/wrap_plan.go,放在agents/planner/下更合适?
return agents.NewWrite2PlanMDWrapper(a, op) // **custom adk.agent, 包装器(Wrapper)模式:在不修改原智能体a的情况下,增加“将计划写入 Markdown 文件”的功能。**
|
核心代码-第二层-common/Plan:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
| type Step struct {
Index int `json:"index"`
Desc string `json:"desc"`
}
type Plan struct {
Steps []Step `json:"steps"`
}
func (p *Plan) FirstStep() string {
if len(p.Steps) == 0 {
return ""
}
stepStr, _ := sonic.MarshalString(p.Steps[0])
return stepStr
}
func (p *Plan) MarshalJSON() ([]byte, error) {
type Alias Plan // **使用类型别名避免循环依赖**
return json.Marshal((*Alias)(p))
}
func (p *Plan) UnmarshalJSON(bytes []byte) error {
type Alias Plan
a := (*Alias)(p)
return json.Unmarshal(bytes, a)
}
// 定义工具信息,约束 LLM 输出格式(包括参数类型校验等)
var PlanToolInfo = &schema.ToolInfo{
Name: "create_plan",
Desc: "生成一个结构化的、分步骤的执行计划来解决给定的复杂任务。计划中的每个步骤必须分配给专门的智能体,并且必须有清晰、可执行的描述。",
ParamsOneOf: schema.NewParamsOneOfByParams(
map[string]*schema.ParameterInfo{
"steps": {
Type: schema.Array,
Desc: "要遵循的不同步骤,应按排序顺序排列",
ElemInfo: &schema.ParameterInfo{
Type: schema.Object,
SubParams: map[string]*schema.ParameterInfo{
"index": {
Type: schema.Integer,
Desc: "该步骤在整个计划中的序号。**必须从 1 开始,并且每个后续步骤必须恰好递增 1。**",
Required: true,
},
"desc": {
Type: schema.String,
Desc: "该步骤要执行的具体任务的清晰、简洁和可执行的描述。它应该是分配给智能体的直接指令。",
Required: true,
},
},
},
Required: true,
},
},
),
}
sc := PlanToolInfo.ToJSONSchema() // 将 ParamsOneOf 转换为 JSON Schema,Eino v0.6开始统一使用json schema(行业标准)
|
1
2
3
4
5
6
7
8
9
10
11
12
| // **自定义Plan(一个可执行的计划)**。实现adk.planexecute.Plan interface。
type Plan interface {
// FirstStep returns the first step to be executed in the plan.
FirstStep() string // **返回计划中第一个要执行的步骤,用于按顺序执行步骤**
// Marshaler serializes the Plan into JSON.
// The resulting JSON can be used in prompt templates.
json.Marshaler // 将 Plan 序列化为 JSON,用于prompt templates,供 LLM 理解计划结构
// Unmarshaler deserializes JSON content into the Plan.
// This processes output from structured chat models or tool calls into the Plan structure.
json.Unmarshaler // 将 JSON 反序列化为 Plan,用于处理结构化聊天模型或工具调用的输出,转换为 Plan
}
|
核心代码-第二层executor:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| var executorPrompt = prompt.FromMessages(schema.FString,
schema.SystemMessage(`你是一个认真细致的执行Agent。请仔细遵循给定的计划并彻底执行任务。
可用工具:
- CodeAgent:这是一个专门处理 Excel 文件的代码代理。它接收分步计划,通过生成 Python 代码(利用 pandas 进行数据分析/操作,matplotlib 进行绘图/可视化,openpyxl 进行 Excel 读写)来处理每个任务,并按顺序执行任务。当需要为 Excel 操作进行分步 Python 编码时,React Agent应该调用它,以确保精确、高效地完成任务。
注意事项:
- 不要转移给其他代理,仅使用工具。
`),
schema.UserMessage(`## 目标
{input}
## 给定以下计划:
{plan}
## 已完成步骤和结果
{executed_steps}
## 你的任务是执行第一步,即:
{step}`))
cm, err := utils.NewChatModel(ctx,
utils.WithMaxTokens(4096),
utils.WithTemperature(float32(0)),
utils.WithTopP(float32(0)),
)
ca, err := newCodeAgent(ctx, operator)
sa, err := newWebSearchAgent(ctx)
a, err := planexecute.NewExecutor(ctx, &planexecute.ExecutorConfig{
Model: cm,
ToolsConfig: adk.ToolsConfig{
ToolsNodeConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{
adk.NewAgentTool(ctx, ca), // CodeAgent as tool
adk.NewAgentTool(ctx, sa), // WebSearchAgent as tool
},
},
},
MaxIterations: 20,
GenInputFn: func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { // prompt,使用上述executorPrompt模板
planContent, err := in.Plan.MarshalJSON()
return executorPrompt.Format(ctx, map[string]any{
"input": utils.FormatInput(in.UserInput),
"plan": string(planContent),
"executed_steps": utils.FormatExecutedSteps(in.ExecutedSteps),
"step": in.Plan.FirstStep(),
})
},
})
|
核心代码-第二层replanner:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
| var (
replannerPromptTemplate = prompt.FromMessages(schema.Jinja2,
schema.SystemMessage(`你是一位专门从事 Excel 数据处理任务的专家规划师。你的目标是理解用户需求并将其分解为清晰的、分步骤的计划。
**1. 理解目标:**
- 仔细分析用户的请求,确定最终目标。
- 识别输入数据(Excel 文件)和期望的输出格式。
**2. 交付物:**
- 最终输出应该是一个表示计划的 JSON 对象,包含步骤列表。
- 每个步骤必须是对执行该步骤的智能体的清晰且简洁的指令。
**3. 计划分解原则:**
- **粒度:** 将任务分解为尽可能小的逻辑步骤。例如,不要使用"处理数据",而应使用"读取 Excel 文件"、"过滤掉缺失值的行"、"计算 'Sales' 列的平均值"等。
- **顺序:** 步骤应按正确的执行顺序排列。
- **清晰度:** 每个步骤应该明确无误,易于执行该步骤的智能体理解。
**4. 输出格式(少样本示例):**
以下是一个良好计划的示例:
用户请求:"请计算附件 'sales_data.xlsx' 文件中每个产品类别的平均销售额,并生成报告。"
{
"steps": [
{
"instruction": "将 'sales_data.xlsx' 文件读取到 pandas DataFrame 中。"
},
{
"instruction": "按 'Product Category' 对 DataFrame 进行分组,并计算每个组的 'Sales' 列的平均值。"
},
{
"instruction": "总结每个产品类别的平均销售额,并以表格形式呈现结果。"
}
]
}
**5. 限制:**
- 不要在计划中直接生成代码。
- 确保计划是逻辑合理且可实现的。
- 最后一步应该始终是生成报告或提供最终结果。
**6. 重新规划:** // 明确指出如何判断是否终止loop
- 如果当前计划已完成,调用 'submit_result' 工具。
- 如果计划需要修改或扩展,使用新计划调用 'create_plan' 工具。 create_plan不是一个eino tool,只用于传递结构化数据(计划),而不是执行实际逻辑
`),
schema.UserMessage(`
用户查询:{{ user_query }}
当前时间:{{ current_time }}
文件预览:
{{ file_preview }}
已执行步骤:{{ executed_steps }}
剩余步骤:{{ remaining_steps }}
`),
)
)
cm, err := utils.NewChatModel(ctx,
utils.WithMaxTokens(4096),
utils.WithTopP(0),
utils.WithTemperature(1.0), // 支持随机性(创造性调整)
utils.WithDisableThinking(true),
)
respondInfo, err := tools.NewToolSubmitResult(op).Info(context.Background())
a, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{
ChatModel: cm,
PlanTool: generic.PlanToolInfo, // **用于创建/修改计划的tool**
RespondTool: respondInfo, // **用于提交结果并结束任务的tool**
GenInputFn: func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) {
pf, _ := params.GetTypedContextParams[string](ctx, params.UserAllPreviewFilesSessionKey)
plan, ok := in.Plan.(*generic.Plan)
// remove the first step
plan.Steps = plan.Steps[1:]
planStr, err := sonic.MarshalString(plan)
userInput, err := sonic.MarshalString(in.UserInput)
return replannerPromptTemplate.Format(ctx, map[string]any{
"current_time": utils.GetCurrentTime(),
"file_preview": pf,
"user_query": userInput,
"remaining_steps": planStr,
"executed_steps": utils.FormatExecutedSteps(in.ExecutedSteps),
})
},
NewPlan: func(ctx context.Context) planexecute.Plan {
return &generic.Plan{}
},
})
return agents.NewWrite2PlanMDWrapper(a, op) // 同planner
|
Excel Agent:是一个“看得懂 Excel 的智能助手”,它先把问题拆解成步骤,再一步步执行并校验结果。它能理解用户问题与上传的文件内容,提出可行的解决方案,并选择合适的工具(系统命令、生成并运行 Python 代码、网络查询等等)完成任务。
- 更稳定的产出质量,通过“规划—执行—反思”闭环减少漏项与错误
- 更强的可扩展性,各 Agent 独立构建,低耦合利于迭代更新。
- 更少的人工操作,把复杂繁琐的 Excel 处理工作交给 Agent 自动完成。
架构图:
- 规划者(Planner):分析用户输入,拆解用户问题为可执行的计划
- 执行者(Executor):正确执行当前计划中的首个步骤
- CodeAgent:接收来自 Executor 的指令,调用多种工具(例如读写文件,运行 python 代码等)完成任务
- WebSearchAgent:接收来自 Executor 的指令,进行网络搜索
- 反思者(Replanner):根据 Executor 执行的结果和现有规划,决定继续执行、调整规划或完成执行
- ReportAgent:根据运行过程与结果,生成总结性质的报告

运行动线图:

4.5 deepAgent范式
#
multiagent/deepAgent范式
deepAgent论文地址(中国人大学生和小红书员工的论文):
https://arxiv.org/html/2510.21618?_immersive_translate_auto_translate=1

4.6 supervisor范式
#
multiagent/supervisor
#
1
2
3
4
5
| // 核心一行代码
sv := supervisor.New(Supervisor: sv,SubAgents: []adk.Agent{searchAgent, mathAgent}
ctx, endSpanFn := startSpanFn(ctx, "Supervisor", query)
endSpanFn(ctx, lastMessage)
|
supervisor agent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| traceCloseFn, startSpanFn := trace.AppendCozeLoopCallbackIfConfigured(ctx)
defer traceCloseFn(ctx)
sv, err := adk.NewChatModelAgent()
searchAgent, err := buildSearchAgent(ctx)
mathAgent, err := buildMathAgent(ctx)
sv := supervisor.New(Supervisor: sv,SubAgents: []adk.Agent{searchAgent, mathAgent} // adk/prebuilt/supervisor
query := "find US and New York state GDP in 2024. what % of US GDP was New York state?"
runner := adk.NewRunner(sv)
ctx, endSpanFn := startSpanFn(ctx, "Supervisor", query)
iter := runner.Query(ctx, query)
for {
event, hasEvent := iter.Next()
prints.Event(event)
}
endSpanFn(ctx, lastMessage)
|
multiagent/layered-supervisor
#
1
2
3
4
5
6
7
8
9
10
| // 核心一行代码
sv, err := supervisor.New(ctx, &supervisor.Config{
Supervisor: sv,
SubAgents: []adk.Agent{searchAgent, mathAgent},
})
mathAgent := supervisor.New(ctx, &supervisor.Config{
Supervisor: mathA,
SubAgents: []adk.Agent{sa, ma, da},
})
|
1个supervisor agent下有嵌套1个supervisor subagent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| sv, err := supervisor.New(ctx, &supervisor.Config{
Supervisor: sv,
SubAgents: []adk.Agent{searchAgent, mathAgent},
})
mathAgent := supervisor.New(ctx, &supervisor.Config{
Supervisor: mathA,
SubAgents: []adk.Agent{sa, ma, da},
})
query := "find US and New York state GDP in 2024. what % of US GDP was New York state? " +
"Then multiply that percentage by 1.589."
ctx, endSpanFn := startSpanFn(ctx, "layered-supervisor", query)
iter := adk.NewRunner(ctx, adk.RunnerConfig{
EnableStreaming: true,
Agent: sv,
}).Query(ctx, query)
var lastMessage adk.Message
for {
event, hasEvent := iter.Next()
if !hasEvent {
break
}
prints.Event(event)
if event.Output != nil {
lastMessage, _, err = adk.GetMessage(event)
}
}
endSpanFn(ctx, lastMessage)
// wait for all span to be ended
time.Sleep(5 * time.Second)
|
multiagent/integration-project-manager: supervisor agent(推荐)
#

详情:
https://mp.weixin.qq.com/s/p_QqDN6m2anHAE97P2Q2bw?forceh5=1
ProjectManagerAgent:项目开发经理Agent(使用 Supervisor 模式):根据动态的用户输入,路由并协调多个负责不同维度工作的子智能体开展工作。
ResearchAgent(调研Agent): 负责调研并生成可行方案。支持中断后从用户处接收额外的上下文信息来提高调研方案生成的准确性。CodeAgent(编码 Agent):使用知识库工具,召回相关知识作为参考,生成高质量的代码。ReviewAgent(评论 Agent):使用顺序工作流编排问题分析、评价生成、评价验证三个步骤,对调研结果/编码结果进行评审,给出合理的评价,供项目经理进行决策。- questionAnalysisAgent
- generateReviewAgent
- reviewValidationAgent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| // 核心一行代码
supervisorAgent, err := supervisor.New(ctx, &supervisor.Config{
Supervisor: s,
SubAgents: []adk.Agent{researchAgent, codeAgent, reviewAgent},
})
researchAgent := 带webSearchTool、newAskForClarificationTool
codeAgent := 带knowledgeBaseTool(召回RAG知识库)
reviewAgent := adk.NewSequentialAgent(
SubAgents: []adk.Agent{questionAnalysisAgent, generateReviewAgent, reviewValidationAgent},
})
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: supervisorAgent,
EnableStreaming: true,
CheckPointStore: newInMemoryStore(),
})
// 循环中断和恢复
for !finished {
var iter *adk.AsyncIterator[*adk.AgentEvent]
if !interrupted {
iter = runner.Query(ctx, query, adk.WithCheckPointID(checkpointID))
} else {
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("\ninput additional context for web search: ")
scanner.Scan()
fmt.Println()
nInput := scanner.Text()
iter, err = runner.Resume(ctx, checkpointID, adk.WithToolOptions([]tool.Option{agents.WithNewInput(nInput)}))
if err != nil {
log.Fatal(err)
}
}
interrupted = false
for {
event, ok := iter.Next()
if !ok {
if !interrupted {
finished = true
}
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
if event.Action != nil {
if event.Action.Interrupted != nil {
interrupted = true
}
if event.Action.Exit {
finished = true
}
}
prints.Event(event)
}
}
|
核心代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
| // Init chat model for agents
tcm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{..})
// Init research agent
researchAgent, err := agents.NewResearchAgent(ctx, tcm)
// Init code agent
codeAgent, err := agents.NewCodeAgent(ctx, tcm)
// Init technical agent
reviewAgent, err := agents.NewReviewAgent(ctx, tcm)
// Init project manager agent
s, err := agents.NewProjectManagerAgent(ctx, tcm)
// Combine agents into ADK supervisor pattern
// Supervisor: project manager
// Sub-agents: researcher / coder / reviewer
supervisorAgent, err := supervisor.New(ctx, &supervisor.Config{
Supervisor: s,
SubAgents: []adk.Agent{researchAgent, codeAgent, reviewAgent},
})
// Init Agent runner
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: supervisorAgent,
EnableStreaming: true,// enable stream output
CheckPointStore: newInMemoryStore(),// enable checkpoint for interrupt & resume
})
query := "please generate a simple ai chat project with python."
checkpointID := "1"
// Start runner with a new checkpoint id
iter := runner.Query(ctx, query, adk.WithCheckPointID(checkpointID))
interrupted := false
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
if event.Action != nil && event.Action.Interrupted != nil {
interrupted = true
}
prints.Event(event)
}
if !interrupted {
return
}
// interrupt and ask for additional user context
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("\ninput additional context for web search: ")
scanner.Scan()
fmt.Println()
nInput := scanner.Text()
// Resume by checkpoint id, with additional user context injection
iter, err = runner.Resume(ctx, checkpointID, adk.WithToolOptions([]tool.Option{agents.WithNewInput(nInput)}))
if err != nil {
log.Fatal(err)
}
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
prints.Event(event)
}
}
|
如果不用Eion,从0开发:
| 设计点 | 基于 Eino ADK 开发 | 传统开发模式 |
|---|
| Agent 抽象 | 统一定义,职责独立,代码整洁,便于各 Agent 分头开发 | 没有统一定义,团队协作开发效率差,后期维护成本高 |
| 输入输出 | 有统一定义,全部基于事件驱动运行过程通过 iterator 透出,所见即所得 | 没有统一定义,输入输出混乱运行过程只能手动加日志,不利于调试 |
| Agent 协作 | 框架自动传递上下文 | 通过代码手动传递上下文 |
| 中断恢复能力 | 仅需在 Runner 中注册 CheckPointStore 提供断点数据存储介质 | 需要从零开始实现,解决序列化与反序列化、状态存储与恢复等问题 |
| Agent 模式 | 多种成熟模式开箱即用 | 需要从零开始实现 |
4.7 加载.env的方法
#
方案1:使用.env文件配置环境变量
#
- vscode 安装
mikestead.dotenv 扩展:支持.env .env.local .env.example等常见文件名的语法高亮。但不支持自动加载环境变量。 - 项目根目录创建.env文件,务必将.env添加到.gitignore(否则ak/sk泄露到gitlab/github)。在.env中配置:
- 注意:不带双引号,不带export开头。
- 终端及其子进程要生效.env:
export $(grep -v '^#' .env | xargs) 或 .env每行都要加export开头再source- 直接
source .env ;测试当前终端能生效:echo $ARK_API_KEY ,但终端运行子进程时仍然读取不到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| # ark model: https://console.volcengine.com/ark
# 必填
# 火山云方舟 ChatModel 的 Endpoint ID
ARK_CHAT_MODEL=""
# 火山云方舟 向量化模型的 Endpoint ID
ARK_EMBEDDING_MODEL=""
# 火山云方舟的 API Key
ARK_API_KEY=""
ARK_BASE_URL="https://ark.cn-beijing.volces.com/api/v3/"
# apmplus: https://console.volcengine.com/apmplus-server
# 下面必填环境变量如果为空,则不开启 apmplus callback
# APMPlus 的 App Key,必填
APMPLUS_APP_KEY=""
# APMPlus 的 Region,选填,不填写时,默认是 cn-beijing
APMPLUS_REGION=""
# langfuse: https://cloud.langfuse.com/
# 下面两个环境变量如果为空,则不开启 langfuse callback
# Langfuse Project 的 Public Key
LANGFUSE_PUBLIC_KEY=""
# Langfuse Project 的 Secret Key。 注意,Secret Key 仅可在被创建时查看一次
LANGFUSE_SECRET_KEY=""
# Redis Server 的地址,不填写时,默认是 localhost:6379
REDIS_ADDR=
OPENAI_API_KEY=""
OPENAI_MODEL="gpt-4o-mini"
OPENAI_BASE_URL="https://api.openai.com/v1"
OPENAI_BY_AZURE=false
|
- 从调试设置里创建 .vscode/launch.json,设置加载.env。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| {
"version": "0.2.0",
"configurations": [
{
// 配置名称,显示在下拉菜单中
"name": "Debug helloworld",
// 调试器类型
"type": "go",
// 请求类型:launch(启动程序)或 attach(附加到运行中进程)
"request": "launch",
// Go 特定的模式:debug, test, exec 等
"mode": "auto",
// 要调试的程序路径
"program": "${workspaceFolder}/adk/helloworld",
// 从 .env 文件加载环境变量
"envFile": "${workspaceFolder}/.env",
// 控制台类型
"console": "integratedTerminal",
// 是否显示详细日志
"showLog": false
}
]
}
|
- command + shift + D启动调试,默认会加载.env。F5继续调试、F11单步进入;
go test加载.env:
.vscode/settings.json
1
2
3
| {
“go.testEnvFile”:"${workspaceFolder}/.env"
}
|
然后直接vscode中的 run test
上述为调试/运行场景,如果需要sudo运行:
Terminal-Run Task:使用VSCode的Tasks + 环境配置
.vscode/tasks.json:在 VSCode 内用 Terminal-Run Task即可执行。等价于在终端里执行其中的command。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| {
"version": "2.0.0",
"tasks": [
{
"label": "Run hello.go with sudo",
"type": "shell",
"command": "export $(grep -v '^#' .env | xargs) && sudo -E go run hello.go"
"options": {
"env": {
"GO_ENV": "development"
}
},
"problemMatcher": []
}
]
}
|
方法2:direnv工具 + sudo -E
#
- 安装
direnv1
2
3
| sudo apt install direnv # Ubuntu/Debian
# 或
brew install direnv # macOS
|
- 在项目根目录创建
.envrc1
| export $(grep -v '^#' .env | xargs)
|
- 启用
direnv: direnv allow。每次进入项目目录时,.env 中的变量都会被自动加载到当前 shell。 - 在 VSCode 终端中运行:
**-E**** 表示保留当前环境变量(包括****.env**** 中的变量)。(否则,切换到sudo后前面source .env的环境变量丢失了)**
方法3:godotenv package
#
1
2
3
4
5
6
7
8
9
| import "github.com/joho/godotenv"
err := godotenv.Load()
if err != nil {
log.Println("No .env file found or failed to load")
}
// 测试读取变量
apiKey := os.Getenv("API_KEY")
fmt.Println("API_KEY:", apiKey)
|
附:老版本 Eino React Agent(基于compose.Graph)
#
王德政:
eino flow/下的react agent 和adk/ 下的 react agent基本没区别,都是 react 模式;
adk 下的 ChatModelAgent 是符合 adk.Agent 接口的,接口更易用一些;
如果要使用 adk 相关能力建议用 adk 目录下的, 如果单独使用也建议用 adk 目录下的;
备注:不推荐。推荐使用后面新上线的adk/chatModelAgent,有更规范的agent定义的interface,其封装了adk/react.go的ReAct能力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // 通过 flow/agent/react 包提供完整实现
// eino/flow/agent/react/react.go
type Agent struct {
runnable compose.Runnable[[]*schema.Message, *schema.Message]
graph *compose.Graph[[]*schema.Message, *schema.Message]
graphAddNodeOpts []compose.GraphAddNodeOpt
}
type AgentConfig struct {
ToolCallingModel model.ToolCallingChatModel // 支持工具调用的聊天模型
ToolsConfig compose.ToolsNodeConfig // 工具配置
MessageModifier MessageModifier // 消息修改器
MaxStep int // 最大步数限制
ToolReturnDirectly map[string]struct{} // 直接返回的工具
StreamToolCallChecker func(...) // 流式工具调用检查器
}
|
官网:
https://react-lm.github.io/
ReAct(Reasoning + Acting)是一种 AI Agent模式:用户输入 → 模型推理 → 工具调用 → 结果反馈 → 模型推理 → … → 最终答案。
- **推理阶段(Reasoning):**AI 模型分析用户问题、决定是否需要调用工具、选择合适的工具和参数
- **行动阶段(Acting):**执行选定的工具、获取工具执行结果、将结果作为上下文传递给下一轮推理当ChatModelAgent没有配置工具时,退化为一次 ChatModel 调用。

Eino React Agent 是实现了
React 模式 的Agent框架,用户可以用来快速灵活地构建并调用 React Agent。
React Agent 底层使用 compose.Graph 作为编排方案,主要包含两个节点:ChatModel、Tools
节点拓扑&数据流图:

用户输入 → ChatModel → Tools → ChatModel → … → 最终结果
- 所有历史消息都会放入 state 中
- 在传递给 ChatModel 前,会通过 MessageModifier 处理消息
- 直到 ChatModel 返回的消息中不再有 tool call,则返回最终消息
使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
| // 创建 ReAct 代理
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: chatModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{restaurantTool, dishTool},
},
MaxStep: 12,
})
// 使用代理
msg, err := agent.Generate(ctx, []*schema.Message{
{Role: schema.User, Content: "推荐北京最好的川菜餐厅"},
})
|
带中断的 ReAct:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // 支持用户干预的 ReAct 实现
for {
result, err := runner.Invoke(ctx, input)
if err == nil {
fmt.Printf("最终结果: %s", result.Content)
break
}
// 处理中断,允许用户修改工具调用参数
info, ok := compose.ExtractInterruptInfo(err)
if ok {
// 用户确认或修改工具调用参数
// 继续执行
}
}
|
初始化配置
基础配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import (
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/flow/agent/react"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/schema"
)
// 创建 OpenAI 模型
openaiModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
APIKey: os.Getenv("OPENAI_API_KEY"),
Model: "gpt-3.5-turbo",
})
// 创建工具
weatherTool := tool.NewTool("get_weather", "获取天气信息", func(ctx context.Context, input string) (string, error) {
return "晴天,温度25°C", nil
})
// 创建 React Agent
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: openaiModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{weatherTool},
},
MaxStep: 12, // 最大步数限制
})
|
高级配置选项:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: openaiModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{weatherTool, searchTool},
},
MessageModifier: func(ctx context.Context, messages []*schema.Message) []*schema.Message {
// 自定义消息处理逻辑
return messages
},
MaxStep: 12,
ToolReturnDirectly: map[string]struct{}{
"final_answer": {}, // 某些工具调用后直接返回
},
StreamToolCallChecker: customToolCallChecker, // 自定义流式工具调用检查器
})
|
调用方式:
1. Generate(同步调用)
1
2
3
4
5
6
7
8
9
| outMessage, err := agent.Generate(ctx, []*schema.Message{
schema.UserMessage("北京今天天气怎么样?"),
})
if err != nil {
log.Fatal(err)
}
fmt.Println("回答:", outMessage.Content)
|
- Stream(流式调用)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| msgReader, err := agent.Stream(ctx, []*schema.Message{
schema.UserMessage("写一个 golang 的 hello world 程序"),
})
if err != nil {
log.Fatal(err)
}
for {
msg, err := msgReader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Printf("接收错误: %v\n", err)
return
}
fmt.Print(msg.Content)
}
|
3. 使用回调(Callbacks)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| // 构建回调处理器
callback := react.BuildAgentCallback(
&template.ModelCallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
fmt.Printf("模型开始处理: %s\n", info.Name)
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
fmt.Printf("模型处理完成: %s\n", info.Name)
return ctx
},
},
&template.ToolCallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
fmt.Printf("工具开始执行: %s\n", info.Name)
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
fmt.Printf("工具执行完成: %s\n", info.Name)
return ctx
},
},
)
// 使用回调调用
outMessage, err := agent.Generate(ctx, []*schema.Message{
schema.UserMessage("查询天气"),
}, react.WithComposeOptions(compose.WithCallbacks(callback)))
|
在 Graph/Chain 中使用
Agent 可以作为 Lambda 嵌入到其他 Graph 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| agent, _ := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: chatModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{weatherTool, searchTool},
},
MaxStep: 40,
})
// 创建 Chain
chain := compose.NewChain[[]*schema.Message, string]()
agentLambda, _ := compose.AnyLambda(agent.Generate, agent.Stream, nil, nil)
chain.
AppendLambda(agentLambda).
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (string, error) {
fmt.Println("获得 Agent 响应:", input.Content)
return input.Content, nil
}))
r, _ := chain.Compile(ctx)
res, _ := r.Invoke(ctx, []*schema.Message{{Role: schema.User, Content: "hello"}})
|
运行过程分析
当用户输入:“我在海淀区,给我推荐一些菜,需要有口味辣一点的菜,至少推荐有 2 家餐厅”
第一步:ChatModel 推理
模型分析用户需求
决定调用 query_restaurants 工具
参数:{“location”:“海淀区”,“topn”:2}
第二步:Tools 执行
执行餐厅查询工具
返回 2 家海淀区餐厅信息
第三步:ChatModel 再次推理
基于餐厅信息,决定查询菜品
为每个餐厅调用 query_dishes 工具
并发执行多个工具调用
第四步:Tools 并发执行
同时查询两家餐厅的菜品
返回详细的菜品信息
第五步:ChatModel 最终整合
整合所有信息
生成最终推荐结果
五、a2a
#
Agent2Agent协议是Google提出的一种开放标准,旨在实现AI Agent之间的无缝通信与协作。
Eino ADK提供了双向封装能力:




A2A (Agent-to-Agent) 是一个标准化的 Agent 间通信协议实现,允许不同的 AI Agent 通过统一的接口进行交互和协作。
该模块使eino-ext拓展组件实现。
核心特性:
- 🔄 支持同步和异步消息交互
- 📡 支持流式响应(Streaming)
- 🔔 支持 Push Notification(异步通知)
- 🔐 支持多种认证方式
- 🎯 任务状态管理和生命周期控制
- 🔌 可插拔的传输层(目前支持 JSON-RPC)
- 🧩 与 Eino ADK (Agent Development Kit) 无缝集成
使用场景:
- 多 Agent 协作系统:不同 Agent 之间需要标准化通信
- Agent 服务化:将 Agent 能力封装为可远程调用的服务
- Agent 编排:构建复杂的 Agent 工作流
- 跨组织 Agent 调用:通过标准协议实现不同组织开发的 Agent 互通
模块层级结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| a2a/
├── models/ ** # 数据模型定义**
│ ├── task.go # Task 相关数据结构
│ ├── message.go # Message 相关数据结构
│ ├── artifact.go # Artifact 数据结构
│ ├── card.go # Agent Card 定义
│ ├── part.go # Message Part 定义
│ ├── handler.go # Handler 接口定义
│ └── ...
├── client/ **# A2A 客户端实现**
│ └── client.go
├── server/ **# A2A 服务端实现**
│ ├── server.go
│ ├── eventqueue.go # 事件队列
│ ├── taskstore.go # 任务存储
│ ├── tasklocker.go # 任务锁
│ └── notifier.go # 推送通知
├── transport/ **# 传输层抽象和实现**
│ ├── transport.go # 传输层接口
│ └── jsonrpc/ # JSON-RPC 实现
│ ├── client/
│ ├── server/
│ └── core/
├── extension/ ** # 扩展集成**
│ └── eino/ # Eino ADK 集成
│ ├── server.go # Eino Server 适配器
│ ├── client.go # Eino Client 适配器
│ └── utils.go
├── utils/ **# 工具函数**
└── examples/ **# 示例代码**
├── client/
└── server/
|
分层架构图:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| ┌─────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Eino Agent │ │ Custom App │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
└───────────┼──────────────────────────┼─────────────────┘
│ │
┌───────────┼──────────────────────────┼─────────────────┐
│ │ A2A Core Layer │ │
│ ┌────────▼─────────┐ ┌────────▼─────────┐ │
│ │ A2A Server │ │ A2A Client │ │
│ │ (server.go) │ │ (client.go) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ┌────────▼───────────────────────────▼─────────┐ │
│ │ Models & Data Structures │ │
│ │ Task, Message, Artifact, AgentCard, etc. │ │
│ └────────┬──────────────────────────────────────┘ │
└───────────┼─────────────────────────────────────────────┘
│
┌───────────┼─────────────────────────────────────────────┐
│ │ Transport Layer │
│ ┌────────▼─────────────────────────────────────┐ │
│ │ JSON-RPC over HTTP/HTTPS │ │
│ │ (transport/jsonrpc/) │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
|
Task(任务)
#
是 A2A 协议中的核心概念,代表一次完整的 Agent 交互过程。
Task 的生命周期状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
| type TaskState string
const (
TaskStateSubmitted TaskState = "submitted" // 已提交,等待处理
TaskStateWorking TaskState = "working" // 正在处理
TaskStateInputRequired TaskState = "input-required" // 需要用户输入(暂停)
TaskStateCompleted TaskState = "completed" // 已完成(终态)
TaskStateCanceled TaskState = "canceled" // 已取消(终态)
TaskStateFailed TaskState = "failed" // 失败(终态)
TaskStateRejected TaskState = "rejected" // 被拒绝(终态)
TaskStateAuthRequired TaskState = "auth-required" // 需要认证(暂停)
TaskStateUnknown TaskState = "unknown" // 未知状态
)
|
Task 状态转换图:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| ┌──────────────┐
│ submitted │
└──────┬───────┘
│
┌──────▼───────┐
┌───────┤ working ├───────┐
│ └──────┬───────┘ │
│ │ │
┌───────▼─────┐ ┌──────▼──────┐ ┌─────▼────────┐
│input-required│ │ completed │ │auth-required │
│ (paused) │ │ (terminal) │ │ (paused) │
└──────────────┘ └──────────────┘ └──────────────┘
│
┌───────▼─────┐ ┌───────────┐ ┌──────────┐
│ canceled │ │ failed │ │ rejected │
│ (terminal) │ │(terminal) │ │(terminal)│
└─────────────┘ └───────────┘ └──────────┘
|
Task 数据结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| type Task struct {
ID string // 唯一任务 ID(UUID)
ContextID string // 上下文 ID,用于关联多个任务
Status TaskStatus // 当前状态
Artifacts []*Artifact // 生成的工件(输出)
History []*Message // 历史消息记录
Metadata map[string]any // 元数据
}
type TaskStatus struct {
State TaskState // 状态
Message *Message // 关联消息
Timestamp string // 时间戳(ISO 8601)
}
|
Message(消息)
#
Message 表示用户或 Agent 之间交换的信息。
1
2
3
4
5
6
7
8
9
| type Message struct {
Role Role // "user" 或 "agent"
Parts []Part // 消息内容(可多模态)
Metadata map[string]any // 元数据
ReferenceTaskIDs []string // 引用的任务 ID
MessageID string // 消息 ID
TaskID *string // 所属任务 ID
ContextID *string // 上下文 ID
}
|
Part(消息片段)支持的类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| type PartKind string
const (
PartKindText PartKind = "text" // 文本
PartKindFile PartKind = "file" // 文件
PartKindData PartKind = "data" // 结构化数据
)
type Part struct {
Kind PartKind
Text *string // 文本内容
File *FileContent // 文件内容(Base64 或 URI)
Data map[string]any // 结构化数据
Metadata map[string]any
}
|
Artifact(工件)
#
Artifact 表示 Agent 生成的输出或中间结果。
1
2
3
4
5
6
7
| type Artifact struct {
ArtifactID string // 唯一标识
Name string // 名称
Description string // 描述
Parts []Part // 内容(可多模态)
Metadata map[string]any // 元数据
}
|
使用场景:
Agent Card(Agent 名片)
#
Agent Card 描述了一个 Agent 的基本信息和能力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| type AgentCard struct {
ProtocolVersion string // A2A 协议版本("0.2.5")
Name string // Agent 名称
Description string // 描述
URL string // 服务地址
Version string // Agent 版本
Capabilities AgentCapabilities // 能力声明
Skills []AgentSkill // 技能列表
SecuritySchemes map[string]*SecurityScheme
DefaultInputModes []string // 支持的输入模式
DefaultOutputModes []string // 支持的输出模式
}
type AgentCapabilities struct {
Streaming bool // 是否支持流式
PushNotifications bool // 是否支持推送通知
StateTransitionHistory bool // 是否记录状态转换历史
}
|
Server 架构端实现
#
Server 架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| type A2AServer struct {
agentCard *models.AgentCard
messageHandler MessageHandler // 普通消息处理器
messageStreamingHandler MessageStreamingHandler // 流式消息处理器
cancelTaskHandler CancelTaskHandler // 取消任务处理器
taskEventsConsolidator TaskEventsConsolidator // 事件合并器
logger Logger
taskIDGenerator func(ctx context.Context) (string, error)
contextIDGenerator func(ctx context.Context) (string, error)
taskStore TaskStore // 任务存储
taskLocker TaskLocker // 任务锁
queue EventQueue // 事件队列
pushNotifier PushNotifier // 推送通知器
}
|
核心 Handler 接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| // 普通消息处理器(同步)
type MessageHandler func(
ctx context.Context,
params *InputParams,
) (*models.TaskContent, error)
// 流式消息处理器(异步)
type MessageStreamingHandler func(
ctx context.Context,
params *InputParams,
writer ResponseEventWriter,
) error
// 取消任务处理器
type CancelTaskHandler func(
ctx context.Context,
params *InputParams,
) (*models.TaskContent, error)
// 事件合并器:将流式事件合并为最终任务状态
type TaskEventsConsolidator func(
ctx context.Context,
t *models.Task,
events []models.ResponseEvent,
handleErr error,
) *models.TaskContent
|
消息处理流程:
同步消息处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| Client Request
│
▼
┌─────────────────┐
│ SendMessage │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Lock Task │──────┐ (new task or existing)
└────────┬────────┘ │
│ │
▼ │
┌─────────────────┐ │
│ MessageHandler │ │
└────────┬────────┘ │
│ │
▼ │
┌─────────────────┐ │
│ Update TaskStore│ │
└────────┬────────┘ │
│ │
▼ │
┌─────────────────┐ │
│ Unlock Task │◄─────┘
└────────┬────────┘
│
▼
Return Result
|
流式消息处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| Client Request
│
▼
┌──────────────────┐
│SendMessageStream │
└─────────┬────────┘
│
▼
┌──────────────────┐
│ Lock Task │
│ Reset Queue │
└─────────┬────────┘
│
├──────────────────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Async Execution │ │ Stream Reader │
│ │ │ (Pop Queue) │
│ ┌──────────────┐ │ │ │
│ │ Handler │ │ │ ┌────────────┐ │
│ │ Execute │─┼──┬───┼─►│ Send Event │─┼──► Client
│ └──────────────┘ │ │ │ └────────────┘ │
│ │ │ │ │
│ ┌──────────────┐ │ │ └──────────────────┘
│ │ Push to Queue│◄┼──┘
│ └──────────────┘ │
│ │
│ ┌──────────────┐ │
│ │ Consolidate │ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ │
│ │Update & Save │ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ │
│ │Unlock & Close│ │
│ └──────────────┘ │
└──────────────────┘
|
关键点:
- 异步执行:Handler 在独立的 goroutine 中执行
- 事件队列:通过队列实现生产者-消费者模式
- 流式传输:客户端通过 SSE (Server-Sent Events) 实时接收事件
- 任务锁:保证任务处理的并发安全
- 错误恢复:支持 panic 捕获和错误传播
TaskStore(任务存储)
1
2
3
4
| type TaskStore interface {
Get(ctx context.Context, taskID string) (*models.Task, bool, error)
Save(ctx context.Context, task *models.Task) error
}
|
实现方式:
- 默认:内存存储(
inMemoryTaskStore) - 可扩展:Redis、数据库等持久化存储
TaskLocker(任务锁)
1
2
3
4
| type TaskLocker interface {
Lock(ctx context.Context, taskID string) error
Unlock(ctx context.Context, taskID string) error
}
|
作用:
EventQueue(事件队列)
1
2
3
4
5
6
7
8
9
10
11
12
| type EventQueue interface {
Reset(ctx context.Context, taskID string) error
Push(ctx context.Context, taskID string,
event *models.SendMessageStreamingResponseUnion,
err error) error
Pop(ctx context.Context, taskID string) (
event *models.SendMessageStreamingResponseUnion,
err error,
closed bool,
popErr error)
Close(ctx context.Context, taskID string) error
}
|
实现:
- 基于 channel 的内存队列
- 支持多个任务的并发队列管理
- 每个任务有独立的事件队列
PushNotifier(推送通知)
1
2
3
4
5
6
7
| type PushNotifier interface {
Set(ctx context.Context, config *models.TaskPushNotificationConfig) error
Get(ctx context.Context, configID string) (
models.PushNotificationConfig, bool, error)
SendNotification(ctx context.Context,
event *models.SendMessageStreamingResponseUnion) error
}
|
使用场景:
- 长时间运行的任务
- 异步通知客户端
- Webhook 集成
服务端使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
| import (
"github.com/cloudwego/eino-ext/a2a/server"
"github.com/cloudwego/eino-ext/a2a/transport/jsonrpc"
)
func main() {
ctx := context.Background()
// 1. 创建 Hertz HTTP 服务器
hz := hertz_server.Default()
// 2. 创建 JSON-RPC 注册器
registrar, _ := jsonrpc.NewRegistrar(ctx, &jsonrpc.ServerConfig{
Router: hz,
HandlerPath: "/a2a",
})
// 3. 注册 A2A 处理器
server.RegisterHandlers(ctx, registrar, &server.Config{
AgentCardConfig: server.AgentCardConfig{
Name: "My Agent",
Description: "A helpful AI agent",
URL: "<https://example.com/a2a>",
Version: "1.0.0",
},
// 流式消息处理器
MessageStreamingHandler: func(ctx context.Context,
params *server.InputParams,
writer server.ResponseEventWriter) error {
// 处理用户输入
userInput := params.Input
// 发送状态更新
writer.Write(models.ResponseEvent{
TaskStatusUpdateEventContent: &models.TaskStatusUpdateEventContent{
Status: models.TaskStatus{
State: models.TaskStateWorking,
},
},
})
// 生成输出
result := processInput(userInput)
// 发送结果
writer.Write(models.ResponseEvent{
TaskArtifactUpdateEventContent: &models.TaskArtifactUpdateEventContent{
Artifact: models.Artifact{
Parts: []models.Part{
{Kind: models.PartKindText, Text: &result},
},
},
LastChunk: true,
},
})
return nil
},
// 任务取消处理器
CancelTaskHandler: func(ctx context.Context,
params *server.InputParams) (*models.TaskContent, error) {
return &models.TaskContent{
Status: models.TaskStatus{State: models.TaskStateCanceled},
}, nil
},
// 事件合并器
TaskEventsConsolidator: consolidateEvents,
})
hz.Run()
}
|
Client端实现详解
#
Client 架构
1
2
3
| type A2AClient struct {
cli transport.ClientTransport
}
|
主要方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| // 获取 Agent 信息
func (c *A2AClient) AgentCard(ctx context.Context) (*models.AgentCard, error)
// 发送消息(同步)
func (c *A2AClient) SendMessage(ctx context.Context,
params *models.MessageSendParams) (*models.SendMessageResponseUnion, error)
// 发送消息(流式)
func (c *A2AClient) SendMessageStreaming(ctx context.Context,
params *models.MessageSendParams) (*ServerStreamingWrapper, error)
// 获取任务状态
func (c *A2AClient) GetTask(ctx context.Context,
params *models.TaskQueryParams) (*models.Task, error)
// 取消任务
func (c *A2AClient) CancelTask(ctx context.Context,
params *models.TaskIDParams) (*models.Task, error)
// 重新订阅任务(断线重连)
func (c *A2AClient) ResubscribeTask(ctx context.Context,
params *models.TaskIDParams) (*ServerStreamingWrapper, error)
|
客户端使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| import (
"github.com/cloudwego/eino-ext/a2a/client"
"github.com/cloudwego/eino-ext/a2a/transport/jsonrpc"
)
func main() {
ctx := context.Background()
// 1. 创建传输层
transport, _ := jsonrpc.NewTransport(ctx, &jsonrpc.ClientConfig{
BaseURL: "<http://localhost:8080>",
HandlerPath: "/a2a",
})
// 2. 创建客户端
cli, _ := client.NewA2AClient(ctx, &client.Config{
Transport: transport,
})
// 3. 获取 Agent 信息
card, _ := cli.AgentCard(ctx)
fmt.Printf("Agent: %s\\n", card.Name)
// 4. 发送流式消息
stream, _ := cli.SendMessageStreaming(ctx, &models.MessageSendParams{
Message: models.Message{
Role: models.RoleUser,
Parts: []models.Part{
{Kind: models.PartKindText, Text: ptr("Hello, agent!")},
},
},
})
// 5. 接收流式响应
for {
event, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
// 处理事件
if event.TaskStatusUpdateEvent != nil {
fmt.Printf("Status: %s\\n", event.TaskStatusUpdateEvent.Status.State)
}
if event.TaskArtifactUpdateEvent != nil {
fmt.Printf("Artifact: %v\\n", event.TaskArtifactUpdateEvent.Artifact)
}
}
}
|
Eino 集成
#
Eino Server 集成:将 Eino ADK Agent 包装为 A2A 服务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| import (
"github.com/cloudwego/eino/adk"
einoa2a "github.com/cloudwego/eino-ext/a2a/extension/eino"
)
func main() {
ctx := context.Background()
// 1. 创建 Eino Agent
agent := createMyEinoAgent()
// 2. 创建 JSON-RPC 注册器
registrar, _ := jsonrpc.NewRegistrar(ctx, &jsonrpc.ServerConfig{
Router: hertz_server.Default(),
HandlerPath: "/agent",
})
// 3. 注册为 A2A 服务
einoa2a.RegisterServerHandlers(ctx, agent, &einoa2a.ServerConfig{
Registrar: registrar,
// Agent 运行选项转换器
AgentRunOptionConvertor: func(ctx context.Context,
t *models.Task,
input *models.Message,
metadata map[string]any) ([]adk.AgentRunOption, error) {
// 从 A2A Message 转换为 ADK 运行选项
return []adk.AgentRunOption{}, nil
},
// Checkpoint 存储(支持中断恢复)
CheckPointStore: myCheckpointStore,
// 历史消息转换器
HistoryMessageConvertor: func(ctx context.Context,
messages []*models.Message) ([]adk.Message, error) {
// 从 A2A Messages 转换为 ADK Messages
return convertMessages(messages), nil
},
// 恢复选项转换器(用于中断后恢复)
ResumeConvertor: func(ctx context.Context,
t *models.Task,
input *models.Message,
metadata map[string]any) ([]adk.AgentRunOption, error) {
return []adk.AgentRunOption{}, nil
},
})
}
|
事件转换流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| ADK AgentEvent → A2A ResponseEvent
┌──────────────────────┐
│ AgentEvent │
├──────────────────────┤
│ - Action │─┐
│ - Interrupted │ │ ┌──────────────────────┐
│ - TransferToAgent │─┼───►│ TaskStatusUpdate │
│ - Output │ │ │ - State │
│ - MessageOutput │─┘ │ - Message │
└──────────────────────┘ └──────────────────────┘
│
└──────────────────► ┌──────────────────────┐
│ TaskArtifactUpdate │
│ - Artifact │
│ - LastChunk │
└──────────────────────┘
|
Eino Client 集成:将远程 A2A 服务包装为 Eino Agent。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| import (
einoa2a "github.com/cloudwego/eino-ext/a2a/extension/eino"
)
func main() {
ctx := context.Background()
// 1. 创建 A2A 传输层
transport, _ := jsonrpc.NewTransport(ctx, &jsonrpc.ClientConfig{
BaseURL: "<http://remote-agent:8080>",
HandlerPath: "/agent",
})
// 2. 创建 Eino Agent(包装 A2A Client)
agent, _ := einoa2a.NewAgent(ctx, einoa2a.AgentConfig{
Transport: transport,
// 可选:自定义输入转换
InputMessageConvertor: func(ctx context.Context,
messages []*schema.Message) (models.Message, error) {
return convertToA2AMessage(messages), nil
},
// 可选:自定义输出转换
OutputConvertor: func(ctx context.Context,
receiver *einoa2a.ResponseUnionReceiver,
sender *einoa2a.AgentEventSender) {
// 自定义从 A2A 响应到 ADK 事件的转换逻辑
},
})
// 3. 像使用普通 Eino Agent 一样使用
runner := adk.NewRunner(ctx, adk.RunnerConfig{
Agent: agent,
})
iter := runner.Run(ctx, []adk.Message{
schema.UserMessage("Hello!"),
})
// 处理结果
for {
event, ok := iter.Next()
if !ok {
break
}
handleEvent(event)
}
}
|
中断与恢复支持:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // Agent 执行过程中发生中断
iter := runner.Run(ctx, input)
for {
event, ok := iter.Next()
if !ok {
break
}
// 检测到中断
if event.Action != nil && event.Action.Interrupted != nil {
interruptInfo := event.Action.Interrupted
// 保存中断信息(自动保存在 CheckPointStore)
fmt.Printf("Agent interrupted: %v\\n", interruptInfo.Data)
// ... 等待用户输入 ...
// 恢复执行
resumeIter, _ := runner.Resume(ctx, interruptInfo.CheckPointID,
einoa2a.WithResumeMessages(userResponse))
// 继续处理
}
}
|
传输层实现
#
Transport 接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // 客户端传输接口
type ClientTransport interface {
AgentCard(ctx context.Context) (*models.AgentCard, error)
SendMessage(ctx context.Context, params *models.MessageSendParams)
(*models.SendMessageResponseUnion, error)
SendMessageStreaming(ctx context.Context, params *models.MessageSendParams)
(models.ResponseReader, error)
GetTask(ctx context.Context, params *models.TaskQueryParams)
(*models.Task, error)
CancelTask(ctx context.Context, params *models.TaskIDParams)
(*models.Task, error)
ResubscribeTask(ctx context.Context, params *models.TaskIDParams)
(models.ResponseReader, error)
Close() error
}
// 服务端注册接口
type HandlerRegistrar interface {
Register(context.Context, *models.ServerHandlers) error
}
|
JSON-RPC 实现:
目前支持的传输协议是 JSON-RPC over HTTP/HTTPS。
特点:
- 基于 CloudWeGo Hertz HTTP 框架
- 支持 SSE (Server-Sent Events) 流式传输
- 自定义 JSON-RPC 2.0 协议实现
- 支持元数据传递和中间件
核心组件:
1
2
3
4
5
6
7
8
9
10
11
12
| transport/jsonrpc/
├── core/
│ ├── jsonrpc.go # JSON-RPC 协议核心
│ ├── connection.go # 连接管理
│ ├── message.go # 消息编解码
│ └── middleware.go # 中间件支持
├── client/
│ ├── client.go # HTTP 客户端
│ └── option.go # 配置选项
└── server/
├── server.go # HTTP 服务端
└── option.go # 配置选项
|
消息格式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| // Request
{
"jsonrpc": "2.0",
"id": "req-123",
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "Hello"}]
}
}
}
// Response
{
"jsonrpc": "2.0",
"id": "req-123",
"result": {
"task": {
"id": "task-456",
"status": {"state": "completed"},
...
}
}
}
// Stream Event (SSE format)
data: {"message": {...}}
data: {"taskStatusUpdateEvent": {"status": {"state": "working"}}}
data: {"taskArtifactUpdateEvent": {"artifact": {...}, "lastChunk": true}}
|
引用: