5.6 compose编排 #
**compose /kəm’poʊz/**vt. 组成
compose遇到难点:
1.** Go 理论基础不够扎实 → 我会从基础概念开始**
2. Go 代码开发实践太少 → 提供大量代码示例和练习
3.** AI 概念太多 → 用通俗易懂的方式解释 AI 相关概念**
**4. **工程应用框架设计没接触过 → 从设计模式角度解释
一、compose包整体设计 #
传统代码开发过程中,用 代码的执行逻辑 来表达业务逻辑,直接迁移到大模型应用开发中时结果是:代码杂乱、很难复用、没有切面能力…
Eino 的初衷是让大模型应用开发变得非常简单,就一定要让应用的代码逻辑 简单、直观、优雅、健壮。
Eino对compose的洞察:
- compose要在components业务逻辑之上独立出清晰的一个逻辑层,不能让业务逻辑混入到编排中:业务逻辑复杂度封装到components内部,上层的compose层拥有更全局的视角。
- 大模型应用的核心是 compose components(编排组件),components是compose的 “第一公民”。数据在graph中流动,每个上下游node对流动的数据类型有对齐的要求。
- 业务场景的复杂度会直接映射到编排产物的复杂度上,只有**横向的切面治理能力(callback机制)**才能让复杂场景不至于失控。
- 大模型保持高速发展,大模型应用也是,只有具备扩展能力(calloption机制)的应用才拥有生命力。扩展性是快速迭代中的系统最基本的诉求。
**Eino compose**:基于Graph模型(node+edge)compose components + 上下游node对流动的数据类型对齐为基础 的compose解决方案**。**
compose包架构 #
graph TB
subgraph "编排层 (Orchestration Layer)"
Graph["Graph<br/>复杂图编排"]
Chain["Chain<br/>链式编排"]
Workflow["Workflow<br/>工作流编排"]
end
subgraph "执行层 (Execution Layer)"
Runnable["Runnable<br/>执行抽象"]
GraphRun["GraphRun<br/>图执行引擎"]
DAG["DAG<br/>有向无环图"]
Pregel["Pregel<br/>大规模图处理"]
end
subgraph "数据层 (Data Layer)"
FieldMapping["FieldMapping<br/>字段映射"]
StreamConcat["StreamConcat<br/>流连接"]
ValuesMerge["ValuesMerge<br/>值合并"]
end
subgraph "状态层 (State Layer)"
State["State<br/>状态管理"]
Checkpoint["Checkpoint<br/>检查点"]
Interrupt["Interrupt<br/>中断处理"]
end
subgraph "组件层 (Component Layer)"
ToolNode["ToolNode<br/>工具节点"]
Lambda["Lambda<br/>函数支持"]
ComponentAdapter["ComponentAdapter<br/>组件适配"]
end
Graph --> GraphRun
Chain --> GraphRun
Workflow --> GraphRun
GraphRun --> Runnable
GraphRun --> DAG
GraphRun --> Pregel
Workflow --> FieldMapping
GraphRun --> StreamConcat
GraphRun --> ValuesMerge
GraphRun --> State
GraphRun --> Checkpoint
GraphRun --> Interrupt
GraphRun --> ToolNode
GraphRun --> Lambda
GraphRun --> ComponentAdapter
上下游类型对齐设计 #
Eino对每个node的input、output使用明确的类型,使用any、map[string]any将导致下游node需要额外使用类型断言来不断猜测传过来的值是否符合预期。

模拟RAG模式:

graph.AddXXXNode(node_key, xxx, compose.WithOutputKey("outkey"): 类型转化为 map[string]any。其中map 的 key 是 option 中指定的 OutputKey。一般用于多条边汇聚到某一个节点时。
graph.AddXXXNode(node_key, xxx, compose.WithInputKey("inkey"):获取上游输出的其中一个 key 的 value。
模拟react agent的运行逻辑:condition分支(菱形)

模拟chain agent运行逻辑:

若上下游的类型没有对齐,chain 会在 chain.Compile() 时返回错误。而 graph 会在 graph.AddXXXNode() 时就报错。
二、执行引擎 #
| |
Runnable接口 #
定义了框架中所有可执行对象的基础接口:
| |
提供了四种数据流模式的自动转换机制(I为输入类型,O为输出类型):
| 入参 | 出参 | func | example |
|---|---|---|---|
| 非流式 | 非流式 | Invoke(ctx context.Context, input I) (O, error) | 用户问问题,机器人一次性回答 |
| 非流式 | 流式 | Stream(ctx context.Context, input I) (*StreamReader[O], error) | 用户问问题,机器人逐字逐句回答(像 ChatGPT 那样) |
| 流式 | 非流式 | Collect(ctx context.Context, input *StreamReader[I]) (O, error) | 用户连续发送多条消息,机器人汇总回答 |
| 流式 | 流式 | Transform(ctx context.Context, input *StreamReader[I]) (*StreamReader[O], error) | 用户发送语音流,机器人实时转换为文字流 |
composableRunnable:
- 类型擦除:使用any类型避免泛型复杂性
- 反射支持:保留类型信息用于运行时验证
- 元数据丰富:包含执行器信息和图节点上下文
| |
runnablePacker:
- 包装用户提供的四个执行函数
- 提供类型安全的执行接口
- 支持上下文包装和回调集成
| |
newRunnablePacker:
- 最小依赖:优先使用用户直接实现的方法
- 性能优化:避免不必要的转换开销
- 完整性保证:确保所有四个方法都可用
graph TD A[检查用户实现] --> B{有Invoke?} B -->|是| C[直接使用Invoke] B -->|否| D{有Stream?} D -->|是| E[invokeByStream] D -->|否| F{有Collect?} F -->|是| G[invokeByCollect] F -->|否| H[invokeByTransform] C --> I[合成其他方法] E --> I G --> I H --> I
流处理集成:
StreamReader处理
流转换:框架提供了12个转换函数,实现任意执行模式之间的转换:
| |
图系统集成:
- 节点包装
| |
- 透传节点
| |
设计模式:
**适配器模式:**Runnable接口作为适配器,统一了不同组件的执行接口:
graph LR
A[ChatModel] --> D[Runnable接口]
B[ToolsNode] --> D
C[Lambda] --> D
D --> E[统一执行]
**策略模式:**四种执行模式作为不同的策略,根据数据流需求选择
graph TD
A[数据流需求] --> B{输入类型}
B -->|单个值| C{输出类型}
B -->|流| D{输出类型}
C -->|单个值| E[Invoke策略]
C -->|流| F[Stream策略]
D -->|单个值| G[Collect策略]
D -->|流| H[Transform策略]
**装饰器模式:**composableRunnable作为装饰器,为原始组件添加类型安全、错误处理等能力。
扩展性设计:
- **插件化架构:**通过Option系统支持运行时配置和扩展。
- **回调集成:**支持执行前后的回调处理,便于监控和调试。
- **图系统集成:**无缝集成到图执行引擎中,支持复杂的编排场景。
types基础类型定义 #
- **核心组件类型定义:**最原始的可执行对象类型
| |
- 节点触发模式
| |
- AnyGraph接口:统一Graph、Chain、Workflow的编译接口
| |
AnyGraph 接口体现了 Eino 框架的核心设计哲学:
- 编译时类型安全: 通过 inputType() 和 outputType() 方法,框架可以在编译时验证类型兼容性
- 统一编译接口: 所有图结构都通过 compile() 方法转换为 composableRunnable
- 组件类型识别: component() 方法允许运行时识别具体的组件类型
泛型辅助器的作用:
getGenericHelper() 返回的 *genericHelper 是类型系统的核心:
graph TB
subgraph "泛型辅助器功能"
TypeConversion["类型转换"]
TypeValidation["类型验证"]
TypeInference["类型推断"]
GenericOperations["泛型操作"]
end
subgraph "AnyGraph 实现"
Graph["Graph[I,O]"]
Chain["Chain[I,O]"]
Workflow["Workflow[I,O]"]
end
Graph --> TypeConversion
Chain --> TypeValidation
Workflow --> TypeInference
TypeConversion --> GenericOperations
TypeValidation --> GenericOperations
TypeInference --> GenericOperations
三、核心3种编排模式 #
| |

Chain(链式) #
Eino中的Chain是对 Graph 的降级封装。
| |
example:
| |
Graph图编排 #
像流程图一样,可以有分支、循环、并行.
example:
| |
Workflow工作流编排 #
Eino: Workflow 编排框架:
专注于数据转换,精确控制每个字段的映射
| |
Chain/Graph 编排 #
https://www.cloudwego.io/zh/docs/eino/core_modules/chain_and_graph_orchestration/chain_graph_introduction/#chain Eino编排的设计理念: https://www.cloudwego.io/zh/docs/eino/core_modules/chain_and_graph_orchestration/orchestration_design_principles/
四、分支和并行 #
| |
五、数据流处理 #
| |
六、Lambda和组件 #
| |
七、状态管理 #
| |
八、配置选项 #
| |
Eino 流式编程 #
Eino 流式编程要点: https://www.cloudwego.io/zh/docs/eino/core_modules/chain_and_graph_orchestration/stream_programming_essentials/
