Workflow API¶
The workflow engine lives in kavalai.workflow. It turns a YAML graph (or a
WorkflowBuilder chain) into an executable state machine, and
checkpoints a serialisable state through pluggable storage and task-logger
backends.
Engine and builder¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- kavalai.workflow.engine.make_prompt(prompt: str, input_data: dict) str[source]¶
Combine a rendered prompt with resolved input data into a system message.
-
class kavalai.workflow.engine.WorkflowEngine(graph: WorkflowGraph, *, storage: DataStorage | None =
None, task_logger: TaskLogger | None =None, client_factory: Callable[[...], BaseLlmClient] | None =None, data_models: dict[str, type[BaseModel]] | None =None, max_node_visits: int =1000)[source]¶ Bases:
objectExecutes a v2
WorkflowGraphas a DAG / state machine.The engine walks the graph from the start node, following transitions and evaluating branch nodes, until it reaches an end node. Each node’s result is stored in the run context; the serialized
WorkflowStateis checkpointed tostorageafter every node and per-node debug data flows totask_logger.- Parameters:¶
- graph : WorkflowGraph¶
The parsed workflow definition.
- storage : Optional[DataStorage]¶
Persistence backend for agents/sessions/runs/chat/state.
- task_logger : Optional[TaskLogger]¶
Backend for per-node debug data and model statistics.
- client_factory : Optional[ClientFactory]¶
Factory
(model, parameters, stats_receiver) -> BaseLlmClientused to build LLM clients. Defaults to the provider factory; inject a fake for offline testing.- max_node_visits : int¶
Safety cap on total node executions to guard against infinite loops.
- classmethod from_yaml(yaml_string: str, **kwargs) WorkflowEngine[source]¶
Build an engine from a YAML workflow definition string.
- classmethod from_yaml_path(yaml_path: str, **kwargs) WorkflowEngine[source]¶
Build an engine from a YAML workflow definition file.
- classmethod from_dict(data: dict, **kwargs) WorkflowEngine[source]¶
Build an engine from a parsed workflow definition dict.
-
async run(input_data: dict, *, session_id: str | None =
None, external_id: str | None =None) WorkflowState[source]¶ Execute the workflow for
input_dataand return the final state.
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
-
class kavalai.workflow.builder.WorkflowBuilder(name: str, *, description: str =
'', version: str ='2.0', llm_model: str | None =None, llm_kwargs: dict[str, Any] | None =None)[source]¶ Bases:
objectA small fluent builder for constructing a
WorkflowGraphin code.Every method returns
selfso calls can be chained, andbuild()validates and returns the graph (build_engine()returns a readyWorkflowEngine). For example:graph = ( WorkflowBuilder("Greeter", llm_model="openai/gpt-4o-mini") .data_type("input", {"user_message": str}) .data_type("output", {"agent_response": str}) .start("reply") .llm("reply", prompt="Greet the user.", inputs={"input": "input"}, output="output", next="end") .end() .build() )-
data_type(name: str, fields: dict[str, str | type | dict] | None =
None, *, schema: dict | None =None, ref: str | None =None) WorkflowBuilder[source]¶ Declare a data type.
Pass
fieldsfor an object of named scalars ({"intent": str}),schemafor a full JSON-schema fragment, orrefto alias another type.
- data_model(name: str, model: type[BaseModel]) WorkflowBuilder[source]¶
Declare a data type from a Pydantic model class.
The model is used directly at run time (its fields validate the matching node input/output and drive structured LLM output), so you get full Pydantic expressiveness —
Literalenums, defaults, validators — without writing a JSON-schema fragment. Its JSON schema is also recorded on the graph for storage and inspection.Build the engine with
build_engine()(which forwards the models) or passdata_models=toWorkflowEngineyourself.
-
llm(name: str, *, prompt: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None =
None, use_history: bool =True, llm_model: str | None =None, llm_kwargs: dict[str, Any] | None =None, stream_output: bool =False) WorkflowBuilder[source]¶
-
agent(name: str, *, prompt: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None =
None, allowed_tools: list[str] | None =None, max_steps: int =10, llm_model: str | None =None, llm_kwargs: dict[str, Any] | None =None) WorkflowBuilder[source]¶
-
function(name: str, *, tool: str, output: str, next: str, inputs: dict[str, ArgumentInfo | str | dict] | None =
None, method: str ='get') WorkflowBuilder[source]¶
-
switch(name: str, *, expr: str, cases: dict[str, str] | None =
None, default: str | None =None) WorkflowBuilder[source]¶
- python_function(name: str, path: str) WorkflowBuilder[source]¶
Register a Python tool by import path (e.g.
pkg.mod.func).
- rest_server(server: RestServer | dict) WorkflowBuilder[source]¶
- build() WorkflowGraph[source]¶
Validate and return the
WorkflowGraph.
- build_engine(**kwargs)[source]¶
Build the graph and wrap it in a ready-to-run
WorkflowEngine.Any Pydantic models registered via
data_model()are forwarded to the engine. Keyword arguments are passed through (storage,task_logger,client_factory,data_models,max_node_visits).
-
data_type(name: str, fields: dict[str, str | type | dict] | None =
Graph models¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- class kavalai.workflow.models.BaseNode(*, name: str)[source]¶
Bases:
BaseModelCommon fields shared by every node in a workflow graph.
A node is one vertex in the DAG/state-machine.
nameuniquely identifies the node and is the target referenced by transitions (next/then/else/cases/default).-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
-
class kavalai.workflow.models.StartNode(*, name: str, type: 'start' =
'start', next: str)[source]¶ Bases:
BaseNodeInteraction start node.
The caller hands an input to this node; execution begins here and proceeds to
next.-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
-
class kavalai.workflow.models.EndNode(*, name: str, type: 'end' =
'end', output: str ='output')[source]¶ Bases:
BaseNodeInteraction end node.
Reaching an end node terminates the interaction.
outputnames the context variable whose value is returned to the caller.-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
- class kavalai.workflow.models.LLMNode(*, name: str, type: ~typing.Literal['llm'] = 'llm', prompt: str, inputs: dict[str, ~kavalai.agents.workflow_model.ArgumentInfo] = {}, output: str, next: str, use_history: bool = True, llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>, stream_output: bool = False)[source]¶
Bases:
BaseNodeSingle LLM completion node.
Resolves
inputsfrom context, renderspromptand calls the LLM, storing the structured result in theoutputcontext variable, then transitions tonext.- inputs : dict[str, ArgumentInfo]¶
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class kavalai.workflow.models.AgentNode(*, name: str, type: ~typing.Literal['agent'] = 'agent', prompt: str, inputs: dict[str, ~kavalai.agents.workflow_model.ArgumentInfo] = {}, output: str, next: str, allowed_tools: list[str] = <factory>, max_steps: int = 10, llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>)[source]¶
Bases:
BaseNodeMulti-step agent node.
Runs the v2
Agentloop (tool calling) up tomax_stepsand stores the final result inoutput.- inputs : dict[str, ArgumentInfo]¶
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
class kavalai.workflow.models.FunctionNode(*, name: str, type: 'function' =
'function', tool: str, inputs: dict[str, ArgumentInfo] ={}, output: str, next: str, method: str ='get')[source]¶ Bases:
BaseNodeFunction-call node.
Invokes a single tool via the
FunctionKernel(python:///rest:///mcp://URIs) and stores the result inoutput.- inputs : dict[str, ArgumentInfo]¶
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
class kavalai.workflow.models.IfNode(*, name: str, type: 'if' =
'if', condition: str, then: str, else_: str)[source]¶ Bases:
BaseNodeBoolean branch node.
Evaluates the
conditionstring expression (e.g.state.count > 3) against the run context and transitions tothenwhen truthy, otherwise toelse_(authored aselsein YAML).-
model_config : ClassVar[ConfigDict] =
{'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
-
class kavalai.workflow.models.SwitchNode(*, name: str, type: 'switch' =
'switch', expr: str, cases: dict[str, str] ={}, default: str | None =None)[source]¶ Bases:
BaseNodeMulti-way branch node.
Evaluates the
exprstring expression, stringifies the result and looks it up incases; falls back todefaultwhen no case matches.-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
- class kavalai.workflow.models.WorkflowGraph(*, name: str, description: str = '', version: str = '2.0', llm_model: str | None = None, llm_kwargs: dict[str, ~typing.Any] = <factory>, data_types: dict[str, dict], rest_servers: list[~kavalai.agents.workflow_model.RestServer] = [], mcp_servers: list[~kavalai.agents.workflow_model.McpServer] = [], templates: list[~kavalai.agents.workflow_model.TemplateModel] = [], python_functions: list[~kavalai.agents.workflow_model.PythonFunction] = [], nodes: list[~typing.Annotated[~kavalai.workflow.models.StartNode | ~kavalai.workflow.models.EndNode | ~kavalai.workflow.models.LLMNode | ~kavalai.workflow.models.AgentNode | ~kavalai.workflow.models.FunctionNode | ~kavalai.workflow.models.IfNode | ~kavalai.workflow.models.SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]], start: str | None = None)[source]¶
Bases:
BaseModelA workflow: a directed graph of nodes forming a state machine.
- Variables:¶
- name : str
Workflow / agent name.
- description : str
Human-readable description.
- version : str
Schema version.
- llm_model : str | None
Default LLM model (
provider/model); nodes may override.- llm_kwargs : dict[str, Any]
Default LLM kwargs; nodes may override.
- data_types : dict[str, dict]
JSON-schema data type definitions (parsed by SchemaParser).
- nodes : list[kavalai.workflow.models.StartNode | kavalai.workflow.models.EndNode | kavalai.workflow.models.LLMNode | kavalai.workflow.models.AgentNode | kavalai.workflow.models.FunctionNode | kavalai.workflow.models.IfNode | kavalai.workflow.models.SwitchNode]
The graph vertices.
- start : str | None
Optional explicit start node name (otherwise the single
startnode is used).
- rest_servers : list[RestServer]¶
- templates : list[TemplateModel]¶
- python_functions : list[PythonFunction]¶
- nodes : list[Annotated[StartNode | EndNode | LLMNode | AgentNode | FunctionNode | IfNode | SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]¶
- validate_graph() WorkflowGraph[source]¶
- property node_map : dict[str, Annotated[StartNode | EndNode | LLMNode | AgentNode | FunctionNode | IfNode | SwitchNode, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]¶
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Run state¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- class kavalai.workflow.state.WorkflowState(*, workflow_name: str, status: ~typing.Literal['pending', 'running', 'completed', 'failed'] = 'pending', current_node: str | None = None, trace: list[str] = <factory>, data: dict = <factory>, input_data: dict = <factory>, output_data: dict | None = None, error: str | None = None, invocation_id: str | None = None, token_usage: dict | None = None, run_id: str | None = None, session_id: str | None = None, agent_id: str | None = None)[source]¶
Bases:
BaseModelSerializable runtime state of a single workflow interaction.
The state is JSON round-trippable (
to_json/from_json) and is checkpointed to the configuredDataStorageafter every node so a run can be inspected or resumed.workflow_name: name of the workflow being executed. status: lifecycle status of the run. current_node: name of the node about to run / last run. trace: ordered list of executed node names. data: the run context data (
RunContext.data) passed throughto_plain. input_data: the original interaction input. output_data: the value of the end node’s output variable (when finished). error: error message whenstatus == 'failed'. invocation_id: short id shared by every log line of this run (for scanning). token_usage: aggregate model token counts for the run. run_id / session_id / agent_id: persistence identifiers (string UUIDs).- classmethod from_json(data: str) WorkflowState[source]¶
Deserialize a
WorkflowStatefrom a JSON string.
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Expressions¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- exception kavalai.workflow.expressions.ExpressionError[source]¶
Bases:
ValueErrorRaised when an expression cannot be parsed or safely evaluated.
Signals an invalid or empty expression, a syntax error, use of an unsupported/disallowed construct (function calls, comprehensions, imports, etc.) or an error encountered while evaluating the expression.
- kavalai.workflow.expressions.evaluate_expression(expr: str, context: dict) Any[source]¶
Safely evaluate a simple string expression against
context.Supports comparisons (
==,!=,<,<=,>,>=,in,not in,is,is not), boolean logic (and,or,not), arithmetic (+,-,*,/,//,%), literals, and list/tuple/dict displays. Names and attribute/subscript chains (state.count,input.user_message,items[0].title) are resolved fromcontextviaresolve_path(); unknown names resolve toNone.Arbitrary code is rejected: function calls, lambdas, comprehensions, imports, attribute writes, etc. all raise
ExpressionError.
- kavalai.workflow.expressions.evaluate_bool(expr: str, context: dict) bool[source]¶
Evaluate
exprand coerce the result to a bool (forifnodes).
- kavalai.workflow.expressions.evaluate_value(expr: str, context: dict) str[source]¶
Evaluate
exprand stringify the result (forswitchcase lookup).
Client factory¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- kavalai.workflow.clients.build_parameters(llm_kwargs: dict[str, Any] | None) LlmClientParameters[source]¶
Build
LlmClientParametersfrom a node’sllm_kwargs.Recognised keys (temperature, top_p, reasoning_effort, service_tier, timeout_seconds) are mapped onto the parameters model; unknown keys are ignored so authors can keep provider-specific extras without breaking.
-
kavalai.workflow.clients.make_client(model: str, parameters: LlmClientParameters | None =
None, stats_receiver: ModelStatsReceiver | None =None) BaseLlmClient[source]¶ Construct a v2 LLM client from a
provider/modelstring.Supported providers:
openai,gemini,ollama,browser. Thebrowserprovider runs inference client-side via a WebLLM bridge (Pyodide only) and needs no API key — seeBrowserLLMClient.
Storage backends¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- class kavalai.workflow.storage.base.RunHandle(*, agent_id: str, session_id: str, run_id: str)[source]¶
Bases:
BaseModelIdentifiers created when a workflow run is initialized.
Mirrors the agent/session/run triple of the existing persistence layer so a Postgres backend can map straight onto
AgentService.-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
-
class kavalai.workflow.storage.base.ChatMsg(*, role: str, content: str | None =
None)[source]¶ Bases:
BaseModelA single chat message row returned from history queries.
-
model_config : ClassVar[ConfigDict] =
{}¶ Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
-
model_config : ClassVar[ConfigDict] =
- class kavalai.workflow.storage.base.DataStorage[source]¶
Bases:
ABCCommon data-storage interface for workflow persistence.
Backends implement this to persist agents, sessions, runs, chat messages and the serialized
WorkflowState. The method shapes intentionally mirrorAgentServiceso a Postgres backend is a thin delegation layer over the existing tables.-
abstractmethod async initialize_run(*, workflow_name: str, description: str | None =
None, input_schema: dict | None =None, output_schema: dict | None =None, workflow: dict | None =None, session_id: str | None =None, external_id: str | None =None, input_data: dict | None =None) RunHandle[source]¶ Create (or reuse) the agent + session and start a new run.
-
abstractmethod async update_run(run_id: str, *, output_data: dict | None =
None, context: dict | None =None) None[source]¶ Persist the final output and/or context of a run.
- abstractmethod async save_state(run_id: str, state: WorkflowState) None[source]¶
Checkpoint the serialized workflow state for a run.
- abstractmethod async load_state(run_id: str) WorkflowState | None[source]¶
Load the last checkpointed workflow state for a run, if any.
- abstractmethod async add_chat_message(*, agent_id: str, session_id: str, run_id: str | None, role: str, content: str | None) None[source]¶
Append a chat message to a session.
-
abstractmethod async initialize_run(*, workflow_name: str, description: str | None =
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
-
class kavalai.workflow.storage.memory.SqliteDataStorage(path: str =
':memory:')[source]¶ Bases:
DataStorageIn-memory (or file-backed) data storage using SQLite via aiosqlite.
Defaults to a private
:memory:database, which makes it ideal for local development and tests. Pass a filepathto persist across process runs.-
async initialize_run(*, workflow_name: str, description: str | None =
None, input_schema: dict | None =None, output_schema: dict | None =None, workflow: dict | None =None, session_id: str | None =None, external_id: str | None =None, input_data: dict | None =None) RunHandle[source]¶ Create (or reuse) the agent + session and start a new run.
-
async update_run(run_id: str, *, output_data: dict | None =
None, context: dict | None =None) None[source]¶ Persist the final output and/or context of a run.
- async save_state(run_id: str, state: WorkflowState) None[source]¶
Checkpoint the serialized workflow state for a run.
- async load_state(run_id: str) WorkflowState | None[source]¶
Load the last checkpointed workflow state for a run, if any.
- async add_chat_message(*, agent_id: str, session_id: str, run_id: str | None, role: str, content: str | None) None[source]¶
Append a chat message to a session.
-
async initialize_run(*, workflow_name: str, description: str | None =
- class kavalai.workflow.storage.memory.InMemoryDataStorage[source]¶
Bases:
DataStoragePure-Python, thread-free in-memory data storage.
A dependency-free alternative to
SqliteDataStoragethat keeps everything in plain Python structures. Crucially it spawns no background thread, so it works under Pyodide / WebAssembly — whereaiosqlitecannot start its worker thread (RuntimeError: can't start new thread). It is the natural store for the in-browser playground and for fast unit tests.Semantics mirror
SqliteDataStorage: agents are reused by name, sessions are reused when a known id is supplied, and chat history is returned oldest-first. State lives only for the lifetime of the instance.-
async initialize_run(*, workflow_name: str, description: str | None =
None, input_schema: dict | None =None, output_schema: dict | None =None, workflow: dict | None =None, session_id: str | None =None, external_id: str | None =None, input_data: dict | None =None) RunHandle[source]¶ Create (or reuse) the agent + session and start a new run.
-
async update_run(run_id: str, *, output_data: dict | None =
None, context: dict | None =None) None[source]¶ Persist the final output and/or context of a run.
- async save_state(run_id: str, state: WorkflowState) None[source]¶
Checkpoint the serialized workflow state for a run.
- async load_state(run_id: str) WorkflowState | None[source]¶
Load the last checkpointed workflow state for a run, if any.
-
async initialize_run(*, workflow_name: str, description: str | None =
Task logging backends¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
- class kavalai.workflow.tasklog.base.TaskLogger[source]¶
Bases:
ABCCommon interface for storing per-node debugging data and model stats.
Logging is fire-and-forget: the public
log_*methods schedule a background task and return immediately so they never block workflow execution. Callflush()(e.g. at the end of a run or in tests) to await all pending writes.-
log_node(*, run_id: str | None, session_id: str | None, agent_id: str | None, node_name: str, node_type: str, inputs: dict | None, output: Any, prompt: str | None =
None, duration: float =0.0, errors: list[str] | None =None) None[source]¶ Record the execution of a single node (fire-and-forget).
-
log_node(*, run_id: str | None, session_id: str | None, agent_id: str | None, node_name: str, node_type: str, inputs: dict | None, output: Any, prompt: str | None =
-
class kavalai.workflow.tasklog.base.StatsBridge(task_logger: TaskLogger, agent_id: str | None =
None)[source]¶ Bases:
ModelStatsReceiverAdapter forwarding LLM
ModelCallStatevents to aTaskLogger.Wired into v2 LLM clients via their
model_stats_receiverso every model call made during a workflow is logged against the run’s agent.- receive_model_stats(stats: ModelCallStat) None[source]¶
-
class kavalai.workflow.tasklog.base.TokenAccumulator(task_logger: TaskLogger | None =
None, agent_id: str | None =None)[source]¶ Bases:
ModelStatsReceiverAggregates token usage across a workflow run and optionally forwards each
ModelCallStatto aTaskLogger.The engine wires one accumulator into every LLM client built during a run so that, when the run ends, it can report the total token spend. When a
task_loggeris supplied each individual call is still logged through it, so this fully subsumesStatsBridge.- receive_model_stats(stats: ModelCallStat) None[source]¶
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
-
class kavalai.workflow.tasklog.sqlite.SqliteTaskLogger(path: str =
':memory:')[source]¶ Bases:
TaskLoggerTask logger storing node executions and model stats in SQLite.
Defaults to a private
:memory:database. Pass a filepathto keep the debugging data across runs.