evofabric.core.graph#
Graph Builder#
- class evofabric.core.graph.GraphBuilder(BaseComponent)[源代码]#
基于状态驱动的图构建器,用于编排节点与边的执行逻辑,继承自
BaseComponent。- 参数:
state_schema (type[StateSchema]) -- 本图所使用的状态模式,可为
dict或pydantic.BaseModel子类。
- add_node(self, name: str, node: Callable | NodeBase, action_mode: NodeActionMode | str = 'any', multi_input_merge_strategy: dict[str, Callable[[List[State]], State]] = None)[源代码]#
向图中添加一个节点。
- 参数:
name (str) -- 节点名称,需在图内唯一。不可以使用
start``和``end,这两个节点是图引擎预留的特殊节点。node (Union[Callable, NodeBase]) -- 节点执行体,可以是普通 Python 函数或
NodeBase子类实例。action_mode (Union[NodeActionMode, str]) -- 节点触发策略。默认是
any"all"表示所有前驱节点执行完毕后才触发;"any"表示任意前驱节点执行完毕即触发。multi_input_merge_strategy (dict[str, Callable[[List[State]], State]]) -- 按通道名指定多前驱状态合并策略, key为通道名,值为
Callable[[List[State] -> State]]的可调用对象。
- add_edge(self, source: str, target: str, group: str = 'all', state_filter: Callable[[State], State] | None = None)[源代码]#
在两个节点之间建立一条普通边。
- 参数:
source (str) -- 源节点名称。
target (str) -- 目标节点名称。
group (str) -- 边所属组名,用于批量控制或可视化分组。
state_filter (Optional[Callable[[State], State]]) -- 可选的状态过滤函数,对流经该边的状态进行拦截与改写。
- add_condition_edge(self, source: str, router: Callable, possible_targets: List[str] | Set[str], group: str = 'all')[源代码]#
为指定源节点添加条件路由边。
router支持解析四种类型的返回结果:单个
str:下一跳目标节点名;List[str]:多个下一跳目标节点名;Tuple[str, Callable]:目标节点名 + 该边专用状态过滤函数;List[Tuple[str, Callable]]:多组(目标节点名, 状态过滤函数)。
- 参数:
source (str) -- 源节点名称。
router (Callable) --
路由函数,按运行时的状态决定下一跳,输入是源节点输出的完整State状态。
备注
注意:每个节点的输出是state的增量信息。但router函数的输入是:该节点输出的增量信息和输入state合并后的完整state。
possible_targets (Union[List[str], Set[str]]) -- 所有可能被路由到的目标节点名,用于校验与优化。
group (str) -- 边所属组名。
- set_entry_point(self, entry_name: str)[源代码]#
设置图的唯一入口节点。
- 参数:
entry_name (str) -- 入口节点名称,需已通过
add_node添加。
- build(self, auto_conn_end: bool = True, max_turn: int = None, timeout: int = None, graph_mode: GraphMode = GraphMode.RUN, db_file_path: str = './.storage.db', db_name: str = 'evofabric')[源代码]#
根据已添加的节点与边构建出可执行图实例。
- 参数:
auto_conn_end (bool) -- 若为
True,自动为所有无后继的节点连接至内置 END 节点。max_turn (int) -- 图最大运行轮数,防止无限循环。
timeout (int) -- 图运行总超时时间(秒)。
graph_mode (GraphMode | str) -- 图运行模式,默认
run表示执行模式,debug表示调试模式。db_file_path (str) -- 持久化数据库文件路径,用于在debug模式下存储图状态快照。
db_name (str) -- 数据库名称。
- 返回:
可执行的
GraphEngine或GraphEngineDebugger图引擎实例
- dumps(self, graph_name: str = 'graph', version: str = '1.0') dict[源代码]#
将当前图构建器中的节点与边序列化为字典配置。
- 参数:
graph_name (str) -- 图名称,默认为 "graph"。
version (str) -- 配置版本号,默认为 "1.0"。
- 返回:
包含图名称、版本、状态模式、节点、边及是否设置入口点的字典。
- 返回类型:
dict
- dump(self, save_path: str, graph_name: str = 'graph', version: str = '1.0')[源代码]#
将当前图构建器中的内容序列化并保存到指定文件。
- 参数:
save_path (str) -- 保存路径。
graph_name (str) -- 图名称,默认为 "graph"。
version (str) -- 配置版本号,默认为 "1.0"。
- load(cls, file_path: str) 'GraphBuilder'[源代码]#
从文件中加载已构建的图构建器。
- 参数:
file_path (str) -- 文件路径。
- 返回:
加载后的 GraphBuilder 实例。
- 返回类型:
Graph Engine#
- class evofabric.core.graph.GraphEngine(BaseComponent)[源代码]#
图执行引擎,负责解析并驱动整个图(节点+边)的运行,支持状态检查点、最大轮次与超时控制。
- 参数:
nodes (Dict[str, GraphNodeSpec]) -- 图中所有节点的规格映射,键为节点名,值为
GraphNodeSpec对象。edges (Dict[str, List[EdgeSpecBase]]) -- 图中所有边的规格映射,键为源节点名,值为该节点出发的
EdgeSpecBase列表。state_schema (Optional[SkipValidation[type[StateSchema]]]) -- 状态模式类型,用于运行时状态校验;为
None时不做校验。max_turn (Optional[int]) -- 允许的最大节点调用次数;超过即强制终止已运行的图,但已到达 END 节点的输出仍保留;为
None表示无限制。timeout (Optional[int]) -- 单个节点执行的超时时间(秒);为
None表示不限制。
- class evofabric.core.graph.RunTimeTask(BaseModel)[源代码]#
用于描述在图执行过程中需要被调度的节点任务及其相关状态与过滤条件。
- 参数:
node_name (str) -- 目标节点名称。
state_ckpt (Union[StateCkpt, List[StateCkpt]]) -- 当前任务对应的
StateCkpt;支持单个StateCkpt或列表。edge_group (str) -- 当前任务所属边的分组名称,用于在并行或条件分支中区分不同边集合。
predecessor (Optional[Union[str, List[str]]]) -- 前驱节点名称;支持单个节点名或节点名列表,为
None时表示无前驱节点。state_filter (Optional[Callable]) -- 应用于边的 状态过滤函数,签名需满足
Callable[[Any], bool];为None时不进行额外过滤。trace_route (List[str]) -- 已执行的节点轨迹,按顺序记录沿途经过的节点名;默认空列表。
Graph Engine Debugger#
- class evofabric.core.graph.GraphEngineDebugger(GraphEngine)[源代码]#
可调试版本的
GraphEngine,支持断点、步进执行、状态恢复及节点输出修改。- 参数:
db_file_path (str) -- 用于状态存储的数据库文件路径。默认值
".state_storage.db"db_name (str) -- 数据库名称。默认值
"graph state"
- set_breakpoint(node_name_bp: str | None = None, condition_bp: Any = None, condition: Any = None) None[源代码]#
在指定节点上设置断点。
- 参数:
node_name_bp (Optional[str]) -- 要设置断点的节点名称。
condition_bp (Any) -- 断点条件(暂不支持)。
condition (Any) -- 断点条件(暂不支持)。
- 抛出:
RuntimeError -- 当
node_name_bp为None时抛出。
- clear_breakpoint(node_name_bp: str | None = None, condition_bp: Any = None, condition: Any = None) None[源代码]#
清除指定节点的断点。
- 参数:
node_name_bp (Optional[str]) -- 要清除断点的节点名称。
condition_bp (Any) -- 断点条件(暂不支持)。
condition (Any) -- 断点条件(暂不支持)。
- 抛出:
RuntimeError -- 当
node_name_bp为None时抛出。
- async resume(running_queue: List[RunTimeTask] | None = None) Awaitable[Any][源代码]#
从当前断点或指定的
RuntimeTask队列继续执行。- 参数:
running_queue (Optional[List[RunTimeTask]]) -- 待恢复任务队列;若为
None,则从非断点的叶节点恢复。- 返回:
一步执行结果。
- 返回类型:
Any
- 抛出:
RuntimeError -- 当图引擎已经启动时抛出。
- async step_over(node_uuid: str | None = None) Awaitable[Any][源代码]#
跳过当前断点或指定节点。
- 参数:
node_uuid (Optional[str]) -- 要跳过的节点UUID;若为
None,则跳过所有当前节点。- 返回:
一步执行结果。
- 返回类型:
Any
- restore_step(node_uuid: str | None = None) None[源代码]#
恢复上一步执行操作。
- 参数:
node_uuid (Optional[str]) -- 要恢复的节点UUID;若为
None,则恢复上一步操作。
- async run_one_step(running_queue: List[RunTimeTask]) Awaitable[Tuple[Any, List[RunTimeTask]]][源代码]#
执行图中的一个步骤。
- 参数:
running_queue (List[RunTimeTask]) -- 待执行任务队列。
- 返回:
输出结果与下一批候选任务节点。
- 返回类型:
Tuple[Any, List[RunTimeTask]]
- async debug(inputs: Dict) Awaitable[Any][源代码]#
启动调试会话并注入初始输入。
- 参数:
inputs (Dict) -- 初始状态字典。
- 返回:
调试完成后的最终结果。
- 返回类型:
Any
- change_output(from_node_uuid: str, to_node_uuid: str, change_key: str, change_value: Any) None[源代码]#
在节点间修改输出值。
- 参数:
from_node_uuid (str) -- 源节点UUID。
to_node_uuid (str) -- 目标节点UUID。
change_key (str) -- 要修改的键。
change_value (Any) -- 新的键值。
- 抛出:
RuntimeError -- 当目标节点不是当前叶节点时抛出。
示例
from typing import Annotated, List, TypedDict from pydantic import BaseModel from evofabric.core.graph import GraphBuilder from evofabric.core.typing import * from evofabric.logger import get_logger class State(TypedDict): messages: Annotated[List, "append_messages"] node_output: Annotated[str, "overwrite"] def node_a(state): return {"messages": [AssistantMessage(content="node a output")], 'node_output': 'a'} def node_b(state): self.assertIsInstance(state, dict) self.assertIsInstance(state['messages'][0], StateMessage) return {"messages": [AssistantMessage(content="node b output")], 'node_output': 'b'} def node_c(state): self.assertIsInstance(state, dict) self.assertIsInstance(state['messages'][0], StateMessage) return {"messages": [AssistantMessage(content="node c output")], 'node_output': 'c'} def node_d(state): self.assertIsInstance(state, dict) self.assertIsInstance(state['messages'][0], StateMessage) return {"messages": [AssistantMessage(content="node d output")], 'node_output': 'd'} graph_builder = GraphBuilder(state_schema=State) graph_builder.add_node("a", node_a) graph_builder.add_node("b", node_b) graph_builder.add_node("c", node_c) graph_builder.add_node("d", node_d) graph_builder.add_edge("a", "b") graph_builder.add_edge("b", "c") graph_builder.add_edge("c", "d") graph_builder.set_entry_point("a") graph = graph_builder.build(graph_mode=GraphMode.DEBUG, db_file_path="D:/Download/.db_storage.db") # debug mode example graph.set_breakpoint(node_name_bp="b") await graph.debug({"messages": [UserMessage(content='hello')]}) await graph.step_over(node_uuid=graph._trace_tree.get_leaf_node_uuid_by_node_name(node_name="b")) graph.restore_step() graph.clear_breakpoint(node_name_bp="b") result = await graph.resume()
Node#
- class evofabric.core.graph.NodeBase(BaseComponent)[源代码]#
所有节点的抽象基类,继承自
BaseComponent。 子类需实现__call__协议,以定义节点在被调用时的具体行为。
- class evofabric.core.graph.SyncStreamNode(NodeBase)[源代码]#
支持同步流式输出的节点,继承自
NodeBase。 在调用过程中可通过stream_writer实时向外发送流式数据块。- __call__(self, state: State, stream_writer: StreamWriter) StateDelta[源代码]#
同步流式执行节点逻辑。
- 参数:
state (State) -- 当前工作流状态。
stream_writer (StreamWriter) -- 用于实时输出流式数据的写入器。
- 返回:
相对于当前状态的变化量。
- 返回类型:
StateDelta
- class evofabric.core.graph.AsyncStreamNode(NodeBase)[源代码]#
支持异步流式输出的节点,继承自
NodeBase。 在调用过程中可通过stream_writer实时向外发送流式数据块。- async __call__(self, state: State, stream_writer: StreamWriter) StateDelta[源代码]#
异步流式执行节点逻辑。
- 参数:
state (State) -- 当前工作流状态。
stream_writer (StreamWriter) -- 用于实时输出流式数据的写入器。
- 返回:
相对于当前状态的变化量。
- 返回类型:
StateDelta
- class evofabric.core.graph.GraphNodeSpec(BaseComponent)[源代码]#
该节点在图引擎中使用,会首先将图运行信息注入到上下文中,随后根据不同节点类型决定调用方式并分发不同入参。继承自
BaseComponent。- 参数:
node_name (str) -- 节点在图中的可读名称。
action_mode (NodeActionMode) -- 节点执行模式,默认
NodeActionMode.ALL。stream_writer (Optional[StreamWriter]) -- 可选的流式写手指针,用于把节点中间结果实时推送出去。
multi_input_merge_strategy (Optional[Dict[str, Callable[[List[State]], State]]]) -- 当节点存在多路输入时,按 key 指定自定义合并函数
Callable[[List[State]], State];若未提供则使用默认策略。node_id (str) -- 节点全局唯一标识,默认自动生成 UUID。
- is_active(self) bool#
@property类型。当前节点是否处于激活状态。- 返回:
激活返回
True,否则False。- 返回类型:
bool
- async __call__(self, state: State, **kwargs) StateDelta[源代码]#
通过
evofabric.core.graph.stream_writer_env()注入节点执行信息,并在异步锁保护下执行节点逻辑。- 参数:
state (State) -- 输入状态对象。
- 返回:
节点执行后产生的状态增量。
- 返回类型:
StateDelta
- evofabric.core.graph.callable_to_node(callable_obj: Callable[..., Any]) NodeBase[源代码]#
将任意可调用对象(函数或实现了
__call__方法的类实例)自动转换为对应的节点类型实例。函数会根据目标对象的定义特征自动判断其类型:
若为 异步函数 且带有
stream_writer参数 → 转换为异步流式节点。若为 异步函数 且不带
stream_writer参数 → 转换为异步普通节点。若为 同步函数 且带有
stream_writer参数 → 转换为同步流式节点。若为 同步函数 且不带
stream_writer参数 → 转换为同步普通节点。
Edge#
- class evofabric.core.graph.EdgeSpecBase(ABC, BaseComponent)[源代码]#
边规范的抽象基类,定义图中边的基本属性与接口。继承自
ABC与BaseComponent。- 参数:
source (str) -- 源节点名称。
group (str) -- 边所属分组,默认为
DEFAULT_EDGE_GROUP。edge_type (Literal['base']) -- 边的类型标识,默认为
"base"。
- class evofabric.core.graph.EdgeSpec(EdgeSpecBase)[源代码]#
普通边类型,仅连接一个目标节点。继承自
EdgeSpecBase。- 参数:
source (str) -- 源节点名称。
group (str) -- 边所属分组,默认为
DEFAULT_EDGE_GROUP。edge_type (Literal['edge']) -- 边的类型标识,固定为
"edge"。target (str) -- 目标节点名称。
state_filter (Optional[StateFilterLike]) -- 状态过滤函数,可选,用于控制边的触发条件。
- class evofabric.core.graph.ConditionEdgeSpec(EdgeSpecBase)[源代码]#
条件边类型,支持根据状态动态决定目标节点。继承自
EdgeSpecBase。- 参数:
source (str) -- 源节点名称。
group (str) -- 边所属分组,默认为
DEFAULT_EDGE_GROUP。edge_type (Literal['conditional']) -- 边的类型标识,固定为
"conditional"。router (Callable[[State], Union[str, List[str], Tuple[str, Callable], List[Tuple[str, Callable]]]]) -- 路由函数,接受
State作为输入,返回目标节点或节点列表,可附带状态过滤函数。possible_targets (List[str]) -- 所有允许的目标节点列表。
- evofabric.core.graph.cast_edge(v: dict) EdgeSpecBase#
根据输入字典中的
edge_type字段自动判别边的类型,并反序列化为对应的EdgeSpec或ConditionEdgeSpec实例。- 参数:
v (dict) -- 包含边定义的字典对象。
- 返回:
解析后的边对象。
- 返回类型:
State & Update#
- class evofabric.core.graph.StateUpdater[源代码]#
用于注册和管理状态更新策略的类。
通过该类,可以将自定义的状态合并函数注册为命名策略,并在需要时通过名称获取对应的策略函数。
使用示例:
@StateUpdater.register("overwrite") def overwrite(old: Any, new: Any) -> Any: return new strategy = StateUpdater.get("overwrite") merged = strategy(old_state, new_state)
- register(cls, name: str) Callable[[Callable], Callable][源代码]#
类方法,用于注册一个新的状态更新策略。
- 参数:
name (str) -- 策略名称,注册后可通过该名称获取对应的策略函数。
- 返回:
一个装饰器函数,用于装饰实际的状态更新函数。
- 返回类型:
Callable[[Callable], Callable]
- 抛出:
KeyError -- 如果策略名称已存在,则抛出异常。
- class evofabric.core.graph.StateCkpt(BaseComponent)[源代码]#
表示状态检查点,继承自
BaseComponent,用于管理状态及其变化。- 参数:
delta (Optional[SkipValidation[StateDelta]]) -- 状态的变化量(增量),可选。
parent (Optional[StateCkpt]) -- 父节点,用于构建状态链,可选。
state_schema (Optional[type[StateSchema]]) -- 状态结构的类型定义,可选。
materialized_state_cache (Optional[SkipValidation[State]]) -- 缓存的具体化状态,初始化时默认为 None,不参与初始化参数传递。
- materialize(self) State | StateSchema[源代码]#
递归追溯父节点,合并状态增量,获得完整的
State状态。- 返回:
具体化的状态对象,类型为
State或StateSchema。- 返回类型:
Union[State, StateSchema]
- merge(cls, checkpoints: List['StateCkpt'], strategy: Callable[[List[State]], State] = None)[源代码]#
合并多个状态为一个新的状态。
- evofabric.core.graph.generate_state_schema(variables: List[Tuple[str, Any, str]] | None = None)[源代码]#
声明图引擎中传递的状态信息的变量名称和类型。
- 注意事项:
变量名必须符合 Python 的变量命名规范。
变量类型必须是以下之一:str、int、float、list、tuple、dict。
存在一个名为
messages的常量变量用于记录代理上下文;请避免使用相同名称定义其他变量。
示例:
generate_state_schema([ ("msg_id", str, "overwrite"), ("user_id", bool, "overwrite") ])
- 参数:
variables (Optional[List[Tuple[str, Any, str]]]) -- 一个由元组组成的列表,每个元组包含变量名、变量类型和更新策略。
- 返回:
根据提供的变量动态生成的 Pydantic 模型类,表示状态结构。
- 返回类型:
pydantic.BaseModel
- 抛出:
ValueError -- 如果变量类型无效。
ValueError -- 如果变量名与保留字段冲突或重复定义。
ValueError -- 如果更新策略未在 StateUpdater 中注册。
- evofabric.core.graph._overwrite_state_update_strategy(old: Any = MISSING, new: Any = MISSING) Any#
更新策略
overwrite的具体实现。使用新值覆盖旧值的策略函数。
如果新值为MISSING,则返回旧值;
- 参数:
old (Any) -- 旧状态值,默认为MISSING。
new (Any) -- 新状态值,默认为MISSING。
- 返回:
更新后的状态值。
- 返回类型:
Any
- evofabric.core.graph._append_messages(old: List[StateMessage] = MISSING, new: List[StateMessage] = MISSING) List[StateMessage]#
更新策略
append_messages的具体实现。将新消息列表追加到旧消息列表末尾的策略函数,自动去重。
如果旧值或新值为MISSING,则视为空列表。
- 参数:
old (List[StateMessage]) -- 旧消息列表,默认为MISSING。
new (List[StateMessage]) -- 新消息列表,默认为MISSING。
- 返回:
合并后的消息列表,已去重。
- 返回类型:
List[StateMessage]
Streaming & Context#
- class evofabric.core.graph.StreamWriter(BaseComponent)[源代码]#
流式消息写入器,用于在异步流式处理过程中发送消息块。该类继承自
BaseComponent。
- class evofabric.core.graph.StreamCtx(BaseModel)[源代码]#
表示流式处理上下文信息的数据模型,继承自
BaseModel。- 参数:
node_name (Optional[str]) -- 当前节点名称(可选)。
call_id (Optional[str]) -- 当前调用 ID(可选)。
tool_name (Optional[str]) -- 当前工具名称(可选)。
tool_call_id (Optional[str]) -- 当前工具调用 ID(可选)。
- evofabric.core.graph.set_streaming_handler(callback: Callable[[dict], None | Awaitable[None]]) None[源代码]#
注册一个用于处理流式消息的回调函数。
使用示例:
def print_handler(payload): print(payload) set_streaming_handler(print_handler)
- 参数:
callback (Callable[[dict], Union[None, Awaitable[None]]]) -- 设置处理流式消息的回调函数,支持同步、异步类型。
- 返回:
无返回值
- 返回类型:
None
- evofabric.core.graph.stream_writer_env(ctx_updates: StreamCtx)[源代码]#
创建一个临时的流式处理上下文环境,在该上下文中执行代码块。
使用示例:
def node(stream_writer): stream_writer.put("streaming msg 1") stream_writer.put("streaming msg 2") ... with stream_writer_env(StreamCtx(call_id=str(uuid.uuid4()), node_name=self.node_name)): node(get_stream_writer())
- 参数:
ctx_updates (StreamCtx) -- 用于更新当前上下文的新字段值。
- 返回:
上下文管理器,可用于 with 语句中。
- 返回类型:
contextmanager