添加节点#
节点类型#
EvoFabric 的图引擎提供四种类型的节点,分别支持同步、异步及其对应的流式处理模式。不同节点通过继承相应的基类并实现 __call__ 方法来定义处理逻辑,需要流式输出的节点可利用 StreamWriter 实现流式输出。
备注
EvoFabric 支持直接添加普通的 Python 函数作为节点。
系统会根据函数签名自动识别其类型并封装为合适的节点实例。
详情参考:callable_to_node()
节点类型 |
继承类 |
方法定义 |
处理逻辑 |
流式输出 |
|---|---|---|---|---|
同步节点 |
|
在方法中实现节点的同步处理逻辑 |
不支持 |
|
异步节点 |
|
在方法中实现节点的异步处理逻辑 |
不支持 |
|
同步流式节点 |
|
在方法中实现节点的同步处理逻辑,并通过 |
支持 |
|
异步流式节点 |
|
在方法中实现节点的异步处理逻辑,并通过 |
支持 |
添加节点#
通过 add_node() 方法在图中添加节点,同时可以指定节点名、行为模式以及多输入的合并策略(可选)。
每个节点代表一个独立的计算单元或逻辑步骤,可为同步、异步或流式节点。
from evofabric.core.graph import GraphBuilder, AsyncNode
from evofabric.core.typing import State, StateDelta
class AnalyzeNode(AsyncNode):
async def __call__(self, state: State) -> StateDelta:
# add your code here
return {"result": f"analyzed: {state.text}"}
graph = GraphBuilder(state_schema=State)
graph.add_node(
name="analyze",
node=AnalyzeNode(),
action_mode="any",
multi_input_merge_strategy={"default": lambda states: states[0]}
)
参数说明:
name:节点名称,必须唯一。 不可使用系统保留名称start与end。node:节点实例,可为四种节点类型之一(同步、异步、同步流式、异步流式), 或普通 Python 函数(系统会自动包装为对应节点类型)。action_mode:节点的触发模式,控制前驱节点完成后何时执行。"any":任意一个前驱节点执行完成即触发。"all":同组别的所有前驱节点执行完成后触发。
multi_input_merge_strategy:多前驱状态合并策略。 当节点存在来自多个组别的输入时,可为不同组别指定合并函数。 该参数为一个字典,key为边的group名,value为Callable[[List[State]], State]类型的函数。若未指定此参数,则系统使用默认的状态更新机制依次合并输入状态。
参见
组别的定义和使用方式详见 添加边
使用示例:
# When node has multiple predecessor graph.add_node( name="merge_result", node=lambda s: {"final": s["a"] + s["b"]}, action_mode="all", multi_input_merge_strategy={ "group_a": lambda states: states[0], "group_b": lambda states: states[-1] } )
运行机制:
每个节点接收一个完整的 State 输入,执行计算逻辑后返回字典类型的状态增量(StateDelta)。
引擎会将增量自动合并到全局状态中,并按定义的边结构继续向下传递。