evofabric.core.graph

目录

evofabric.core.graph#

Graph Builder#

class evofabric.core.graph.GraphBuilder(BaseComponent)[源代码]#

基于状态驱动的图构建器,用于编排节点与边的执行逻辑,继承自 BaseComponent

参数:

state_schema (type[StateSchema]) -- 本图所使用的状态模式,可为 dictpydantic.BaseModel 子类。

add_node(self, name: str, node: Callable | NodeBase, action_mode: NodeActionMode | str = 'any', multi_input_merge_strategy: dict[str, Callable[[List[State]], State]] = None)[源代码]#

向图中添加一个节点。

参数:
  • name (str) -- 节点名称,需在图内唯一。不可以使用 start``和``end,这两个节点是图引擎预留的特殊节点。

  • node (Union[Callable, NodeBase]) -- 节点执行体,可以是普通 Python 函数或 NodeBase 子类实例。

  • action_mode (Union[NodeActionMode, str]) -- 节点触发策略。默认是 any "all" 表示所有前驱节点执行完毕后才触发; "any" 表示任意前驱节点执行完毕即触发。

  • multi_input_merge_strategy (dict[str, Callable[[List[State]], State]]) -- 按通道名指定多前驱状态合并策略, key为通道名,值为 Callable[[List[State] -> State]] 的可调用对象。

add_edge(self, source: str, target: str, group: str = 'all', state_filter: Callable[[State], State] | None = None)[源代码]#

在两个节点之间建立一条普通边。

参数:
  • source (str) -- 源节点名称。

  • target (str) -- 目标节点名称。

  • group (str) -- 边所属组名,用于批量控制或可视化分组。

  • state_filter (Optional[Callable[[State], State]]) -- 可选的状态过滤函数,对流经该边的状态进行拦截与改写。

add_condition_edge(self, source: str, router: Callable, possible_targets: List[str] | Set[str], group: str = 'all')[源代码]#

为指定源节点添加条件路由边。

router 支持解析四种类型的返回结果:

  • 单个 str:下一跳目标节点名;

  • List[str]:多个下一跳目标节点名;

  • Tuple[str, Callable]:目标节点名 + 该边专用状态过滤函数;

  • List[Tuple[str, Callable]]:多组(目标节点名, 状态过滤函数)。

参数:
  • source (str) -- 源节点名称。

  • router (Callable) --

    路由函数,按运行时的状态决定下一跳,输入是源节点输出的完整State状态。

    备注

    注意:每个节点的输出是state的增量信息。但router函数的输入是:该节点输出的增量信息和输入state合并后的完整state。

  • possible_targets (Union[List[str], Set[str]]) -- 所有可能被路由到的目标节点名,用于校验与优化。

  • group (str) -- 边所属组名。

set_entry_point(self, entry_name: str)[源代码]#

设置图的唯一入口节点。

参数:

entry_name (str) -- 入口节点名称,需已通过 add_node 添加。

build(self, auto_conn_end: bool = True, max_turn: int = None, timeout: int = None, graph_mode: GraphMode = GraphMode.RUN, db_file_path: str = './.storage.db', db_name: str = 'evofabric')[源代码]#

根据已添加的节点与边构建出可执行图实例。

参数:
  • auto_conn_end (bool) -- 若为 True,自动为所有无后继的节点连接至内置 END 节点。

  • max_turn (int) -- 图最大运行轮数,防止无限循环。

  • timeout (int) -- 图运行总超时时间(秒)。

  • graph_mode (GraphMode | str) -- 图运行模式,默认 run 表示执行模式, debug 表示调试模式。

  • db_file_path (str) -- 持久化数据库文件路径,用于在debug模式下存储图状态快照。

  • db_name (str) -- 数据库名称。

返回:

可执行的 GraphEngineGraphEngineDebugger 图引擎实例

dumps(self, graph_name: str = 'graph', version: str = '1.0') dict[源代码]#

将当前图构建器中的节点与边序列化为字典配置。

参数:
  • graph_name (str) -- 图名称,默认为 "graph"。

  • version (str) -- 配置版本号,默认为 "1.0"。

返回:

包含图名称、版本、状态模式、节点、边及是否设置入口点的字典。

返回类型:

dict

dump(self, save_path: str, graph_name: str = 'graph', version: str = '1.0')[源代码]#

将当前图构建器中的内容序列化并保存到指定文件。

参数:
  • save_path (str) -- 保存路径。

  • graph_name (str) -- 图名称,默认为 "graph"。

  • version (str) -- 配置版本号,默认为 "1.0"。

load(cls, file_path: str) 'GraphBuilder'[源代码]#

从文件中加载已构建的图构建器。

参数:

file_path (str) -- 文件路径。

返回:

加载后的 GraphBuilder 实例。

返回类型:

GraphBuilder

loads(cls, data: dict) 'GraphBuilder'[源代码]#

从配置字典中加载已构建的图构建器。

参数:

data (dict) -- 包含图配置的字典。

返回:

加载后的 GraphBuilder 实例。

返回类型:

GraphBuilder

Graph Engine#

class evofabric.core.graph.GraphEngine(BaseComponent)[源代码]#

图执行引擎,负责解析并驱动整个图(节点+边)的运行,支持状态检查点、最大轮次与超时控制。

参数:
  • nodes (Dict[str, GraphNodeSpec]) -- 图中所有节点的规格映射,键为节点名,值为 GraphNodeSpec 对象。

  • edges (Dict[str, List[EdgeSpecBase]]) -- 图中所有边的规格映射,键为源节点名,值为该节点出发的 EdgeSpecBase 列表。

  • state_schema (Optional[SkipValidation[type[StateSchema]]]) -- 状态模式类型,用于运行时状态校验;为 None 时不做校验。

  • max_turn (Optional[int]) -- 允许的最大节点调用次数;超过即强制终止已运行的图,但已到达 END 节点的输出仍保留;为 None 表示无限制。

  • timeout (Optional[int]) -- 单个节点执行的超时时间(秒);为 None 表示不限制。

async run(self, inputs: Dict)[源代码]#

异步启动图运行。

参数:

inputs (Dict) -- 初始输入数据,将注入 start 节点的状态,key和value需要严格对应 StateSchema

返回:

运行结束后的最终状态。

返回类型:

Dict

draw_graph(self, save_path: str = None, auto_open: bool = True)[源代码]#

生成图的可视化 HTML 文件。

参数:
  • save_path (str) -- 文件保存路径;为 None 时不落盘。

  • auto_open (bool) -- 生成后是否自动用浏览器打开。

class evofabric.core.graph.RunTimeTask(BaseModel)[源代码]#

用于描述在图执行过程中需要被调度的节点任务及其相关状态与过滤条件。

参数:
  • node_name (str) -- 目标节点名称。

  • state_ckpt (Union[StateCkpt, List[StateCkpt]]) -- 当前任务对应的 StateCkpt ;支持单个 StateCkpt 或列表。

  • edge_group (str) -- 当前任务所属边的分组名称,用于在并行或条件分支中区分不同边集合。

  • predecessor (Optional[Union[str, List[str]]]) -- 前驱节点名称;支持单个节点名或节点名列表,为 None 时表示无前驱节点。

  • state_filter (Optional[Callable]) -- 应用于边的 状态过滤函数,签名需满足 Callable[[Any], bool];为 None 时不进行额外过滤。

  • trace_route (List[str]) -- 已执行的节点轨迹,按顺序记录沿途经过的节点名;默认空列表。

Graph Engine Debugger#

class evofabric.core.graph.GraphEngineDebugger(GraphEngine)[源代码]#

可调试版本的 GraphEngine,支持断点、步进执行、状态恢复及节点输出修改。

参数:
  • db_file_path (str) -- 用于状态存储的数据库文件路径。默认值 ".state_storage.db"

  • db_name (str) -- 数据库名称。默认值 "graph state"

model_post_init(context: Any) None[源代码]#

在模型创建后初始化内部结构。

reset() None[源代码]#

重置所有内部状态,包括跟踪树和断点。

set_breakpoint(node_name_bp: str | None = None, condition_bp: Any = None, condition: Any = None) None[源代码]#

在指定节点上设置断点。

参数:
  • node_name_bp (Optional[str]) -- 要设置断点的节点名称。

  • condition_bp (Any) -- 断点条件(暂不支持)。

  • condition (Any) -- 断点条件(暂不支持)。

抛出:

RuntimeError -- 当 node_name_bpNone 时抛出。

clear_breakpoint(node_name_bp: str | None = None, condition_bp: Any = None, condition: Any = None) None[源代码]#

清除指定节点的断点。

参数:
  • node_name_bp (Optional[str]) -- 要清除断点的节点名称。

  • condition_bp (Any) -- 断点条件(暂不支持)。

  • condition (Any) -- 断点条件(暂不支持)。

抛出:

RuntimeError -- 当 node_name_bpNone 时抛出。

clear_all_breakpoint() None[源代码]#

清除所有断点。

async resume(running_queue: List[RunTimeTask] | None = None) Awaitable[Any][源代码]#

从当前断点或指定的 RuntimeTask 队列继续执行。

参数:

running_queue (Optional[List[RunTimeTask]]) -- 待恢复任务队列;若为 None,则从非断点的叶节点恢复。

返回:

一步执行结果。

返回类型:

Any

抛出:

RuntimeError -- 当图引擎已经启动时抛出。

async step_over(node_uuid: str | None = None) Awaitable[Any][源代码]#

跳过当前断点或指定节点。

参数:

node_uuid (Optional[str]) -- 要跳过的节点UUID;若为 None,则跳过所有当前节点。

返回:

一步执行结果。

返回类型:

Any

restore_step(node_uuid: str | None = None) None[源代码]#

恢复上一步执行操作。

参数:

node_uuid (Optional[str]) -- 要恢复的节点UUID;若为 None,则恢复上一步操作。

async run_one_step(running_queue: List[RunTimeTask]) Awaitable[Tuple[Any, List[RunTimeTask]]][源代码]#

执行图中的一个步骤。

参数:

running_queue (List[RunTimeTask]) -- 待执行任务队列。

返回:

输出结果与下一批候选任务节点。

返回类型:

Tuple[Any, List[RunTimeTask]]

async debug(inputs: Dict) Awaitable[Any][源代码]#

启动调试会话并注入初始输入。

参数:

inputs (Dict) -- 初始状态字典。

返回:

调试完成后的最终结果。

返回类型:

Any

change_output(from_node_uuid: str, to_node_uuid: str, change_key: str, change_value: Any) None[源代码]#

在节点间修改输出值。

参数:
  • from_node_uuid (str) -- 源节点UUID。

  • to_node_uuid (str) -- 目标节点UUID。

  • change_key (str) -- 要修改的键。

  • change_value (Any) -- 新的键值。

抛出:

RuntimeError -- 当目标节点不是当前叶节点时抛出。

示例

from typing import Annotated, List, TypedDict
from pydantic import BaseModel
from evofabric.core.graph import GraphBuilder
from evofabric.core.typing import *
from evofabric.logger import get_logger

class State(TypedDict):
    messages: Annotated[List, "append_messages"]
    node_output: Annotated[str, "overwrite"]

def node_a(state):
    return {"messages": [AssistantMessage(content="node a output")], 'node_output': 'a'}

def node_b(state):
    self.assertIsInstance(state, dict)
    self.assertIsInstance(state['messages'][0], StateMessage)
    return {"messages": [AssistantMessage(content="node b output")], 'node_output': 'b'}

def node_c(state):
    self.assertIsInstance(state, dict)
    self.assertIsInstance(state['messages'][0], StateMessage)
    return {"messages": [AssistantMessage(content="node c output")], 'node_output': 'c'}

def node_d(state):
    self.assertIsInstance(state, dict)
    self.assertIsInstance(state['messages'][0], StateMessage)
    return {"messages": [AssistantMessage(content="node d output")], 'node_output': 'd'}

graph_builder = GraphBuilder(state_schema=State)
graph_builder.add_node("a", node_a)
graph_builder.add_node("b", node_b)
graph_builder.add_node("c", node_c)
graph_builder.add_node("d", node_d)
graph_builder.add_edge("a", "b")
graph_builder.add_edge("b", "c")
graph_builder.add_edge("c", "d")
graph_builder.set_entry_point("a")
graph = graph_builder.build(graph_mode=GraphMode.DEBUG, db_file_path="D:/Download/.db_storage.db")

# debug mode example
graph.set_breakpoint(node_name_bp="b")
await graph.debug({"messages": [UserMessage(content='hello')]})
await graph.step_over(node_uuid=graph._trace_tree.get_leaf_node_uuid_by_node_name(node_name="b"))
graph.restore_step()
graph.clear_breakpoint(node_name_bp="b")
result = await graph.resume()

Node#

class evofabric.core.graph.NodeBase(BaseComponent)[源代码]#

所有节点的抽象基类,继承自 BaseComponent。 子类需实现 __call__ 协议,以定义节点在被调用时的具体行为。

class evofabric.core.graph.SyncNode(NodeBase)[源代码]#

同步执行节点,继承自 NodeBase。 在调用时以同步方式接收当前状态并返回状态增量。

__call__(self, state: State) StateDelta[源代码]#

同步执行节点逻辑。

参数:

state (State) -- 当前工作流状态。

返回:

相对于当前状态的变化量。

返回类型:

StateDelta

class evofabric.core.graph.AsyncNode(NodeBase)[源代码]#

异步执行节点,继承自 NodeBase。 在调用时以异步方式接收当前状态并返回状态增量。

async __call__(self, state: State) StateDelta[源代码]#

异步执行节点逻辑。

参数:

state (State) -- 当前工作流状态。

返回:

相对于当前状态的变化量。

返回类型:

StateDelta

class evofabric.core.graph.SyncStreamNode(NodeBase)[源代码]#

支持同步流式输出的节点,继承自 NodeBase。 在调用过程中可通过 stream_writer 实时向外发送流式数据块。

__call__(self, state: State, stream_writer: StreamWriter) StateDelta[源代码]#

同步流式执行节点逻辑。

参数:
  • state (State) -- 当前工作流状态。

  • stream_writer (StreamWriter) -- 用于实时输出流式数据的写入器。

返回:

相对于当前状态的变化量。

返回类型:

StateDelta

class evofabric.core.graph.AsyncStreamNode(NodeBase)[源代码]#

支持异步流式输出的节点,继承自 NodeBase。 在调用过程中可通过 stream_writer 实时向外发送流式数据块。

async __call__(self, state: State, stream_writer: StreamWriter) StateDelta[源代码]#

异步流式执行节点逻辑。

参数:
  • state (State) -- 当前工作流状态。

  • stream_writer (StreamWriter) -- 用于实时输出流式数据的写入器。

返回:

相对于当前状态的变化量。

返回类型:

StateDelta

class evofabric.core.graph.GraphNodeSpec(BaseComponent)[源代码]#

该节点在图引擎中使用,会首先将图运行信息注入到上下文中,随后根据不同节点类型决定调用方式并分发不同入参。继承自 BaseComponent

参数:
  • node (NodeBase) -- 节点实例,必须继承自 NodeBase

  • node_name (str) -- 节点在图中的可读名称。

  • action_mode (NodeActionMode) -- 节点执行模式,默认 NodeActionMode.ALL

  • stream_writer (Optional[StreamWriter]) -- 可选的流式写手指针,用于把节点中间结果实时推送出去。

  • multi_input_merge_strategy (Optional[Dict[str, Callable[[List[State]], State]]]) -- 当节点存在多路输入时,按 key 指定自定义合并函数 Callable[[List[State]], State];若未提供则使用默认策略。

  • node_id (str) -- 节点全局唯一标识,默认自动生成 UUID。

is_active(self) bool#

@property 类型。当前节点是否处于激活状态。

返回:

激活返回 True,否则 False

返回类型:

bool

async __call__(self, state: State, **kwargs) StateDelta[源代码]#

通过 evofabric.core.graph.stream_writer_env() 注入节点执行信息,并在异步锁保护下执行节点逻辑。

参数:

state (State) -- 输入状态对象。

返回:

节点执行后产生的状态增量。

返回类型:

StateDelta

evofabric.core.graph.callable_to_node(callable_obj: Callable[..., Any]) NodeBase[源代码]#

将任意可调用对象(函数或实现了 __call__ 方法的类实例)自动转换为对应的节点类型实例。

函数会根据目标对象的定义特征自动判断其类型:

  • 若为 异步函数 且带有 stream_writer 参数 → 转换为异步流式节点。

  • 若为 异步函数 且不带 stream_writer 参数 → 转换为异步普通节点。

  • 若为 同步函数 且带有 stream_writer 参数 → 转换为同步流式节点。

  • 若为 同步函数 且不带 stream_writer 参数 → 转换为同步普通节点。

参数:

callable_obj (Callable[..., Any]) -- 任意可调用对象,可以是函数、方法或实现了 __call__ 的类实例。

返回:

对应的节点类型实例,类型为 NodeBase 的子类。

返回类型:

NodeBase

Edge#

class evofabric.core.graph.EdgeSpecBase(ABC, BaseComponent)[源代码]#

边规范的抽象基类,定义图中边的基本属性与接口。继承自 ABCBaseComponent

参数:
  • source (str) -- 源节点名称。

  • group (str) -- 边所属分组,默认为 DEFAULT_EDGE_GROUP

  • edge_type (Literal['base']) -- 边的类型标识,默认为 "base"

get_targets(self, state: State) List[Tuple[str, StateFilterLike | None]][源代码]#

抽象方法。根据当前状态返回下一步的目标节点及其对应的状态过滤器。

参数:

state (State) -- 当前图状态。

返回:

目标节点与状态过滤器的列表。

返回类型:

List[Tuple[str, Optional[StateFilterLike]]]

get_possible_targets(self) List[str][源代码]#

抽象方法。返回该边在静态分析中可能到达的所有目标节点。

返回:

所有可能的目标节点名称。

返回类型:

List[str]

class evofabric.core.graph.EdgeSpec(EdgeSpecBase)[源代码]#

普通边类型,仅连接一个目标节点。继承自 EdgeSpecBase

参数:
  • source (str) -- 源节点名称。

  • group (str) -- 边所属分组,默认为 DEFAULT_EDGE_GROUP

  • edge_type (Literal['edge']) -- 边的类型标识,固定为 "edge"

  • target (str) -- 目标节点名称。

  • state_filter (Optional[StateFilterLike]) -- 状态过滤函数,可选,用于控制边的触发条件。

get_targets(self, state: State) List[Tuple[str, StateFilterLike | None]][源代码]#

返回目标节点及其状态过滤器,适配统一接口。

参数:

state (State) -- 当前状态。

返回:

(目标节点, 状态过滤器) 的列表。

返回类型:

List[Tuple[str, Optional[StateFilterLike]]]

get_possible_targets(self) List[str][源代码]#

返回当前边的所有可能目标节点(对普通边为单一节点)。

返回:

目标节点名称列表。

返回类型:

List[str]

class evofabric.core.graph.ConditionEdgeSpec(EdgeSpecBase)[源代码]#

条件边类型,支持根据状态动态决定目标节点。继承自 EdgeSpecBase

参数:
  • source (str) -- 源节点名称。

  • group (str) -- 边所属分组,默认为 DEFAULT_EDGE_GROUP

  • edge_type (Literal['conditional']) -- 边的类型标识,固定为 "conditional"

  • router (Callable[[State], Union[str, List[str], Tuple[str, Callable], List[Tuple[str, Callable]]]]) -- 路由函数,接受 State 作为输入,返回目标节点或节点列表,可附带状态过滤函数。

  • possible_targets (List[str]) -- 所有允许的目标节点列表。

get_targets(self, state: State) List[Tuple[str, StateFilterLike | None]][源代码]#

调用 router 函数,根据当前状态获取下一步目标节点,并统一格式化为 [(节点名, 状态过滤函数)]

参数:

state (State) -- 当前状态。

返回:

目标节点与状态过滤函数列表。

返回类型:

List[Tuple[str, Optional[StateFilterLike]]]

get_possible_targets(self) List[str][源代码]#

返回条件边在静态分析下的所有可能目标节点。

返回:

所有可能的目标节点名称。

返回类型:

List[str]

evofabric.core.graph.cast_edge(v: dict) EdgeSpecBase#

根据输入字典中的 edge_type 字段自动判别边的类型,并反序列化为对应的 EdgeSpecConditionEdgeSpec 实例。

参数:

v (dict) -- 包含边定义的字典对象。

返回:

解析后的边对象。

返回类型:

EdgeSpecBase

State & Update#

class evofabric.core.graph.StateUpdater[源代码]#

用于注册和管理状态更新策略的类。

通过该类,可以将自定义的状态合并函数注册为命名策略,并在需要时通过名称获取对应的策略函数。

使用示例:

@StateUpdater.register("overwrite")
def overwrite(old: Any, new: Any) -> Any:
    return new

strategy = StateUpdater.get("overwrite")
merged = strategy(old_state, new_state)
register(cls, name: str) Callable[[Callable], Callable][源代码]#

类方法,用于注册一个新的状态更新策略。

参数:

name (str) -- 策略名称,注册后可通过该名称获取对应的策略函数。

返回:

一个装饰器函数,用于装饰实际的状态更新函数。

返回类型:

Callable[[Callable], Callable]

抛出:

KeyError -- 如果策略名称已存在,则抛出异常。

get(cls, name: str) Callable[[Any, Any], Any][源代码]#

类方法,根据名称获取已注册的状态更新策略函数。

参数:

name (str) -- 策略名称。

返回:

对应的状态更新函数,接受旧状态和新状态作为参数,返回合并后的状态。

返回类型:

Callable[[Any, Any], Any]

抛出:

KeyError -- 如果策略名称未注册,则抛出异常。

list_strategies(cls) List[str][源代码]#

类方法,返回所有已注册策略的名称列表。

返回:

包含所有策略名称的列表。

返回类型:

List[str]

registered(cls, name: str) bool[源代码]#

类方法,判断指定名称的策略是否已注册。

参数:

name (str) -- 策略名称。

返回:

如果策略已注册返回 True,否则返回 False

返回类型:

bool

class evofabric.core.graph.StateCkpt(BaseComponent)[源代码]#

表示状态检查点,继承自 BaseComponent,用于管理状态及其变化。

参数:
  • delta (Optional[SkipValidation[StateDelta]]) -- 状态的变化量(增量),可选。

  • parent (Optional[StateCkpt]) -- 父节点,用于构建状态链,可选。

  • state_schema (Optional[type[StateSchema]]) -- 状态结构的类型定义,可选。

  • materialized_state_cache (Optional[SkipValidation[State]]) -- 缓存的具体化状态,初始化时默认为 None,不参与初始化参数传递。

materialize(self) State | StateSchema[源代码]#

递归追溯父节点,合并状态增量,获得完整的 State 状态。

返回:

具体化的状态对象,类型为 StateStateSchema

返回类型:

Union[State, StateSchema]

merge(cls, checkpoints: List['StateCkpt'], strategy: Callable[[List[State]], State] = None)[源代码]#

合并多个状态为一个新的状态。

参数:
  • checkpoints (List[StateCkpt]) -- 需要合并的状态列表。

  • strategy (Callable[[List[State]], State]) -- 合并策略函数,接收多个状态并返回合并后的状态。若未提供,则使用:py:class:`StateSchema`中声明的默认逻辑。

返回:

合并后的新状态。

返回类型:

StateCkpt

filter(cls, checkpoint: 'StateCkpt', strategy: Callable[[State], State])[源代码]#

对给定的状态应用过滤策略,生成新的状态。

参数:
  • checkpoint (StateCkpt) -- 需要过滤的状态。

  • strategy (Callable[[State], State]) -- 过滤策略函数,接收一个状态并返回处理后的状态。

返回:

应用过滤策略后的新状态检查点。

返回类型:

StateCkpt

merge_state(state, delta, state_schema) Dict | StateSchema[源代码]#

静态方法,用于将状态和增量合并为新的状态。

参数:
  • state (State) -- 原始状态。

  • delta (StateDelta) -- 状态增量。

  • state_schema (type[StateSchema]) -- 状态结构定义,用于类型转换。

返回:

合并后的新状态,类型为字典或 StateSchema

返回类型:

Union[Dict, StateSchema]

evofabric.core.graph.generate_state_schema(variables: List[Tuple[str, Any, str]] | None = None)[源代码]#

声明图引擎中传递的状态信息的变量名称和类型。

注意事项:
  • 变量名必须符合 Python 的变量命名规范。

  • 变量类型必须是以下之一:str、int、float、list、tuple、dict。

  • 存在一个名为 messages 的常量变量用于记录代理上下文;请避免使用相同名称定义其他变量。

示例:

generate_state_schema([
    ("msg_id", str, "overwrite"),
    ("user_id", bool, "overwrite")
])
参数:

variables (Optional[List[Tuple[str, Any, str]]]) -- 一个由元组组成的列表,每个元组包含变量名、变量类型和更新策略。

返回:

根据提供的变量动态生成的 Pydantic 模型类,表示状态结构。

返回类型:

pydantic.BaseModel

抛出:
  • ValueError -- 如果变量类型无效。

  • ValueError -- 如果变量名与保留字段冲突或重复定义。

  • ValueError -- 如果更新策略未在 StateUpdater 中注册。

evofabric.core.graph._overwrite_state_update_strategy(old: Any = MISSING, new: Any = MISSING) Any#

更新策略 overwrite 的具体实现。

使用新值覆盖旧值的策略函数。

如果新值为MISSING,则返回旧值;

参数:
  • old (Any) -- 旧状态值,默认为MISSING。

  • new (Any) -- 新状态值,默认为MISSING。

返回:

更新后的状态值。

返回类型:

Any

evofabric.core.graph._append_messages(old: List[StateMessage] = MISSING, new: List[StateMessage] = MISSING) List[StateMessage]#

更新策略 append_messages 的具体实现。

将新消息列表追加到旧消息列表末尾的策略函数,自动去重。

如果旧值或新值为MISSING,则视为空列表。

参数:
  • old (List[StateMessage]) -- 旧消息列表,默认为MISSING。

  • new (List[StateMessage]) -- 新消息列表,默认为MISSING。

返回:

合并后的消息列表,已去重。

返回类型:

List[StateMessage]

Streaming & Context#

class evofabric.core.graph.StreamWriter(BaseComponent)[源代码]#

流式消息写入器,用于在异步流式处理过程中发送消息块。该类继承自 BaseComponent

put(payload: Any) None[源代码]#

将流式消息内容放入写入器,并触发已注册的消息回调函数。

参数:

payload (Any) -- 要发送的流式数据内容,可以是任意类型。

返回:

无返回值

返回类型:

None

class evofabric.core.graph.StreamCtx(BaseModel)[源代码]#

表示流式处理上下文信息的数据模型,继承自 BaseModel

参数:
  • node_name (Optional[str]) -- 当前节点名称(可选)。

  • call_id (Optional[str]) -- 当前调用 ID(可选)。

  • tool_name (Optional[str]) -- 当前工具名称(可选)。

  • tool_call_id (Optional[str]) -- 当前工具调用 ID(可选)。

__bool__(self) bool[源代码]#

判断当前上下文是否为空。若所有属性都为空,则返回 False,否则返回 True

返回:

上下文是否存在有效信息

返回类型:

bool

__repr__(self) str[源代码]#

返回当前上下文的字符串表示形式。

返回:

描述当前上下文的字符串

返回类型:

str

evofabric.core.graph.set_streaming_handler(callback: Callable[[dict], None | Awaitable[None]]) None[源代码]#

注册一个用于处理流式消息的回调函数。

使用示例:

def print_handler(payload):
    print(payload)

set_streaming_handler(print_handler)
参数:

callback (Callable[[dict], Union[None, Awaitable[None]]]) -- 设置处理流式消息的回调函数,支持同步、异步类型。

返回:

无返回值

返回类型:

None

evofabric.core.graph.current_ctx() StreamCtx[源代码]#

获取当前线程/协程中的流式处理上下文。

返回:

当前的流式处理上下文对象

返回类型:

StreamCtx

evofabric.core.graph.stream_writer_env(ctx_updates: StreamCtx)[源代码]#

创建一个临时的流式处理上下文环境,在该上下文中执行代码块。

使用示例:

def node(stream_writer):
    stream_writer.put("streaming msg 1")
    stream_writer.put("streaming msg 2")
    ...

with stream_writer_env(StreamCtx(call_id=str(uuid.uuid4()), node_name=self.node_name)):
    node(get_stream_writer())
参数:

ctx_updates (StreamCtx) -- 用于更新当前上下文的新字段值。

返回:

上下文管理器,可用于 with 语句中。

返回类型:

contextmanager

evofabric.core.graph.get_stream_writer()[源代码]#

获取全局共享的 StreamWriter 实例。

返回:

全局 StreamWriter 实例

返回类型:

StreamWriter