Building agents with Workflows

A workflow is a directed graph — a small state machine — that you author in YAML and run as a single interaction: the caller hands an input to the start node and reads the result from the end node.

Between those two boundaries you wire together a handful of node types:

Node          What it does
start / end   the interaction boundary (input in, output out)
llm           one structured LLM completion
agent         a multi-step, tool-using agent loop
function      a single tool call (python://, rest://, mcp://)
if / switch   branch on a simple string expression

The engine walks the graph, checkpoints a JSON-serialisable state after every node, and records per-node debug data and model statistics through pluggable storage and task-logger backends. This walk-through builds those ideas up one node at a time.
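Here is a toy sketch of that walk in plain Python. It is not kavalai's engine. The dict-shaped graph, the walk function and the fake_llm stand-in are all hypothetical. The sketch only shows the core idea: follow next pointers from start to end, record a trace, and snapshot a JSON-serialisable state after every node.

```python
import json

# Hypothetical dict-shaped graph: each node names its type and the node to visit next.
GRAPH = {
    "start": {"type": "start", "next": "reply"},
    "reply": {"type": "llm", "next": "end"},
    "end": {"type": "end"},
}


def fake_llm(context: dict) -> dict:
    # Stand-in for a real structured model call.
    return {"agent_response": f"Hello, {context['input']['name']}!"}


def walk(graph: dict, input_data: dict) -> tuple[dict, list[str], list[str]]:
    context = {"input": input_data}
    trace: list[str] = []
    checkpoints: list[str] = []  # one JSON snapshot per visited node
    node_name = "start"
    while True:
        node = graph[node_name]
        trace.append(node_name)
        if node["type"] == "llm":
            context["output"] = fake_llm(context)
        # Checkpoint: the whole state must survive json.dumps after every node.
        checkpoints.append(json.dumps({"trace": trace, "data": context}))
        if node["type"] == "end":
            return context["output"], trace, checkpoints
        node_name = node["next"]


output, trace, checkpoints = walk(GRAPH, {"name": "Timo"})
print(" → ".join(trace))  # start → reply → end
print(output)             # {'agent_response': 'Hello, Timo!'}
print(len(checkpoints))   # 3
```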

Setup

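We use the SQLite (in-memory) backends so everything here runs locally, and openai/gpt-4o-mini for the LLM nodes. Setting KAVALAI_DEFAULT_LLM_MODEL lets us omit llm_model from every graph below. The cells use top-level await, which Jupyter supports. In a plain Python script, wrap them in an async function and run it with asyncio.run(...).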

import os
import dotenv

dotenv.load_dotenv("../.env")

os.environ["KAVALAI_DEFAULT_LLM_MODEL"] = "openai/gpt-4o-mini"
assert os.getenv("OPENAI_API_KEY"), "Set OPENAI_API_KEY to run the LLM nodes."

from kavalai.workflow import (
    WorkflowEngine,
    SqliteDataStorage,
    SqliteTaskLogger,
)

1. The smallest workflow: start → llm → end

A graph has three parts: a name, its data_types and its nodes. Two conventions make the boundary ergonomic. The caller's input is stored in the context under the data type named input. The value returned to the caller is the output variable named on the end node.

data_types are JSON-schema fragments compiled to Pydantic models, so every node’s input and output is validated. Inside a prompt you can interpolate the context with {{ context.<path> }}.
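Interpolation comes down to resolving a dotted path against the context. The {{ ... }} syntax looks Jinja-style, but the engine's actual template implementation isn't shown here. The following is therefore only a standard-library sketch of the lookup. render and resolve are hypothetical helpers, and this version renders a missing path as the string None.

```python
import re
from typing import Any

# Matches {{ context.some.path }}, with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*context\.([\w.]+)\s*\}\}")


def resolve(path: str, context: dict) -> Any:
    """Follow a dotted path through nested dicts; None if any hop is missing."""
    value: Any = context
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def render(template: str, context: dict) -> str:
    # Replace every placeholder with the stringified value found at its path.
    return PLACEHOLDER.sub(lambda m: str(resolve(m.group(1), context)), template)


ctx = {"input": {"user_message": "Hi, I'm Timo!"}}
prompt = render("Reply to the user: {{ context.input.user_message }}", ctx)
print(prompt)  # Reply to the user: Hi, I'm Timo!
```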

greeter_yaml = """
name: Greeter
description: A one-node agent that greets the user by name.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: reply}
  - name: reply
    type: llm
    prompt: |
      You are a warm, concise greeter.
      Reply to the user in one friendly sentence: {{ context.input.user_message }}
    inputs:
      input: {type: context, value: input}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""

engine = WorkflowEngine.from_yaml(
    greeter_yaml,
    storage=SqliteDataStorage(),
    task_logger=SqliteTaskLogger(),
)
state = await engine.run({"user_message": "Hi, I'm Timo!"})

print(state.output_data["agent_response"])
print("path:", " → ".join(state.trace))
2026-06-18 14:34:02.301 | INFO     | kavalai.workflow.engine:run:348 - [beca9c36] Starting workflow 'Greeter'
2026-06-18 14:34:08.659 | INFO     | kavalai.workflow.engine:_finish:460 - [beca9c36] Workflow 'Greeter' completed (session=9f267dd4-af5e-491f-a428-983f1c3b6906)
2026-06-18 14:34:08.661 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [beca9c36] Workflow 'Greeter' token usage: 1 model call(s), 106 tokens (prompt=90, completion=16)
Hi Timo! It's great to meet you!
path: start → reply → end

2. The run state is serialisable and persisted

engine.run(...) returns a WorkflowState: the status, the ordered trace of visited nodes, the full context data and the final output_data. The state round-trips through JSON. Because this engine was given a storage backend, the state is also checkpointed after every node, so a run can be reloaded and inspected later. The same backend records the interaction as a chat session.
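A dataclass makes the round-trip property easy to picture. ToyState is a hypothetical stand-in whose field names borrow from the JSON this section prints. The real WorkflowState carries more fields, such as input_data, output_data and the run and session ids. The store dict plays the role of a storage backend that keeps the latest checkpoint per run.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class ToyState:
    # Hypothetical stand-in for WorkflowState, not kavalai's class.
    workflow_name: str
    status: str = "running"
    current_node: str = "start"
    trace: list = field(default_factory=list)
    data: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, text: str) -> "ToyState":
        return cls(**json.loads(text))


state = ToyState("Greeter")
store: dict[str, str] = {}  # run_id -> latest checkpoint

for node in ["start", "reply", "end"]:
    state.current_node = node
    state.trace.append(node)
    store["run-1"] = state.to_json()  # checkpoint after every node

state.status = "completed"
store["run-1"] = state.to_json()

reloaded = ToyState.from_json(store["run-1"])
print(reloaded == state)  # True
```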

# The entire state serialises to JSON (and back via WorkflowState.from_json).
print(state.to_json()[:240], "…\n")

# It was checkpointed to storage after each node — reload it by run id.
reloaded = await engine.storage.load_state(state.run_id)
print("reloaded status:", reloaded.status, "| nodes:", reloaded.trace, "\n")

# The same backend recorded the conversation turns.
for msg in await engine.storage.get_chat_history(state.session_id):
    print(f"{msg.role:>9}: {msg.content}")
{"workflow_name":"Greeter","status":"completed","current_node":"end","trace":["start","reply","end"],"data":{"input":{"user_message":"Hi, I'm Timo!"},"output":{"agent_response":"Hi Timo! It's great to meet you!"}},"input_data":{"user_messag …

reloaded status: completed | nodes: ['start', 'reply', 'end'] 

     user: Hi, I'm Timo!
assistant: Hi Timo! It's great to meet you!

3. Function nodes — calling tools

A function node performs exactly one tool call through the FunctionKernel, addressed by URI (python://, rest://, mcp://). Register a Python tool on the engine’s kernel and reference it from the node. The structured return value is stored in the node’s output variable and stays in the context for later nodes to use.
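With URI addressing, the scheme picks the transport and the rest of the URI names the tool. ToyKernel below is a hypothetical dispatcher written with urllib.parse, not FunctionKernel. It implements only python:// and deliberately leaves rest:// and mcp:// unimplemented. Its register_python_tool method merely mirrors the name used on the engine's kernel.

```python
from typing import Any, Callable
from urllib.parse import urlparse


class ToyKernel:
    """Hypothetical tool dispatcher keyed by URI scheme (not kavalai's FunctionKernel)."""

    def __init__(self) -> None:
        self._python_tools: dict[str, Callable[..., Any]] = {}

    def register_python_tool(self, name: str, fn: Callable[..., Any]) -> None:
        self._python_tools[name] = fn

    def call(self, uri: str, **kwargs: Any) -> Any:
        # "python://get_weather" parses to scheme="python", netloc="get_weather".
        parsed = urlparse(uri)
        if parsed.scheme == "python":
            return self._python_tools[parsed.netloc](**kwargs)
        if parsed.scheme in ("rest", "mcp"):
            raise NotImplementedError(f"{parsed.scheme}:// transport is not sketched here")
        raise ValueError(f"unknown tool scheme: {parsed.scheme!r}")


def get_weather(city: str) -> dict:
    # Fake tool, matching the demo tool in this section.
    return {"summary": f"clear skies over {city}", "celsius": 24}


kernel = ToyKernel()
kernel.register_python_tool("get_weather", get_weather)
print(kernel.call("python://get_weather", city="Tallinn"))
# {'summary': 'clear skies over Tallinn', 'celsius': 24}
```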

from pydantic import BaseModel
from kavalai.functionkernel import pythontool


class Weather(BaseModel):
    summary: str
    celsius: int


@pythontool
def get_weather(city: str) -> Weather:
    # A real tool would call a weather API; we fake it for the demo.
    return Weather(summary=f"clear skies over {city}", celsius=24)


weather_yaml = """
name: Weather forecaster
description: Looks up weather with a function node, then phrases it with an LLM.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
      city: {type: string}
  weather:
    type: object
    properties:
      summary: {type: string}
      celsius: {type: integer}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: fetch}
  - name: fetch
    type: function
    tool: python://get_weather
    inputs:
      city: {type: context, value: input.city}
    output: weather
    next: phrase
  - name: phrase
    type: llm
    prompt: "Turn the weather reading into a cheerful one-line forecast."
    inputs:
      weather: {type: context, value: weather}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""

engine = WorkflowEngine.from_yaml(weather_yaml, storage=SqliteDataStorage())
engine.kernel.register_python_tool("get_weather", get_weather)

state = await engine.run({"user_message": "weather?", "city": "Tallinn"})
print("tool output kept in context:", state.data["weather"])
print("final:", state.output_data["agent_response"])
2026-06-18 14:34:40.217 | INFO     | kavalai.workflow.engine:run:348 - [f0e7b653] Starting workflow 'Weather forecaster'
2026-06-18 14:34:41.519 | INFO     | kavalai.workflow.engine:_finish:460 - [f0e7b653] Workflow 'Weather forecaster' completed (session=c3b4c1d9-e3e7-471c-8013-42be40568ab2)
2026-06-18 14:34:41.521 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [f0e7b653] Workflow 'Weather forecaster' token usage: 1 model call(s), 104 tokens (prompt=77, completion=27)
tool output kept in context: {'summary': 'clear skies over Tallinn', 'celsius': 24}
final: Get ready for a beautiful day in Tallinn with clear skies and a lovely 24°C! ☀️

4. Branching with if and switch

Routing nodes evaluate a simple string expression against the run context.

  • switch evaluates expr, stringifies it, and matches it against cases, falling back to default.

  • if branches on a boolean condition.

Here an llm node classifies the request and a switch routes to a specialised reply. The same graph is run twice and takes a different path each time.
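The switch rule is small enough to sketch directly. This version supports only a dotted path for expr, whereas the real evaluator accepts full expressions (see The expression language). lookup and route_switch are hypothetical helpers. The sketch shows the effect of stringifying: a value such as 3 or True only matches a case written as "3" or "True", and anything unmatched falls through to the default.

```python
from typing import Any


def lookup(path: str, context: dict) -> Any:
    """Walk a dotted path through nested dicts; unknown keys yield None."""
    value: Any = context
    for key in path.split("."):
        value = value.get(key) if isinstance(value, dict) else None
    return value


def route_switch(expr: str, cases: dict, default: str, context: dict) -> str:
    # Stringify before matching, so True matches "True" and 3 matches "3".
    return cases.get(str(lookup(expr, context)), default)


cases = {"refund": "refund_reply", "technical": "tech_reply"}
print(route_switch("classification.intent", cases, "general_reply",
                   {"classification": {"intent": "refund"}}))   # refund_reply
print(route_switch("classification.intent", cases, "general_reply",
                   {"classification": {"intent": "billing"}}))  # general_reply
```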

router_yaml = """
name: Support router
description: Classifies a support request and routes to a specialised reply.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  classification:
    type: object
    properties:
      intent: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: classify}
  - name: classify
    type: llm
    prompt: |
      Classify the user's request as exactly one of: refund, technical, other.
      Respond with that single lowercase word in the `intent` field.
    inputs:
      input: {type: context, value: input}
    output: classification
    next: route
  - name: route
    type: switch
    expr: classification.intent
    cases:
      refund: refund_reply
      technical: tech_reply
    default: general_reply
  - name: refund_reply
    type: llm
    prompt: "Empathetically acknowledge the refund request and outline next steps."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - name: tech_reply
    type: llm
    prompt: "Offer one concrete first troubleshooting step for the technical issue."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - name: general_reply
    type: llm
    prompt: "Answer the user's question helpfully and briefly."
    inputs: {input: {type: context, value: input}}
    output: output
    next: end
  - {name: end, type: end, output: output}
"""

for message in ["I want my money back!", "The app crashes on launch"]:
    engine = WorkflowEngine.from_yaml(router_yaml)
    state = await engine.run({"user_message": message})
    print(repr(message))
    print("  intent:", state.data["classification"]["intent"])
    print("  path:  ", " → ".join(state.trace))
    print("  reply: ", state.output_data["agent_response"][:70], "…\n")
2026-06-18 14:35:25.823 | INFO     | kavalai.workflow.engine:run:348 - [bbc32eb2] Starting workflow 'Support router'
2026-06-18 14:35:30.963 | INFO     | kavalai.workflow.engine:_finish:460 - [bbc32eb2] Workflow 'Support router' completed (session=None)
2026-06-18 14:35:30.964 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [bbc32eb2] Workflow 'Support router' token usage: 2 model call(s), 306 tokens (prompt=151, completion=155)
2026-06-18 14:35:30.979 | INFO     | kavalai.workflow.engine:run:348 - [a2e67d16] Starting workflow 'Support router'
'I want my money back!'
  intent: refund
  path:   start → classify → route → refund_reply → end
  reply:  I completely understand your frustration and I’m here to help you with …
2026-06-18 14:35:33.154 | INFO     | kavalai.workflow.engine:_finish:460 - [a2e67d16] Workflow 'Support router' completed (session=None)
2026-06-18 14:35:33.156 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [a2e67d16] Workflow 'Support router' token usage: 2 model call(s), 190 tokens (prompt=148, completion=42)
'The app crashes on launch'
  intent: technical
  path:   start → classify → route → tech_reply → end
  reply:  Check for any available updates for the app and install them, as the c …

The expression language

if/switch expressions are evaluated safely (an AST whitelist, never eval): comparisons, and/or/not, in, arithmetic, and dotted/indexed access into the context. Unknown names resolve to None so guard checks degrade gracefully. You can call the evaluator directly:
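Here is a minimal sketch of the AST-whitelist idea, assuming Python 3.9+ for the ast.Subscript layout. safe_eval is a hypothetical helper, not kavalai's evaluate_expression. It parses with ast.parse(mode="eval"), walks only the node types it recognises, and raises ValueError for anything else, including function calls. Dotted access walks dicts only, so object attributes such as __class__ are never reachable.

```python
import ast
import operator
from typing import Any

# Whitelisted operators; anything else raises instead of running.
_BINOPS = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_CMPOPS = {ast.Eq: operator.eq, ast.NotEq: operator.ne,
           ast.Lt: operator.lt, ast.LtE: operator.le,
           ast.Gt: operator.gt, ast.GtE: operator.ge,
           ast.In: lambda a, b: a in b, ast.NotIn: lambda a, b: a not in b}


def safe_eval(expr: str, ctx: dict) -> Any:
    """Evaluate a restricted expression against ctx without calling eval()."""

    def ev(node: ast.AST) -> Any:
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return ctx.get(node.id)  # unknown names resolve to None
        if isinstance(node, ast.Attribute):
            # Dotted access only walks dicts; object attributes are never touched.
            base = ev(node.value)
            return base.get(node.attr) if isinstance(base, dict) else None
        if isinstance(node, ast.Subscript):
            base, index = ev(node.value), ev(node.slice)  # Python 3.9+ slice layout
            try:
                return base[index]
            except (TypeError, KeyError, IndexError):
                return None
        if isinstance(node, (ast.List, ast.Tuple)):
            return [ev(item) for item in node.elts]
        if isinstance(node, ast.BoolOp):
            is_and = isinstance(node.op, ast.And)
            for value in node.values:  # short-circuits like Python's and/or
                result = bool(ev(value))
                if result != is_and:
                    return result
            return is_and
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not ev(node.operand)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -ev(node.operand)
        if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
            return _BINOPS[type(node.op)](ev(node.left), ev(node.right))
        if isinstance(node, ast.Compare):
            left = ev(node.left)
            for op, comparator in zip(node.ops, node.comparators):
                if type(op) not in _CMPOPS:
                    raise ValueError(f"disallowed operator: {type(op).__name__}")
                right = ev(comparator)
                if not _CMPOPS[type(op)](left, right):
                    return False
                left = right  # supports chains such as 1 < x < 3
            return True
        raise ValueError(f"disallowed syntax: {type(node).__name__}")

    return ev(ast.parse(expr, mode="eval").body)


ctx = {"state": {"retries": 3, "status": "ok"}, "items": [{"score": 0.9}]}
print(safe_eval("state.retries >= 3 and state.status == 'ok'", ctx))  # True
print(safe_eval("items[0].score > 0.8", ctx))                       # True
print(safe_eval("missing.value", ctx))                               # None
try:
    safe_eval("__import__('os').system('ls')", ctx)
except ValueError as err:
    print(err)  # disallowed syntax: Call
```

Rejecting unknown node types, rather than blocklisting dangerous ones, keeps new or unexpected syntax refused by default. Note that this sketch still raises TypeError for ordering comparisons against None, such as None >= 3.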

from kavalai.workflow import evaluate_expression, evaluate_bool

ctx = {"state": {"retries": 3, "status": "ok"}, "items": [{"score": 0.9}]}
print(evaluate_expression("state.retries >= 3 and state.status == 'ok'", ctx))
print(evaluate_expression("items[0].score > 0.8", ctx))
print(evaluate_bool("state.status in ['ok', 'done']", ctx))
print(evaluate_expression("missing.value", ctx))  # unknown path -> None
True
True
True
None

5. Agent nodes — multi-step tool use

An agent node runs the v2 Agent loop: it may call tools repeatedly, feeding results back into its reasoning, before emitting its structured output. Give it a kernel of tools and a max_steps budget.
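The loop itself can be sketched without a model. run_agent and scripted_model are hypothetical. On its first turn the scripted model asks for a tool, and once it has seen a tool result it answers. This mirrors the two agent steps in this section's log: a python://current_time call, then the answer. The sketch does not reflect what kavalai does when the step budget runs out; it simply raises.

```python
from typing import Any, Callable


def scripted_model(history: list) -> dict:
    # Hypothetical stand-in for the model: request a tool first, then answer.
    if not any(m["role"] == "tool" for m in history):
        return {"tool": "current_time", "args": {}}
    observed = next(m["content"] for m in history if m["role"] == "tool")
    return {"final": f"Hello! It is {observed}."}


def run_agent(model: Callable[[list], dict], tools: dict[str, Callable[..., Any]],
              user_message: str, max_steps: int) -> str:
    history = [{"role": "user", "content": user_message}]
    for _ in range(max_steps):
        decision = model(history)
        if "final" in decision:
            return decision["final"]
        # Call the requested tool and feed its result back into the history.
        result = tools[decision["tool"]](**decision["args"])
        history.append({"role": "tool", "content": str(result)})
    raise RuntimeError(f"no final answer within {max_steps} steps")


tools = {"current_time": lambda: "Monday 09:00 UTC"}
answer = run_agent(scripted_model, tools, "Say hello!", max_steps=4)
print(answer)  # Hello! It is Monday 09:00 UTC.
```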

import datetime


@pythontool
def current_time() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%A %H:%M UTC")


agent_yaml = """
name: Time-aware assistant
description: An agent that checks the time before greeting the user.
data_types:
  input:
    type: object
    properties:
      user_message: {type: string}
  output:
    type: object
    properties:
      agent_response: {type: string}
nodes:
  - {name: start, type: start, next: assistant}
  - name: assistant
    type: agent
    prompt: "Greet the user appropriately for the current time of day. Use a tool to check the time."
    inputs: {input: {type: context, value: input}}
    output: output
    max_steps: 4
    next: end
  - {name: end, type: end, output: output}
"""

engine = WorkflowEngine.from_yaml(agent_yaml)
engine.kernel.register_python_tool("current_time", current_time)
state = await engine.run({"user_message": "Say hello!"})
print(state.output_data["agent_response"])
2026-06-18 14:36:16.863 | INFO     | kavalai.workflow.engine:run:348 - [5587cbe0] Starting workflow 'Time-aware assistant'
2026-06-18 14:36:16.941 | INFO     | kavalai.agents.agent:prompt:145 - Agent step 0/4
2026-06-18 14:36:19.974 | INFO     | kavalai.agents.agent:_call_tool:241 - Calling tool python://current_time with {}
2026-06-18 14:36:19.978 | INFO     | kavalai.agents.agent:prompt:145 - Agent step 1/4
2026-06-18 14:36:21.856 | INFO     | kavalai.workflow.engine:_finish:460 - [5587cbe0] Workflow 'Time-aware assistant' completed (session=None)
2026-06-18 14:36:21.858 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [5587cbe0] Workflow 'Time-aware assistant' token usage: 2 model call(s), 1595 tokens (prompt=1481, completion=114)
Good morning!

6. Observability — the task logger

A TaskLogger captures per-node debug data and every model call. Logging is fire-and-forget, so call flush() to await the writes. The SQLite logger uses the same tasks / model_call_stats shape as the production Postgres backend.
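Fire-and-forget with a flush is a common asyncio pattern. log() schedules a write task and returns immediately, and flush() awaits everything still pending. ToyTaskLogger is a hypothetical illustration, not SqliteTaskLogger. It runs as a script via asyncio.run. In a notebook, where an event loop is already running, use await main() instead.

```python
import asyncio


class ToyTaskLogger:
    """Hypothetical logger: log() returns at once, flush() awaits pending writes."""

    def __init__(self) -> None:
        self.rows: list = []
        self._pending: set = set()

    async def _write(self, row: dict) -> None:
        await asyncio.sleep(0.01)  # stand-in for a database round trip
        self.rows.append(row)

    def log(self, **row) -> None:
        task = asyncio.get_running_loop().create_task(self._write(row))
        self._pending.add(task)  # keep a reference so the task isn't garbage-collected
        task.add_done_callback(self._pending.discard)

    async def flush(self) -> None:
        if self._pending:
            await asyncio.gather(*self._pending)


async def main() -> list:
    logger = ToyTaskLogger()
    logger.log(name="classify", node_type="llm")
    logger.log(name="tech_reply", node_type="llm")
    print("before flush:", len(logger.rows))  # before flush: 0
    await logger.flush()
    print("after flush:", len(logger.rows))   # after flush: 2
    return logger.rows


rows = asyncio.run(main())
```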

storage = SqliteDataStorage()
tasklog = SqliteTaskLogger()
engine = WorkflowEngine.from_yaml(router_yaml, storage=storage, task_logger=tasklog)
state = await engine.run({"user_message": "How do I reset my password?"})
await tasklog.flush()  # await the fire-and-forget writes

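# _connect() is a private helper of SqliteTaskLogger; it is used here only to peek at the raw tables.
conn = await tasklog._connect()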
print("Nodes executed:")
async with conn.execute(
    "SELECT name, node_type, duration_seconds FROM tasks ORDER BY rowid"
) as cur:
    for name, ntype, dur in await cur.fetchall():
        print(f"  {name:<14} {ntype:<8} {dur:.2f}s")

async with conn.execute("SELECT model, total_tokens FROM model_call_stats") as cur:
    rows = await cur.fetchall()
print(f"\nModel calls: {len(rows)} · tokens: {sum(r[1] or 0 for r in rows)}")
2026-06-18 14:36:44.093 | INFO     | kavalai.workflow.engine:run:348 - [029f65d5] Starting workflow 'Support router'
2026-06-18 14:36:47.199 | INFO     | kavalai.workflow.engine:_finish:460 - [029f65d5] Workflow 'Support router' completed (session=d1a9939f-1173-4ca6-aa68-c121d286dbe9)
2026-06-18 14:36:47.201 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [029f65d5] Workflow 'Support router' token usage: 2 model call(s), 214 tokens (prompt=174, completion=40)
Nodes executed:
  classify       llm      1.20s
  tech_reply     llm      1.76s

Model calls: 2 · tokens: 214

7. Backends & deterministic testing

Persistence is pluggable behind two interfaces: DataStorage (agents / sessions / runs / chat) and TaskLogger (node logs / model stats). This notebook used the SQLite in-memory backends; the Postgres backends map onto the existing agents/sessions/runs/chat_messages tables — swap them in without touching a graph.
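Swapping backends without touching a graph works because callers depend only on an interface. The sketch below uses typing.Protocol with a trimmed-down, hypothetical StateStore. The save_state name is an assumption; load_state echoes the call used earlier in this notebook. Two interchangeable implementations follow, one in memory and one on the standard-library sqlite3 module. Inside a notebook, use await main() instead of asyncio.run(...).

```python
import asyncio
import sqlite3
from typing import Optional, Protocol


class StateStore(Protocol):
    """Hypothetical, trimmed-down storage interface (method names are assumptions)."""

    async def save_state(self, run_id: str, state_json: str) -> None: ...

    async def load_state(self, run_id: str) -> Optional[str]: ...


class InMemoryStore:
    def __init__(self) -> None:
        self._states: dict = {}

    async def save_state(self, run_id: str, state_json: str) -> None:
        self._states[run_id] = state_json

    async def load_state(self, run_id: str) -> Optional[str]:
        return self._states.get(run_id)


class SqliteStore:
    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")
        self._conn.execute("CREATE TABLE runs (run_id TEXT PRIMARY KEY, state TEXT)")

    async def save_state(self, run_id: str, state_json: str) -> None:
        # Blocking call kept for brevity; a real async backend would use an async driver.
        self._conn.execute("INSERT OR REPLACE INTO runs VALUES (?, ?)", (run_id, state_json))

    async def load_state(self, run_id: str) -> Optional[str]:
        row = self._conn.execute(
            "SELECT state FROM runs WHERE run_id = ?", (run_id,)
        ).fetchone()
        return row[0] if row else None


async def checkpoint(store: StateStore, run_id: str, state_json: str) -> Optional[str]:
    # Only the interface is used here, so either backend can be passed in.
    await store.save_state(run_id, state_json)
    return await store.load_state(run_id)


async def main() -> list:
    return [await checkpoint(store, "run-1", '{"status": "completed"}')
            for store in (InMemoryStore(), SqliteStore())]


results = asyncio.run(main())
print(results)  # ['{"status": "completed"}', '{"status": "completed"}']
```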

To test a workflow without a live model, inject a client_factory. The engine calls it as factory(model, parameters, stats_receiver), so you can hand back a canned client and get fully deterministic, offline runs.

from kavalai.llm_clients.base_client import BaseLlmClient


class StubClient(BaseLlmClient):
    """Returns canned structured output — no network, fully deterministic."""

    def __init__(self, *args, **kwargs):
        super().__init__()

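    async def chat_completions(self, *, chat_history, response_model=None):
        # Each llm node requests structured output, so response_model is a
        # Pydantic model class here; fill every field with a canned value.
        values = {
            name: ("refund" if name == "intent" else "Stubbed reply.")
            for name in response_model.model_fields
        }
        return response_model(**values)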


engine = WorkflowEngine.from_yaml(
    router_yaml, client_factory=lambda *a, **k: StubClient()
)
state = await engine.run({"user_message": "anything at all"})
print("deterministic path:", " → ".join(state.trace))  # always routes via 'refund'
2026-06-18 14:37:02.465 | INFO     | kavalai.workflow.engine:run:348 - [581f3f9f] Starting workflow 'Support router'
2026-06-18 14:37:02.469 | INFO     | kavalai.workflow.engine:_finish:460 - [581f3f9f] Workflow 'Support router' completed (session=None)
2026-06-18 14:37:02.471 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [581f3f9f] Workflow 'Support router' token usage: 0 model call(s), 0 tokens (prompt=0, completion=0)
deterministic path: start → classify → route → refund_reply → end

8. Building workflows in code

YAML is great for authoring and sharing, but you can build the exact same graph in Python with the fluent WorkflowBuilder — handy for generating workflows dynamically or keeping the whole agent in one file. Each method returns the builder so the chain reads like the graph itself; data_type accepts plain Python types, and an inputs value given as a bare string is treated as a context path. Call .build() for a WorkflowGraph, or .build_engine(...) for a ready-to-run engine.
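The fluent style comes down to every method returning self. ToyBuilder below is a hypothetical, much smaller cousin of WorkflowBuilder that emits a YAML-shaped dict. It shows the bare-string shorthand expanding to the {type: context, value: ...} form used in the YAML graphs. The end() default of output="output" is an assumption chosen to match those graphs.

```python
from typing import Any, Union


class ToyBuilder:
    """Hypothetical fluent builder that emits a YAML-shaped dict (not WorkflowBuilder)."""

    def __init__(self, name: str) -> None:
        self.graph: dict[str, Any] = {"name": name, "nodes": []}

    def start(self, next: str) -> "ToyBuilder":
        self.graph["nodes"].append({"name": "start", "type": "start", "next": next})
        return self  # returning self is what makes the chain possible

    def llm(self, name: str, prompt: str, inputs: dict[str, Union[str, dict]],
            output: str, next: str) -> "ToyBuilder":
        # A bare string input is shorthand for {type: context, value: <path>}.
        normalised = {
            key: {"type": "context", "value": v} if isinstance(v, str) else v
            for key, v in inputs.items()
        }
        self.graph["nodes"].append({"name": name, "type": "llm", "prompt": prompt,
                                    "inputs": normalised, "output": output, "next": next})
        return self

    def end(self, output: str = "output") -> "ToyBuilder":
        self.graph["nodes"].append({"name": "end", "type": "end", "output": output})
        return self

    def build(self) -> dict[str, Any]:
        return self.graph


graph = (
    ToyBuilder("Greeter")
    .start("reply")
    .llm("reply", prompt="Greet the user.", inputs={"input": "input"},
         output="output", next="end")
    .end()
    .build()
)
print(graph["nodes"][1]["inputs"])  # {'input': {'type': 'context', 'value': 'input'}}
```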

The returned state also carries two things worth noting: a unique invocation_id — prefixed onto every log line of the run, so you can grep a single invocation out of the logs — and the aggregate token_usage, which the engine logs once the run finishes.

from kavalai.workflow import WorkflowBuilder

engine = (
    WorkflowBuilder("Support router (code)", description="Built with the builder.")
    .data_type("input", {"user_message": str})
    .data_type("classification", {"intent": str})
    .data_type("output", {"agent_response": str})
    .start("classify")
    .llm(
        "classify",
        prompt=(
            "Classify the request as exactly one of: refund, technical, other. "
            "Respond with that single lowercase word in the `intent` field."
        ),
        inputs={"input": "input"},          # a string input == a context path
        output="classification",
        next="route",
    )
    .switch(
        "route",
        expr="classification.intent",
        cases={"refund": "refund_reply", "technical": "tech_reply"},
        default="general_reply",
    )
    .llm("refund_reply", prompt="Acknowledge the refund and outline next steps.",
         inputs={"input": "input"}, output="output", next="end")
    .llm("tech_reply", prompt="Offer one concrete first troubleshooting step.",
         inputs={"input": "input"}, output="output", next="end")
    .llm("general_reply", prompt="Answer the question helpfully and briefly.",
         inputs={"input": "input"}, output="output", next="end")
    .end()
    .build_engine()                          # -> a ready WorkflowEngine
)

state = await engine.run({"user_message": "My app keeps crashing on launch"})
print("path:         ", " → ".join(state.trace))
print("reply:        ", state.output_data["agent_response"][:70], "…")
print("invocation_id:", state.invocation_id)
print("token_usage:  ", state.token_usage)
2026-06-18 14:37:20.143 | INFO     | kavalai.workflow.engine:run:348 - [dec6fb74] Starting workflow 'Support router (code)'
2026-06-18 14:37:23.698 | INFO     | kavalai.workflow.engine:_finish:460 - [dec6fb74] Workflow 'Support router (code)' completed (session=None)
2026-06-18 14:37:23.699 | INFO     | kavalai.workflow.engine:_log_token_usage:400 - [dec6fb74] Workflow 'Support router (code)' token usage: 2 model call(s), 200 tokens (prompt=145, completion=55)
path:          start → classify → route → tech_reply → end
reply:         Check if your app has the latest updates installed. Go to the app stor …
invocation_id: dec6fb74
token_usage:   {'model_calls': 2, 'prompt_tokens': 145, 'completion_tokens': 55, 'total_tokens': 200}
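Both features are easy to reproduce with the standard library. In the sketch below, aggregate sums per-call usage into a dict with the same keys as the token_usage printed above. A logging.LoggerAdapter subclass prefixes each message with the invocation id. The 8-character hex id is an assumption based on the ids in the logs, and the per-call token counts are made up.

```python
import logging
import uuid
from dataclasses import dataclass


@dataclass
class ModelCall:
    prompt_tokens: int
    completion_tokens: int


def aggregate(calls: list) -> dict:
    """Sum per-call usage into the shape of the token_usage dict."""
    prompt = sum(c.prompt_tokens for c in calls)
    completion = sum(c.completion_tokens for c in calls)
    return {"model_calls": len(calls), "prompt_tokens": prompt,
            "completion_tokens": completion, "total_tokens": prompt + completion}


class InvocationAdapter(logging.LoggerAdapter):
    """Prefix every message with the run's invocation id, e.g. "[1a2b3c4d] ..."."""

    def process(self, msg, kwargs):
        return f"[{self.extra['invocation_id']}] {msg}", kwargs


logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
invocation_id = uuid.uuid4().hex[:8]  # assumption: a short random hex id
log = InvocationAdapter(logging.getLogger("toy_workflow"), {"invocation_id": invocation_id})

# Made-up per-call numbers, purely for illustration.
usage = aggregate([ModelCall(100, 20), ModelCall(50, 10)])
log.info("Starting workflow")
log.info("token usage: %s", usage)
print(usage)
# {'model_calls': 2, 'prompt_tokens': 150, 'completion_tokens': 30, 'total_tokens': 180}
```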

Recap

  • A workflow is a directed graph of typed nodes authored in YAML; the caller talks to the start/end boundary.

  • llm, agent, and function nodes do the work; if/switch route on safe string expressions.

  • Every run produces a JSON-serialisable WorkflowState, checkpointed and logged through pluggable backends.

  • Inject a client_factory for fast, deterministic tests.

  • Build graphs from YAML or the WorkflowBuilder; every run reports a unique invocation_id and aggregate token_usage.

A complete, runnable graph lives at examples/v2_workflow_support_agent.yaml.