Building agents with Workflows
A workflow is a directed graph — a small state machine — that you author in YAML and run as a single interaction: the caller hands an input to the start node and reads the result from the end node.
Between those two boundaries you wire together a handful of node types:
| Node | What it does |
|---|---|
| start / end | the interaction boundary (input in, output out) |
| llm | one structured LLM completion |
| agent | a multi-step, tool-using agent loop |
| function | a single tool call |
| if / switch | branch on a simple string expression |
The engine walks the graph, checkpoints a JSON-serialisable state after every node, and records per-node debug data and model statistics through pluggable storage and task-logger backends. This walk-through builds those ideas up one node at a time.
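To make the walk-and-checkpoint idea concrete, here is a minimal sketch in plain Python. It is not the kavalai engine: the `NODES` table, the `walk` function, and the snapshot layout are hypothetical names chosen for illustration, and a list stands in for the storage backend.

```python
import json

# Hypothetical miniature graph: each node has a handler and a fixed successor.
# This mirrors the idea only; it is not the kavalai engine or its API.
NODES = {
    "start": {"run": lambda data: data, "next": "shout"},
    "shout": {"run": lambda data: {**data, "text": data["text"].upper()}, "next": "end"},
    "end": {"run": lambda data: data, "next": None},
}


def walk(nodes, data, checkpoints):
    """Visit nodes from 'start', saving a JSON snapshot after each one."""
    trace, current = [], "start"
    while current is not None:
        data = nodes[current]["run"](data)
        trace.append(current)
        # The snapshot has to be JSON-serialisable, like the state described above.
        checkpoints.append(
            json.dumps({"current_node": current, "trace": trace, "data": data})
        )
        current = nodes[current]["next"]
    return trace, data


checkpoints = []  # stand-in for a storage backend
trace, result = walk(NODES, {"text": "hello"}, checkpoints)
print(" -> ".join(trace), result, len(checkpoints))
```

Because each snapshot is a plain JSON string, any of them could be reloaded later to inspect how far the run had progressed.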
Setup
We use the SQLite (in-memory) backends so everything here runs locally, and
openai/gpt-4o-mini for the LLM nodes. Setting KAVALAI_DEFAULT_LLM_MODEL lets
us omit llm_model from every graph below.
import os
import dotenv
dotenv.load_dotenv("../.env")
os.environ["KAVALAI_DEFAULT_LLM_MODEL"] = "openai/gpt-4o-mini"
assert os.getenv("OPENAI_API_KEY"), "Set OPENAI_API_KEY to run the LLM nodes."
from kavalai.workflow import (
    WorkflowEngine,
    SqliteDataStorage,
    SqliteTaskLogger,
)
1. The smallest workflow: start → llm → end
A graph is name + data_types + nodes. Two conventions make the boundary
ergonomic: the input is the data type named input, and the value returned
to the caller is the output variable named on the end node.
data_types are JSON-schema fragments compiled to Pydantic models, so every
node’s input and output is validated. Inside a prompt you can interpolate the
context with {{ context.<path> }}.
greeter_yaml = """
name: Greeter
description: A one-node agent that greets the user by name.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: reply}
  - name: reply
    type: llm
    prompt: |
      You are a warm, concise greeter.
      Reply to the user in one friendly sentence: {{ context.input.user_message }}
    inputs:
      input: {type: context, value: input}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""
engine = WorkflowEngine.from_yaml(
greeter_yaml,
storage=SqliteDataStorage(),
task_logger=SqliteTaskLogger(),
)
state = await engine.run({"user_message": "Hi, I'm Timo!"})
print(state.output_data["agent_response"])
print("path:", " → ".join(state.trace))
2026-06-18 14:34:02.301 | INFO | kavalai.workflow.engine:run:348 - [beca9c36] Starting workflow 'Greeter'
2026-06-18 14:34:08.659 | INFO | kavalai.workflow.engine:_finish:460 - [beca9c36] Workflow 'Greeter' completed (session=9f267dd4-af5e-491f-a428-983f1c3b6906)
2026-06-18 14:34:08.661 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [beca9c36] Workflow 'Greeter' token usage: 1 model call(s), 106 tokens (prompt=90, completion=16)
Hi Timo! It's great to meet you!
path: start → reply → end
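The `{{ context.<path> }}` interpolation used in the prompt can be pictured as a dotted-path lookup followed by string substitution. The sketch below is an assumption about the behaviour, not the engine's template code (which may well use a full template library); `PLACEHOLDER`, `resolve`, and `render` are hypothetical names.

```python
import re

# Matches {{ context.some.dotted.path }} with optional inner whitespace.
PLACEHOLDER = re.compile(r"\{\{\s*context\.([A-Za-z_][\w.]*)\s*\}\}")


def resolve(context, path):
    """Follow a dotted path through nested dicts; return None if a key is missing."""
    value = context
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def render(template, context):
    """Replace each placeholder with the stringified context value."""
    return PLACEHOLDER.sub(lambda m: str(resolve(context, m.group(1))), template)


context = {"input": {"user_message": "Hi, I'm Timo!"}}
prompt = render("Reply to the user: {{ context.input.user_message }}", context)
print(prompt)
```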
2. The run state is serialisable and persisted
engine.run(...) returns a WorkflowState: the status, the ordered trace of
visited nodes, the full context data, and the final output_data. It round-trips
through JSON and is checkpointed to storage after every node, so a run can be
reloaded and inspected later. The interaction is also recorded as a chat session.
# The entire state serialises to JSON (and back via WorkflowState.from_json).
print(state.to_json()[:240], "…\n")
# It was checkpointed to storage after each node — reload it by run id.
reloaded = await engine.storage.load_state(state.run_id)
print("reloaded status:", reloaded.status, "| nodes:", reloaded.trace, "\n")
# The same backend recorded the conversation turns.
for msg in await engine.storage.get_chat_history(state.session_id):
    print(f"{msg.role:>9}: {msg.content}")
{"workflow_name":"Greeter","status":"completed","current_node":"end","trace":["start","reply","end"],"data":{"input":{"user_message":"Hi, I'm Timo!"},"output":{"agent_response":"Hi Timo! It's great to meet you!"}},"input_data":{"user_messag …
reloaded status: completed | nodes: ['start', 'reply', 'end']
user: Hi, I'm Timo!
assistant: Hi Timo! It's great to meet you!
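The JSON round trip described in this section can be illustrated with a small dataclass. `MiniState` is a hypothetical stand-in with only a few of the fields seen in the output above; it is not the real WorkflowState, whose full field list and methods are not reproduced here.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class MiniState:
    """Hypothetical stand-in for a run state; not the kavalai WorkflowState."""

    workflow_name: str
    status: str = "running"
    trace: list = field(default_factory=list)
    data: dict = field(default_factory=dict)

    def to_json(self):
        # asdict() yields plain dicts and lists, which json can serialise directly.
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload):
        return cls(**json.loads(payload))


state = MiniState(
    "Greeter",
    "completed",
    ["start", "reply", "end"],
    {"input": {"user_message": "Hi"}},
)
restored = MiniState.from_json(state.to_json())
print(restored == state)
```

Keeping the state limited to JSON-native types is what makes checkpointing after every node cheap: there is nothing to pickle and nothing backend-specific to translate.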
3. Function nodes — calling tools
A function node performs exactly one tool call through the FunctionKernel,
addressed by URI (python://, rest://, mcp://). Register a Python tool on
the engine’s kernel and reference it from the node. The structured return value
is stored in the node’s output variable and stays in the context for later
nodes to use.
from pydantic import BaseModel
from kavalai.functionkernel import pythontool
class Weather(BaseModel):
    summary: str
    celsius: int

@pythontool
def get_weather(city: str) -> Weather:
    # A real tool would call a weather API; we fake it for the demo.
    return Weather(summary=f"clear skies over {city}", celsius=24)
weather_yaml = """
name: Weather forecaster
description: Looks up weather with a function node, then phrases it with an LLM.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
      city: {type: string}
  weather:
    type: object
    properties:
      summary: {type: string}
      celsius: {type: integer}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: fetch}
  - name: fetch
    type: function
    tool: python://get_weather
    inputs:
      city: {type: context, value: input.city}
    output: weather
    next: phrase
  - name: phrase
    type: llm
    prompt: "Turn the weather reading into a cheerful one-line forecast."
    inputs:
      weather: {type: context, value: weather}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""
engine = WorkflowEngine.from_yaml(weather_yaml, storage=SqliteDataStorage())
engine.kernel.register_python_tool("get_weather", get_weather)
state = await engine.run({"user_message": "weather?", "city": "Tallinn"})
print("tool output kept in context:", state.data["weather"])
print("final:", state.output_data["agent_response"])
2026-06-18 14:34:40.217 | INFO | kavalai.workflow.engine:run:348 - [f0e7b653] Starting workflow 'Weather forecaster'
2026-06-18 14:34:41.519 | INFO | kavalai.workflow.engine:_finish:460 - [f0e7b653] Workflow 'Weather forecaster' completed (session=c3b4c1d9-e3e7-471c-8013-42be40568ab2)
2026-06-18 14:34:41.521 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [f0e7b653] Workflow 'Weather forecaster' token usage: 1 model call(s), 104 tokens (prompt=77, completion=27)
tool output kept in context: {'summary': 'clear skies over Tallinn', 'celsius': 24}
final: Get ready for a beautiful day in Tallinn with clear skies and a lovely 24°C! ☀️
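Addressing tools by URI amounts to splitting the URI into a scheme and a name, then looking up a callable. The following sketch shows one way that could work for the python:// scheme only. It is an assumption about the mechanism, not the FunctionKernel implementation; `REGISTRY` and `call_tool` are hypothetical, and this `register_python_tool` is a local function, not the kernel method of the same name.

```python
from urllib.parse import urlparse

# Hypothetical registry keyed by (scheme, name); only python:// is wired up here.
REGISTRY = {}


def register_python_tool(name, func):
    REGISTRY[("python", name)] = func


def call_tool(uri, **kwargs):
    """Resolve a tool URI such as python://get_weather and invoke it once."""
    parsed = urlparse(uri)
    key = (parsed.scheme, parsed.netloc)
    if key not in REGISTRY:
        raise LookupError(f"no tool registered for {uri}")
    return REGISTRY[key](**kwargs)


def get_weather(city):
    return {"summary": f"clear skies over {city}", "celsius": 24}


register_python_tool("get_weather", get_weather)
reading = call_tool("python://get_weather", city="Tallinn")
print(reading)
```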
4. Branching with if and switch
Routing nodes evaluate a simple string expression against the run context.
- switch evaluates expr, stringifies it, and matches it against cases, falling back to default.
- if branches on a boolean condition.
Here an llm node classifies the request and a switch routes to a specialised
reply. The same graph is run twice and takes a different path each time.
router_yaml = """
name: Support router
description: Classifies a support request and routes to a specialised reply.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  classification:
    type: object
    properties:
      intent: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: classify}
  - name: classify
    type: llm
    prompt: |
      Classify the user's request as exactly one of: refund, technical, other.
      Respond with that single lowercase word in the `intent` field.
    inputs:
      input: {type: context, value: input}
    output: classification
    next: route
  - name: route
    type: switch
    expr: classification.intent
    cases:
      refund: refund_reply
      technical: tech_reply
    default: general_reply
  - name: refund_reply
    type: llm
    prompt: "Empathetically acknowledge the refund request and outline next steps."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - name: tech_reply
    type: llm
    prompt: "Offer one concrete first troubleshooting step for the technical issue."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - name: general_reply
    type: llm
    prompt: "Answer the user's question helpfully and briefly."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""
for message in ["I want my money back!", "The app crashes on launch"]:
    engine = WorkflowEngine.from_yaml(router_yaml)
    state = await engine.run({"user_message": message})
    print(repr(message))
    print(" intent:", state.data["classification"]["intent"])
    print(" path: ", " → ".join(state.trace))
    print(" reply: ", state.output_data["agent_response"][:70], "…\n")
2026-06-18 14:35:25.823 | INFO | kavalai.workflow.engine:run:348 - [bbc32eb2] Starting workflow 'Support router'
2026-06-18 14:35:30.963 | INFO | kavalai.workflow.engine:_finish:460 - [bbc32eb2] Workflow 'Support router' completed (session=None)
2026-06-18 14:35:30.964 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [bbc32eb2] Workflow 'Support router' token usage: 2 model call(s), 306 tokens (prompt=151, completion=155)
2026-06-18 14:35:30.979 | INFO | kavalai.workflow.engine:run:348 - [a2e67d16] Starting workflow 'Support router'
'I want my money back!'
intent: refund
path: start → classify → route → refund_reply → end
reply: I completely understand your frustration and I’m here to help you with …
2026-06-18 14:35:33.154 | INFO | kavalai.workflow.engine:_finish:460 - [a2e67d16] Workflow 'Support router' completed (session=None)
2026-06-18 14:35:33.156 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [a2e67d16] Workflow 'Support router' token usage: 2 model call(s), 190 tokens (prompt=148, completion=42)
'The app crashes on launch'
intent: technical
path: start → classify → route → tech_reply → end
reply: Check for any available updates for the app and install them, as the c …
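The switch rule stated at the top of this section (stringify the value, match it against cases, fall back to default) fits in a one-line lookup. `route_switch` is a hypothetical helper written for illustration, not an engine function.

```python
def route_switch(value, cases, default):
    """Stringify the evaluated value, look it up in cases, else use default."""
    return cases.get(str(value), default)


cases = {"refund": "refund_reply", "technical": "tech_reply"}
print(route_switch("refund", cases, "general_reply"))  # refund_reply
print(route_switch("other", cases, "general_reply"))   # general_reply

# Stringification means non-string values match string keys.
print(route_switch(3, {"3": "retry"}, "stop"))          # retry
```

One consequence of stringifying first is that case keys are always compared as text, so a numeric or boolean expression result still needs its case written as it would print.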
The expression language
if/switch expressions are evaluated safely, through an AST whitelist and never
eval. The whitelist covers comparisons, and/or/not, in, arithmetic, and
dotted/indexed access into the context. Unknown names resolve to None, so guard
checks degrade gracefully. You can call the evaluator directly:
from kavalai.workflow import evaluate_expression, evaluate_bool
ctx = {"state": {"retries": 3, "status": "ok"}, "items": [{"score": 0.9}]}
print(evaluate_expression("state.retries >= 3 and state.status == 'ok'", ctx))
print(evaluate_expression("items[0].score > 0.8", ctx))
print(evaluate_bool("state.status in ['ok', 'done']", ctx))
print(evaluate_expression("missing.value", ctx)) # unknown path -> None
True
True
True
None
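For readers curious how an AST whitelist can replace eval, the sketch below parses an expression with the standard `ast` module and interprets only a fixed set of node types, rejecting everything else (calls, lambdas, comprehensions). It is an independent illustration assuming Python 3.9+, not kavalai's evaluator; `safe_eval` and its helpers are hypothetical names, and unlike Python it returns a plain bool for and/or.

```python
import ast
import operator

# Whitelisted operators; anything else (calls, lambdas, imports) is rejected.
BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
CMP_OPS = {ast.Eq: operator.eq, ast.NotEq: operator.ne, ast.Lt: operator.lt,
           ast.LtE: operator.le, ast.Gt: operator.gt, ast.GtE: operator.ge,
           ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b}


def _lookup(container, key):
    """Dict/list access that yields None instead of raising."""
    try:
        return container[key]
    except (KeyError, IndexError, TypeError):
        return None


def _eval(node, ctx):
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        return ctx.get(node.id)  # unknown names resolve to None
    if isinstance(node, ast.Attribute):
        return _lookup(_eval(node.value, ctx), node.attr)
    if isinstance(node, ast.Subscript):  # node.slice is an expression on 3.9+
        return _lookup(_eval(node.value, ctx), _eval(node.slice, ctx))
    if isinstance(node, (ast.List, ast.Tuple)):
        return [_eval(item, ctx) for item in node.elts]
    if isinstance(node, ast.BoolOp):
        values = (_eval(v, ctx) for v in node.values)  # lazy, so it short-circuits
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
        return not _eval(node.operand, ctx)
    if isinstance(node, ast.BinOp) and type(node.op) in BIN_OPS:
        return BIN_OPS[type(node.op)](_eval(node.left, ctx), _eval(node.right, ctx))
    if isinstance(node, ast.Compare):
        left = _eval(node.left, ctx)
        for op, comparator in zip(node.ops, node.comparators):
            if type(op) not in CMP_OPS:
                raise ValueError(f"operator not allowed: {type(op).__name__}")
            right = _eval(comparator, ctx)
            if not CMP_OPS[type(op)](left, right):
                return False
            left = right  # supports chained comparisons such as 1 < x < 5
        return True
    raise ValueError(f"expression element not allowed: {type(node).__name__}")


def safe_eval(expr, ctx):
    return _eval(ast.parse(expr, mode="eval").body, ctx)


ctx = {"state": {"retries": 3, "status": "ok"}, "items": [{"score": 0.9}]}
print(safe_eval("state.retries >= 3 and state.status == 'ok'", ctx))
print(safe_eval("items[0].score > 0.8", ctx))
print(safe_eval("missing.value", ctx))
```

The safety property comes from the final `raise`: a function call such as `__import__('os')` parses to an `ast.Call` node, which no branch handles, so it is refused before anything runs.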
5. Agent nodes — multi-step tool use
An agent node runs the v2 Agent loop: it may call tools repeatedly, feeding
results back into its reasoning, before emitting its structured output. Give it a
kernel of tools and a max_steps budget.
import datetime
@pythontool
def current_time() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%A %H:%M UTC")
agent_yaml = """
name: Time-aware assistant
description: An agent that checks the time before greeting the user.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: assistant}
  - name: assistant
    type: agent
    prompt: "Greet the user appropriately for the current time of day. Use a tool to check the time."
    inputs: {input: {type: context, value: input}}
    output: output
    max_steps: 4
    next: end
  - {name: end, type: end, output: output}
"""
engine = WorkflowEngine.from_yaml(agent_yaml)
engine.kernel.register_python_tool("current_time", current_time)
state = await engine.run({"user_message": "Say hello!"})
print(state.output_data["agent_response"])
2026-06-18 14:36:16.863 | INFO | kavalai.workflow.engine:run:348 - [5587cbe0] Starting workflow 'Time-aware assistant'
2026-06-18 14:36:16.941 | INFO | kavalai.agents.agent:prompt:145 - Agent step 0/4
2026-06-18 14:36:19.974 | INFO | kavalai.agents.agent:_call_tool:241 - Calling tool python://current_time with {}
2026-06-18 14:36:19.978 | INFO | kavalai.agents.agent:prompt:145 - Agent step 1/4
2026-06-18 14:36:21.856 | INFO | kavalai.workflow.engine:_finish:460 - [5587cbe0] Workflow 'Time-aware assistant' completed (session=None)
2026-06-18 14:36:21.858 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [5587cbe0] Workflow 'Time-aware assistant' token usage: 2 model call(s), 1595 tokens (prompt=1481, completion=114)
Good morning!
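The shape of a step-bounded tool loop, as seen in the log above (one step to call the tool, one to answer), can be sketched without a model. Here a scripted function plays the role of the LLM. `run_agent`, `scripted_policy`, and the `("call", ...)` / `("final", ...)` convention are hypothetical and only illustrate the control flow, not the kavalai Agent.

```python
def run_agent(decide, tools, max_steps):
    """Loop until the policy returns a final answer or the step budget runs out.

    `decide` is a hypothetical stand-in for the model: it receives the
    observations so far and returns ("call", tool_name) or ("final", text).
    """
    observations = []
    for step in range(max_steps):
        action, payload = decide(observations)
        if action == "final":
            return payload, step + 1
        # Feed the tool result back so the next decision can use it.
        observations.append((payload, tools[payload]()))
    raise RuntimeError(f"no final answer within {max_steps} steps")


def scripted_policy(observations):
    # First turn: ask for the time. Second turn: answer using the observation.
    if not observations:
        return "call", "current_time"
    _, time_text = observations[-1]
    return "final", f"Hello! It is {time_text}."


tools = {"current_time": lambda: "Monday 09:00 UTC"}
answer, steps_used = run_agent(scripted_policy, tools, max_steps=4)
print(answer, steps_used)
```

The budget matters because a model that keeps requesting tools would otherwise loop forever; raising once the budget is spent is one possible policy, chosen here for simplicity.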
6. Observability — the task logger
A TaskLogger captures per-node debug data and every model call. Logging is
fire-and-forget, so call flush() to await the writes. The SQLite logger uses
the same tasks / model_call_stats shape as the production Postgres backend.
storage = SqliteDataStorage()
tasklog = SqliteTaskLogger()
engine = WorkflowEngine.from_yaml(router_yaml, storage=storage, task_logger=tasklog)
state = await engine.run({"user_message": "How do I reset my password?"})
await tasklog.flush() # await the fire-and-forget writes
conn = await tasklog._connect()
print("Nodes executed:")
async with conn.execute(
    "SELECT name, node_type, duration_seconds FROM tasks ORDER BY rowid"
) as cur:
    for name, ntype, dur in await cur.fetchall():
        print(f" {name:<14} {ntype:<8} {dur:.2f}s")
async with conn.execute("SELECT model, total_tokens FROM model_call_stats") as cur:
    rows = await cur.fetchall()
print(f"\nModel calls: {len(rows)} · tokens: {sum(r[1] or 0 for r in rows)}")
2026-06-18 14:36:44.093 | INFO | kavalai.workflow.engine:run:348 - [029f65d5] Starting workflow 'Support router'
2026-06-18 14:36:47.199 | INFO | kavalai.workflow.engine:_finish:460 - [029f65d5] Workflow 'Support router' completed (session=d1a9939f-1173-4ca6-aa68-c121d286dbe9)
2026-06-18 14:36:47.201 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [029f65d5] Workflow 'Support router' token usage: 2 model call(s), 214 tokens (prompt=174, completion=40)
Nodes executed:
classify llm 1.20s
tech_reply llm 1.76s
Model calls: 2 · tokens: 214
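Why flush() is needed becomes clearer with a toy fire-and-forget logger: each write is scheduled as a background task, so nothing is guaranteed to be stored until those tasks are awaited. `MiniTaskLogger` below is a hypothetical sketch built on asyncio, not the SqliteTaskLogger, and an in-memory list stands in for the database.

```python
import asyncio


class MiniTaskLogger:
    """Hypothetical logger: writes are scheduled, not awaited, until flush()."""

    def __init__(self):
        self.rows = []
        self._pending = set()

    async def _write(self, row):
        await asyncio.sleep(0)  # stand-in for a real database write
        self.rows.append(row)

    def log(self, row):
        # Schedule the write and return immediately (fire-and-forget).
        task = asyncio.create_task(self._write(row))
        self._pending.add(task)  # keep a reference so the task is not lost
        task.add_done_callback(self._pending.discard)

    async def flush(self):
        # Wait for every write that is still in flight.
        if self._pending:
            await asyncio.gather(*self._pending)


async def demo():
    logger = MiniTaskLogger()
    logger.log({"name": "classify", "node_type": "llm"})
    logger.log({"name": "tech_reply", "node_type": "llm"})
    before = len(logger.rows)  # nothing has been written yet
    await logger.flush()
    return before, len(logger.rows)


before, after = asyncio.run(demo())
print(before, after)
```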
7. Backends & deterministic testing
Persistence is pluggable behind two interfaces: DataStorage (agents /
sessions / runs / chat) and TaskLogger (node logs / model stats). This
notebook used the SQLite in-memory backends; the Postgres backends map onto the
existing agents/sessions/runs/chat_messages tables — swap them in without
touching a graph.
To test a workflow without a live model, inject a client_factory. The
engine calls it as factory(model, parameters, stats_receiver), so you can hand
back a canned client and get fully deterministic, offline runs.
from kavalai.llm_clients.base_client import BaseLlmClient
class StubClient(BaseLlmClient):
    """Returns canned structured output: no network, fully deterministic."""

    def __init__(self, *args, **kwargs):
        super().__init__()

    async def chat_completions(self, *, chat_history, response_model=None):
        values = {
            name: ("refund" if name == "intent" else "Stubbed reply.")
            for name in response_model.model_fields
        }
        return response_model(**values)

engine = WorkflowEngine.from_yaml(
    router_yaml, client_factory=lambda *a, **k: StubClient()
)
state = await engine.run({"user_message": "anything at all"})
print("deterministic path:", " → ".join(state.trace)) # always routes via 'refund'
2026-06-18 14:37:02.465 | INFO | kavalai.workflow.engine:run:348 - [581f3f9f] Starting workflow 'Support router'
2026-06-18 14:37:02.469 | INFO | kavalai.workflow.engine:_finish:460 - [581f3f9f] Workflow 'Support router' completed (session=None)
2026-06-18 14:37:02.471 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [581f3f9f] Workflow 'Support router' token usage: 0 model call(s), 0 tokens (prompt=0, completion=0)
deterministic path: start → classify → route → refund_reply → end
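The pattern behind client_factory is ordinary dependency injection: the engine asks a callable for its client instead of constructing one, so a test can substitute a stub. The toy below illustrates that with the same three-argument call shape mentioned above. `MiniEngine`, `CannedClient`, and `complete` are hypothetical names, not kavalai classes or methods.

```python
class CannedClient:
    """Hypothetical stub that always returns the same structured reply."""

    def __init__(self, reply):
        self.reply = reply

    def complete(self, prompt):
        return dict(self.reply)


class MiniEngine:
    """Toy engine that asks a factory for its client instead of building one."""

    def __init__(self, client_factory):
        self.client_factory = client_factory

    def classify(self, message):
        # Same call shape as described above: factory(model, parameters, stats_receiver).
        client = self.client_factory("stub-model", {}, None)
        return client.complete(message)["intent"]


calls = []


def factory(model, parameters, stats_receiver):
    calls.append(model)  # record what the engine asked for
    return CannedClient({"intent": "refund"})


engine_under_test = MiniEngine(client_factory=factory)
print(engine_under_test.classify("anything at all"))
```

Recording the factory's arguments, as `calls` does here, also lets a test assert which model a node requested without ever contacting a provider.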
8. Building workflows in code
YAML is great for authoring and sharing, but you can build the exact same graph
in Python with the fluent WorkflowBuilder — handy for generating workflows
dynamically or keeping the whole agent in one file. Each method returns the
builder so the chain reads like the graph itself; data_type accepts plain
Python types, and an inputs value given as a bare string is treated as a
context path. Call .build() for a WorkflowGraph, or .build_engine(...) for
a ready-to-run engine.
The returned state also carries two things worth noting: a unique
invocation_id — prefixed onto every log line of the run, so you can grep a
single invocation out of the logs — and the aggregate token_usage, which
the engine logs once the run finishes.
from kavalai.workflow import WorkflowBuilder
engine = (
    WorkflowBuilder("Support router (code)", description="Built with the builder.")
    .data_type("input", {"user_message": str})
    .data_type("classification", {"intent": str})
    .data_type("output", {"agent_response": str})
    .start("classify")
    .llm(
        "classify",
        prompt=(
            "Classify the request as exactly one of: refund, technical, other. "
            "Respond with that single lowercase word in the `intent` field."
        ),
        inputs={"input": "input"},  # a string input == a context path
        output="classification",
        next="route",
    )
    .switch(
        "route",
        expr="classification.intent",
        cases={"refund": "refund_reply", "technical": "tech_reply"},
        default="general_reply",
    )
    .llm("refund_reply", prompt="Acknowledge the refund and outline next steps.",
         inputs={"input": "input"}, output="output", next="end")
    .llm("tech_reply", prompt="Offer one concrete first troubleshooting step.",
         inputs={"input": "input"}, output="output", next="end")
    .llm("general_reply", prompt="Answer the question helpfully and briefly.",
         inputs={"input": "input"}, output="output", next="end")
    .end()
    .build_engine()  # -> a ready WorkflowEngine
)
state = await engine.run({"user_message": "My app keeps crashing on launch"})
print("path: ", " → ".join(state.trace))
print("reply: ", state.output_data["agent_response"][:70], "…")
print("invocation_id:", state.invocation_id)
print("token_usage: ", state.token_usage)
2026-06-18 14:37:20.143 | INFO | kavalai.workflow.engine:run:348 - [dec6fb74] Starting workflow 'Support router (code)'
2026-06-18 14:37:23.698 | INFO | kavalai.workflow.engine:_finish:460 - [dec6fb74] Workflow 'Support router (code)' completed (session=None)
2026-06-18 14:37:23.699 | INFO | kavalai.workflow.engine:_log_token_usage:400 - [dec6fb74] Workflow 'Support router (code)' token usage: 2 model call(s), 200 tokens (prompt=145, completion=55)
path: start → classify → route → tech_reply → end
reply: Check if your app has the latest updates installed. Go to the app stor …
invocation_id: dec6fb74
token_usage: {'model_calls': 2, 'prompt_tokens': 145, 'completion_tokens': 55, 'total_tokens': 200}
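The fluent style works because every method returns the builder itself. A stripped-down sketch of that pattern follows, with a build step that checks every edge points at a declared node. `MiniBuilder` is hypothetical and far simpler than WorkflowBuilder; whether the real builder validates edges this way is an assumption.

```python
class MiniBuilder:
    """Hypothetical fluent builder; each method returns self so calls chain."""

    def __init__(self, name):
        self.graph = {"name": name, "nodes": []}

    def _add(self, **node):
        self.graph["nodes"].append(node)
        return self

    def start(self, next):
        return self._add(name="start", type="start", next=next)

    def llm(self, name, prompt, output, next):
        return self._add(name=name, type="llm", prompt=prompt, output=output, next=next)

    def end(self, output="output"):
        return self._add(name="end", type="end", output=output)

    def build(self):
        # Reject edges that point at nodes which were never declared.
        names = {node["name"] for node in self.graph["nodes"]}
        for node in self.graph["nodes"]:
            target = node.get("next")
            if target is not None and target not in names:
                raise ValueError(f"{node['name']} points at unknown node {target}")
        return self.graph


graph = (
    MiniBuilder("Greeter")
    .start("reply")
    .llm("reply", prompt="Greet the user.", output="output", next="end")
    .end()
    .build()
)
print([node["name"] for node in graph["nodes"]])
```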
Recap
- A workflow is a directed graph of typed nodes authored in YAML; the caller talks to the start/end boundary.
- llm, agent, and function nodes do the work; if/switch route on safe string expressions.
- Every run produces a JSON-serialisable WorkflowState, checkpointed and logged through pluggable backends.
- Inject a client_factory for fast, deterministic tests.
- Build graphs from YAML or the WorkflowBuilder; every run reports a unique invocation_id and aggregate token_usage.
A complete, runnable graph lives at
examples/v2_workflow_support_agent.yaml.