Workflow API

The workflow engine lives in kavalai.workflow. It turns a YAML graph (or a WorkflowBuilder chain) into an executable state machine, and checkpoints a serialisable state through pluggable storage and task-logger backends.

Engine and builder

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

kavalai.workflow.engine.make_prompt(prompt: str, input_data: dict) str[source]

Combine a rendered prompt with resolved input data into a system message.

class kavalai.workflow.engine.WorkflowEngine(graph: WorkflowGraph, *, storage: DataStorage | None = None, task_logger: TaskLogger | None = None, client_factory: Callable[[...], BaseLlmClient] | None = None, data_models: dict[str, type[BaseModel]] | None = None, max_node_visits: int = 1000)[source]

Bases: object

Executes a v2 WorkflowGraph as a DAG / state machine.

The engine walks the graph from the start node, following transitions and evaluating branch nodes, until it reaches an end node. Each node’s result is stored in the run context; the serialized WorkflowState is checkpointed to storage after every node and per-node debug data flows to task_logger.

Parameters:
graph : WorkflowGraph

The parsed workflow definition.

storage : Optional[DataStorage]

Persistence backend for agents/sessions/runs/chat/state.

task_logger : Optional[TaskLogger]

Backend for per-node debug data and model statistics.

client_factory : Optional[ClientFactory]

Factory (model, parameters, stats_receiver) -> BaseLlmClient used to build LLM clients. Defaults to the provider factory; inject a fake for offline testing.

max_node_visits : int

Safety cap on total node executions to guard against infinite loops.

classmethod from_yaml(yaml_string: str, **kwargs) WorkflowEngine[source]

Build an engine from a YAML workflow definition string.

classmethod from_yaml_path(yaml_path: str, **kwargs) WorkflowEngine[source]

Build an engine from a YAML workflow definition file.

classmethod from_dict(data: dict, **kwargs) WorkflowEngine[source]

Build an engine from a parsed workflow definition dict.

get_data_type(name: str | None)[source]
async run(input_data: dict, *, session_id: str | None = None, external_id: str | None = None) WorkflowState[source]

Execute the workflow for input_data and return the final state.

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.builder.WorkflowBuilder(name: str, *, description: str = '', version: str = '2.0', llm_model: str | None = None, llm_kwargs: dict[str, Any] | None = None)[source]

Bases: object

A small fluent builder for constructing a WorkflowGraph in code.

Every method returns self so calls can be chained, and build() validates and returns the graph (build_engine() returns a ready WorkflowEngine). For example:

graph = (
    WorkflowBuilder("Greeter", llm_model="openai/gpt-4o-mini")
    .data_type("input", {"user_message": str})
    .data_type("output", {"agent_response": str})
    .start("reply")
    .llm("reply", prompt="Greet the user.",
         inputs={"input": "input"}, output="output", next="end")
    .end()
    .build()
)
data_type(name: str, fields: dict[str, str | type | dict] | None = None, *, schema: dict | None = None, ref: str | None = None) WorkflowBuilder[source]

Declare a data type.

Pass fields for an object of named scalars ({"intent": str}), schema for a full JSON-schema fragment, or ref to alias another type.

data_model(name: str, model: type[BaseModel]) WorkflowBuilder[source]

Declare a data type from a Pydantic model class.

The model is used directly at run time (its fields validate the matching node input/output and drive structured LLM output), so you get full Pydantic expressiveness — Literal enums, defaults, validators — without writing a JSON-schema fragment. Its JSON schema is also recorded on the graph for storage and inspection.

Build the engine with build_engine() (which forwards the models) or pass data_models= to WorkflowEngine yourself.

start(next: str, *, name: str = 'start') WorkflowBuilder[source]
end(*, name: str = 'end', output: str = 'output') WorkflowBuilder[source]
llm(name: str, *, prompt: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None = None, use_history: bool = True, llm_model: str | None = None, llm_kwargs: dict[str, Any] | None = None, stream_output: bool = False) WorkflowBuilder[source]
agent(name: str, *, prompt: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None = None, allowed_tools: list[str] | None = None, max_steps: int = 10, llm_model: str | None = None, llm_kwargs: dict[str, Any] | None = None) WorkflowBuilder[source]
function(name: str, *, tool: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None = None, method: str = 'get') WorkflowBuilder[source]
if_(name: str, *, condition: str, then: str, else_: str) WorkflowBuilder[source]
switch(name: str, *, expr: str, cases: dict[str, str] | None = None, default: str | None = None) WorkflowBuilder[source]
python_function(name: str, path: str) WorkflowBuilder[source]

Register a Python tool by import path (e.g. pkg.mod.func).

rest_server(server: RestServer | dict) WorkflowBuilder[source]
mcp_server(server: McpServer | dict) WorkflowBuilder[source]
template(name: str, value: str) WorkflowBuilder[source]
build() WorkflowGraph[source]

Validate and return the WorkflowGraph.

build_engine(**kwargs)[source]

Build the graph and wrap it in a ready-to-run WorkflowEngine.

Any Pydantic models registered via data_model() are forwarded to the engine. Keyword arguments are passed through (storage, task_logger, client_factory, data_models, max_node_visits).

Graph models

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.models.BaseNode(*, name: str)[source]

Bases: BaseModel

Common fields shared by every node in a workflow graph.

A node is one vertex in the DAG/state-machine. name uniquely identifies the node and is the target referenced by transitions (next/then/ else/cases/default).

name : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.StartNode(*, name: str, type: 'start' = 'start', next: str)[source]

Bases: BaseNode

Interaction start node.

The caller hands an input to this node; execution begins here and proceeds to next.

type : Literal['start']
next : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.EndNode(*, name: str, type: 'end' = 'end', output: str = 'output')[source]

Bases: BaseNode

Interaction end node.

Reaching an end node terminates the interaction. output names the context variable whose value is returned to the caller.

type : Literal['end']
output : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.LLMNode(*, name: str, type: ~typing.Literal['llm'] = 'llm', prompt: str, inputs: dict[str, ~kavalai.agents.workflow_model.ArgumentInfo] = {}, output: str, next: str, use_history: bool = True, llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>, stream_output: bool = False)[source]

Bases: BaseNode

Single LLM completion node.

Resolves inputs from context, renders prompt and calls the LLM, storing the structured result in the output context variable, then transitions to next.

type : Literal['llm']
prompt : str
inputs : dict[str, ArgumentInfo]
output : str
next : str
use_history : bool
llm_model : str | None
llm_kwargs : dict[str, Any]
stream_output : bool
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.AgentNode(*, name: str, type: ~typing.Literal['agent'] = 'agent', prompt: str, inputs: dict[str, ~kavalai.agents.workflow_model.ArgumentInfo] = {}, output: str, next: str, allowed_tools: list[str] = <factory>, max_steps: int = 10, llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>)[source]

Bases: BaseNode

Multi-step agent node.

Runs the v2 Agent loop (tool calling) up to max_steps and stores the final result in output.

type : Literal['agent']
prompt : str
inputs : dict[str, ArgumentInfo]
output : str
next : str
allowed_tools : list[str]
max_steps : int
llm_model : str | None
llm_kwargs : dict[str, Any]
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.FunctionNode(*, name: str, type: 'function' = 'function', tool: str, inputs: dict[str, ArgumentInfo] = {}, output: str, next: str, method: str = 'get')[source]

Bases: BaseNode

Function-call node.

Invokes a single tool via the FunctionKernel (python:// / rest:// / mcp:// URIs) and stores the result in output.

type : Literal['function']
tool : str
inputs : dict[str, ArgumentInfo]
output : str
next : str
method : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.IfNode(*, name: str, type: 'if' = 'if', condition: str, then: str, else_: str)[source]

Bases: BaseNode

Boolean branch node.

Evaluates the condition string expression (e.g. state.count > 3) against the run context and transitions to then when truthy, otherwise to else_ (authored as else in YAML).

type : Literal['if']
condition : str
then : str
else_ : str
model_config : ClassVar[ConfigDict] = {'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.SwitchNode(*, name: str, type: 'switch' = 'switch', expr: str, cases: dict[str, str] = {}, default: str | None = None)[source]

Bases: BaseNode

Multi-way branch node.

Evaluates the expr string expression, stringifies the result and looks it up in cases; falls back to default when no case matches.

type : Literal['switch']
expr : str
cases : dict[str, str]
default : str | None
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.models.WorkflowGraph(*, name: str, description: str = '', version: str = '2.0', llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>, data_types: dict[str, dict], rest_servers: list[~kavalai.agents.workflow_model.RestServer] = [], mcp_servers: list[~kavalai.agents.workflow_model.McpServer] = [], templates: list[~kavalai.agents.workflow_model.TemplateModel] = [], python_functions: list[~kavalai.agents.workflow_model.PythonFunction] = [], nodes: list[~typing.Annotated[~kavalai.workflow.models.StartNode | ~kavalai.workflow.models.EndNode | ~kavalai.workflow.models.LLMNode | ~kavalai.workflow.models.AgentNode | ~kavalai.workflow.models.FunctionNode | ~kavalai.workflow.models.IfNode | ~kavalai.workflow.models.SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], start: str | None = None)[source]

Bases: BaseModel

A workflow: a directed graph of nodes forming a state machine.

Variables:
name : str

Workflow / agent name.

description : str

Human-readable description.

version : str

Schema version.

llm_model : str | None

Default LLM model (provider/model); nodes may override.

llm_kwargs : dict[str, Any]

Default LLM kwargs; nodes may override.

data_types : dict[str, dict]

JSON-schema data type definitions (parsed by SchemaParser).

nodes : list[kavalai.workflow.models.StartNode | kavalai.workflow.models.EndNode | kavalai.workflow.models.LLMNode | kavalai.workflow.models.AgentNode | kavalai.workflow.models.FunctionNode | kavalai.workflow.models.IfNode | kavalai.workflow.models.SwitchNode]

The graph vertices.

start : str | None

Optional explicit start node name (otherwise the single start node is used).

name : str
description : str
version : str
llm_model : str | None
llm_kwargs : dict[str, Any]
data_types : dict[str, dict]
rest_servers : list[RestServer]
mcp_servers : list[McpServer]
templates : list[TemplateModel]
python_functions : list[PythonFunction]
nodes : list[Annotated[StartNode | EndNode | LLMNode | AgentNode | FunctionNode | IfNode | SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]
start : str | None
validate_graph() WorkflowGraph[source]
property node_map : dict[str, Annotated[StartNode | EndNode | LLMNode | AgentNode | FunctionNode | IfNode | SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Run state

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.state.WorkflowState(*, workflow_name: str, status: ~typing.Literal['pending', 'running', 'completed', 'failed'] = 'pending', current_node: str | None = None, trace: list[str] = <factory>, data: dict = <factory>, input_data: dict = <factory>, output_data: dict | None = None, error: str | None = None, invocation_id: str | None = None, token_usage: dict | None = None, run_id: str | None = None, session_id: str | None = None, agent_id: str | None = None)[source]

Bases: BaseModel

Serializable runtime state of a single workflow interaction.

The state is JSON round-trippable (to_json / from_json) and is checkpointed to the configured DataStorage after every node so a run can be inspected or resumed.

workflow_name: name of the workflow being executed. status: lifecycle status of the run. current_node: name of the node about to run / last run. trace: ordered list of executed node names. data: the run context data (RunContext.data) passed through to_plain. input_data: the original interaction input. output_data: the value of the end node’s output variable (when finished). error: error message when status == 'failed'. invocation_id: short id shared by every log line of this run (for scanning). token_usage: aggregate model token counts for the run. run_id / session_id / agent_id: persistence identifiers (string UUIDs).

workflow_name : str
status : Literal['pending', 'running', 'completed', 'failed']
current_node : str | None
trace : list[str]
data : dict
input_data : dict
output_data : dict | None
error : str | None
invocation_id : str | None
token_usage : dict | None
run_id : str | None
session_id : str | None
agent_id : str | None
to_json() str[source]

Serialize the state to a JSON string.

classmethod from_json(data: str) WorkflowState[source]

Deserialize a WorkflowState from a JSON string.

model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Expressions

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

exception kavalai.workflow.expressions.ExpressionError[source]

Bases: ValueError

Raised when an expression cannot be parsed or safely evaluated.

Signals an invalid or empty expression, a syntax error, use of an unsupported/disallowed construct (function calls, comprehensions, imports, etc.) or an error encountered while evaluating the expression.

kavalai.workflow.expressions.evaluate_expression(expr: str, context: dict) Any[source]

Safely evaluate a simple string expression against context.

Supports comparisons (==, !=, <, <=, >, >=, in, not in, is, is not), boolean logic (and, or, not), arithmetic (+, -, *, /, //, %), literals, and list/tuple/dict displays. Names and attribute/subscript chains (state.count, input.user_message, items[0].title) are resolved from context via resolve_path(); unknown names resolve to None.

Arbitrary code is rejected: function calls, lambdas, comprehensions, imports, attribute writes, etc. all raise ExpressionError.

kavalai.workflow.expressions.evaluate_bool(expr: str, context: dict) bool[source]

Evaluate expr and coerce the result to a bool (for if nodes).

kavalai.workflow.expressions.evaluate_value(expr: str, context: dict) str[source]

Evaluate expr and stringify the result (for switch case lookup).

Client factory

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

kavalai.workflow.clients.build_parameters(llm_kwargs: dict[str, Any] | None) LlmClientParameters[source]

Build LlmClientParameters from a node’s llm_kwargs.

Recognised keys (temperature, top_p, reasoning_effort, service_tier, timeout_seconds) are mapped onto the parameters model; unknown keys are ignored so authors can keep provider-specific extras without breaking.

kavalai.workflow.clients.make_client(model: str, parameters: LlmClientParameters | None = None, stats_receiver: ModelStatsReceiver | None = None) BaseLlmClient[source]

Construct a v2 LLM client from a provider/model string.

Supported providers: openai, gemini, ollama, browser. The browser provider runs inference client-side via a WebLLM bridge (Pyodide only) and needs no API key — see BrowserLLMClient.

Storage backends

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.storage.base.RunHandle(*, agent_id: str, session_id: str, run_id: str)[source]

Bases: BaseModel

Identifiers created when a workflow run is initialized.

Mirrors the agent/session/run triple of the existing persistence layer so a Postgres backend can map straight onto AgentService.

agent_id : str
session_id : str
run_id : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.storage.base.ChatMsg(*, role: str, content: str | None = None)[source]

Bases: BaseModel

A single chat message row returned from history queries.

role : str
content : str | None
model_config : ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class kavalai.workflow.storage.base.DataStorage[source]

Bases: ABC

Common data-storage interface for workflow persistence.

Backends implement this to persist agents, sessions, runs, chat messages and the serialized WorkflowState. The method shapes intentionally mirror AgentService so a Postgres backend is a thin delegation layer over the existing tables.

abstractmethod async initialize_run(*, workflow_name: str, description: str | None = None, input_schema: dict | None = None, output_schema: dict | None = None, workflow: dict | None = None, session_id: str | None = None, external_id: str | None = None, input_data: dict | None = None) RunHandle[source]

Create (or reuse) the agent + session and start a new run.

abstractmethod async update_run(run_id: str, *, output_data: dict | None = None, context: dict | None = None) None[source]

Persist the final output and/or context of a run.

abstractmethod async save_state(run_id: str, state: WorkflowState) None[source]

Checkpoint the serialized workflow state for a run.

abstractmethod async load_state(run_id: str) WorkflowState | None[source]

Load the last checkpointed workflow state for a run, if any.

abstractmethod async add_chat_message(*, agent_id: str, session_id: str, run_id: str | None, role: str, content: str | None) None[source]

Append a chat message to a session.

abstractmethod async get_chat_history(session_id: str, limit: int = 50) list[ChatMsg][source]

Return the chat history for a session, oldest first.

async close() None[source]

Release any resources held by the backend. Override if needed.

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.storage.memory.SqliteDataStorage(path: str = ':memory:')[source]

Bases: DataStorage

In-memory (or file-backed) data storage using SQLite via aiosqlite.

Defaults to a private :memory: database, which makes it ideal for local development and tests. Pass a file path to persist across process runs.

async initialize_run(*, workflow_name: str, description: str | None = None, input_schema: dict | None = None, output_schema: dict | None = None, workflow: dict | None = None, session_id: str | None = None, external_id: str | None = None, input_data: dict | None = None) RunHandle[source]

Create (or reuse) the agent + session and start a new run.

async update_run(run_id: str, *, output_data: dict | None = None, context: dict | None = None) None[source]

Persist the final output and/or context of a run.

async save_state(run_id: str, state: WorkflowState) None[source]

Checkpoint the serialized workflow state for a run.

async load_state(run_id: str) WorkflowState | None[source]

Load the last checkpointed workflow state for a run, if any.

async add_chat_message(*, agent_id: str, session_id: str, run_id: str | None, role: str, content: str | None) None[source]

Append a chat message to a session.

async get_chat_history(session_id: str, limit: int = 50) list[ChatMsg][source]

Return the chat history for a session, oldest first.

async close() None[source]

Release any resources held by the backend. Override if needed.

class kavalai.workflow.storage.memory.InMemoryDataStorage[source]

Bases: DataStorage

Pure-Python, thread-free in-memory data storage.

A dependency-free alternative to SqliteDataStorage that keeps everything in plain Python structures. Crucially it spawns no background thread, so it works under Pyodide / WebAssembly — where aiosqlite cannot start its worker thread (RuntimeError: can't start new thread). It is the natural store for the in-browser playground and for fast unit tests.

Semantics mirror SqliteDataStorage: agents are reused by name, sessions are reused when a known id is supplied, and chat history is returned oldest-first. State lives only for the lifetime of the instance.

async initialize_run(*, workflow_name: str, description: str | None = None, input_schema: dict | None = None, output_schema: dict | None = None, workflow: dict | None = None, session_id: str | None = None, external_id: str | None = None, input_data: dict | None = None) RunHandle[source]

Create (or reuse) the agent + session and start a new run.

async update_run(run_id: str, *, output_data: dict | None = None, context: dict | None = None) None[source]

Persist the final output and/or context of a run.

async save_state(run_id: str, state: WorkflowState) None[source]

Checkpoint the serialized workflow state for a run.

async load_state(run_id: str) WorkflowState | None[source]

Load the last checkpointed workflow state for a run, if any.

async add_chat_message(*, agent_id: str, session_id: str, run_id: str | None, role: str, content: str | None) None[source]

Append a chat message to a session.

async get_chat_history(session_id: str, limit: int = 50) list[ChatMsg][source]

Return the chat history for a session, oldest first.

Task logging backends

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.tasklog.base.TaskLogger[source]

Bases: ABC

Common interface for storing per-node debugging data and model stats.

Logging is fire-and-forget: the public log_* methods schedule a background task and return immediately so they never block workflow execution. Call flush() (e.g. at the end of a run or in tests) to await all pending writes.

log_node(*, run_id: str | None, session_id: str | None, agent_id: str | None, node_name: str, node_type: str, inputs: dict | None, output: Any, prompt: str | None = None, duration: float = 0.0, errors: list[str] | None = None) None[source]

Record the execution of a single node (fire-and-forget).

log_model_call(stats: ModelCallStat, agent_id: str | None = None) None[source]

Record an LLM / embedding model call (fire-and-forget).

async flush() None[source]

Await all pending background writes.

async close() None[source]

Flush and release backend resources. Override to add cleanup.

class kavalai.workflow.tasklog.base.StatsBridge(task_logger: TaskLogger, agent_id: str | None = None)[source]

Bases: ModelStatsReceiver

Adapter forwarding LLM ModelCallStat events to a TaskLogger.

Wired into v2 LLM clients via their model_stats_receiver so every model call made during a workflow is logged against the run’s agent.

receive_model_stats(stats: ModelCallStat) None[source]
class kavalai.workflow.tasklog.base.TokenAccumulator(task_logger: TaskLogger | None = None, agent_id: str | None = None)[source]

Bases: ModelStatsReceiver

Aggregates token usage across a workflow run and optionally forwards each ModelCallStat to a TaskLogger.

The engine wires one accumulator into every LLM client built during a run so that, when the run ends, it can report the total token spend. When a task_logger is supplied each individual call is still logged through it, so this fully subsumes StatsBridge.

receive_model_stats(stats: ModelCallStat) None[source]
summary() dict[source]

Return the aggregated token counts as a plain dict.

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.workflow.tasklog.sqlite.SqliteTaskLogger(path: str = ':memory:')[source]

Bases: TaskLogger

Task logger storing node executions and model stats in SQLite.

Defaults to a private :memory: database. Pass a file path to keep the debugging data across runs.

async close() None[source]

Flush and release backend resources. Override to add cleanup.