Streaming results with Streamer

Streamer (from kavalai import Streamer) is Kaval.AI’s async streaming primitive. You obtain one or more value streamers from it (each with a name), push partial chunks as they arrive, and mark each stream complete. The Streamer itself is an async iterator that yields StreamContent messages (type, name, value) until every active value streamer has completed.
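The contract described above can be modeled in a few lines of plain asyncio. The sketch below is a hypothetical, stdlib-only toy: ToyStreamer, ToyValueStreamer and Message are names invented for illustration. This is not how Kaval.AI implements Streamer. It assumes a single unbounded asyncio.Queue as the shared buffer, models only the default cumulative mode, and uses plain strings for the message type.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Hypothetical stand-in for StreamContent: (type, name, value)."""

    type: str
    name: str
    value: Any


class ToyValueStreamer:
    """Hypothetical named value streamer that sends cumulative values."""

    def __init__(self, name: str, queue: asyncio.Queue) -> None:
        self.name = name
        self._queue = queue
        self._value = ""

    async def stream_partial(self, chunk: str) -> None:
        self._value += chunk  # keep the running (cumulative) value
        await self._queue.put(Message("partial", self.name, self._value))

    async def stream_complete(self) -> None:
        await self._queue.put(Message("complete", self.name, self._value))


class ToyStreamer:
    """Hypothetical streamer: yields messages until every value streamer completes."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()  # unbounded buffer shared by all value streamers
        self._active: set[str] = set()  # names that have not completed yet

    def get_value_streamer(self, name: str) -> ToyValueStreamer:
        self._active.add(name)
        return ToyValueStreamer(name, self._queue)

    def __aiter__(self) -> "ToyStreamer":
        return self

    async def __anext__(self) -> Message:
        if not self._active:
            raise StopAsyncIteration  # every value streamer has completed
        message = await self._queue.get()
        if message.type == "complete":
            self._active.discard(message.name)
        return message


async def main() -> list[Message]:
    streamer = ToyStreamer()
    result = streamer.get_value_streamer("result")
    await result.stream_partial("Hello,")
    await result.stream_partial(" world!")
    await result.stream_complete()
    return [message async for message in streamer]


messages = asyncio.run(main())  # in a Jupyter cell, use: messages = await main()
for message in messages:
    print(message)
```

Because the toy queue is unbounded, all chunks can be pushed before iteration starts, and iteration ends on its own once the last active name has sent its complete message.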

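This notebook walks through four common patterns. Cells use top-level await, which Jupyter and IPython support, so there’s no asyncio.run(...) wrapper; in a plain Python script, put the code in an async def function and run it with asyncio.run(...).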

from kavalai import Streamer

1. Basic streaming

Push partial chunks, then complete. By default each partial message carries the cumulative value so far, and complete carries the final value.

streamer = Streamer()
result = streamer.get_value_streamer("result")

await result.stream_partial("Hello,")
await result.stream_partial(" world!")
await result.stream_complete()  # marks the "result" stream complete

async for message in streamer:
    print(message.model_dump_json())

Expected output:

{"type":"partial","name":"result","value":"Hello,"}
{"type":"partial","name":"result","value":"Hello, world!"}
{"type":"complete","name":"result","value":"Hello, world!"}
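In the default mode, the values in the output above are a running concatenation of the pushed chunks. The plain-Python sketch below reproduces them with itertools.accumulate. It does not call Kaval.AI and only illustrates the arithmetic.

```python
from itertools import accumulate

chunks = ["Hello,", " world!"]

# Cumulative mode: each partial carries everything received so far.
cumulative = list(accumulate(chunks))
print(cumulative)  # ['Hello,', 'Hello, world!']

# The complete message carries the final value, i.e. the last cumulative one.
final_value = cumulative[-1]
print(final_value)  # Hello, world!
```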

2. Producing from a background task

The producer and the consumer can run concurrently: start the producing coroutine as a task and iterate the streamer to drain messages as they arrive.

import asyncio

streamer = Streamer()
result = streamer.get_value_streamer("result")


async def produce():
    await result.stream_partial("Hello,")
    await result.stream_partial(" world!")
    await result.stream_complete()


task = asyncio.create_task(produce())

async for message in streamer:
    print(message.model_dump_json())

await task  # surfaces any exception raised inside the producer
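The concurrency in this pattern is ordinary asyncio task scheduling. The stdlib sketch below shows the same shape without Kaval.AI: a producer task pushes chunks into an asyncio.Queue, which stands in for the streamer's internal buffer (an assumption), and a hypothetical DONE sentinel stands in for stream_complete(). The log records that consumption interleaves with production instead of waiting for the producer to finish.

```python
import asyncio

DONE = object()  # hypothetical sentinel standing in for "stream complete"


async def produce(queue: asyncio.Queue, log: list[str]) -> None:
    for chunk in ["Hello,", " world!"]:
        log.append(f"produced {chunk!r}")
        await queue.put(chunk)
        await asyncio.sleep(0.01)  # simulate waiting for the next chunk from a model
    await queue.put(DONE)


async def consume(queue: asyncio.Queue, log: list[str]) -> str:
    value = ""
    while (item := await queue.get()) is not DONE:
        value += item
        log.append(f"consumed {item!r}")
    return value


async def main() -> tuple[str, list[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    log: list[str] = []
    task = asyncio.create_task(produce(queue, log))
    value = await consume(queue, log)
    await task  # re-raises any exception from the producer
    return value, log


value, log = asyncio.run(main())  # in a Jupyter cell, use: value, log = await main()
print(value)  # Hello, world!
print(log)
```

Awaiting the task after the consumer loop mirrors the notebook's await task: if the producer had raised, the exception would surface there instead of being silently lost.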

3. Delta mode

With stream_delta=True, each partial carries only the new delta (not the cumulative value), and complete carries no value. Use this when the consumer appends deltas itself.

streamer = Streamer(stream_delta=True)
result = streamer.get_value_streamer("result")

await result.stream_partial("Hello,")
await result.stream_partial(" world!")
await result.stream_complete()

async for message in streamer:
    print(message.model_dump_json())

Expected output (note the per-chunk deltas and the null final value):

{"type":"partial","name":"result","value":"Hello,"}
{"type":"partial","name":"result","value":" world!"}
{"type":"complete","name":"result","value":null}
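In delta mode the consumer keeps its own buffer. The sketch below works on the JSON lines from the expected output above, so it runs without Kaval.AI. Keying the buffers by the name field is an assumption that only matters when several named value streamers feed one Streamer.

```python
import json

# The delta-mode messages from the expected output above, as raw JSON lines.
lines = [
    '{"type":"partial","name":"result","value":"Hello,"}',
    '{"type":"partial","name":"result","value":" world!"}',
    '{"type":"complete","name":"result","value":null}',
]

buffers: dict[str, str] = {}  # accumulated text per stream name
finished: dict[str, str] = {}  # final text for streams that have completed

for line in lines:
    message = json.loads(line)
    name = message["name"]
    if message["type"] == "partial":
        # Each partial is only the new delta, so append it to what we have.
        buffers[name] = buffers.get(name, "") + message["value"]
    elif message["type"] == "complete":
        # complete carries no value in delta mode; the buffer is the final result.
        finished[name] = buffers.pop(name, "")

print(finished)  # {'result': 'Hello, world!'}
```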

4. Streaming a structured (Pydantic) value

Pass a response_model and the value streamer safely parses the partial JSON as it accumulates, so each partial is the best-effort JSON parsed so far. This is handy for progressively rendering a structured result while the model is still emitting it.

from pydantic import BaseModel


class PersonRecord(BaseModel):
    name: str
    birth_year: int


streamer = Streamer()
result = streamer.get_value_streamer("result", response_model=PersonRecord)

await result.stream_partial('{"name": "Ti')
await result.stream_partial('mo", ')
await result.stream_partial('"birth_year": 1986}')
await result.stream_complete()

async for message in streamer:
    print("partial JSON:", message.value)

The final complete message holds the fully-formed JSON {"name": "Timo", "birth_year": 1986}. Because message still refers to that last message once the loop ends, you can validate it with PersonRecord.model_validate_json(message.value).
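The value streamer's partial-JSON parser is internal to Kaval.AI. As a rough illustration of what best-effort parsing of an accumulating buffer can mean, the sketch below closes any open string, object, or array, drops a dangling comma, and tries json.loads. best_effort_parse is a hypothetical helper, not part of kavalai, and it handles far fewer truncation points than a production parser would.

```python
import json
from typing import Any


def best_effort_parse(buffer: str) -> Any:
    """Hypothetical helper: parse a JSON prefix by closing whatever is still open.

    Returns None when this naive heuristic cannot repair the prefix yet
    (for example a key with no value, or a literal such as true cut off mid-word).
    """
    closers: list[str] = []  # closing brackets still needed, innermost last
    in_string = False
    escaped = False
    for ch in buffer:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            closers.append("}" if ch == "{" else "]")
        elif ch in "}]" and closers:
            closers.pop()
    if in_string:
        buffer += '"'  # close an unterminated string value
    else:
        buffer = buffer.rstrip().rstrip(",")  # drop a dangling separator
    try:
        return json.loads(buffer + "".join(reversed(closers)))
    except json.JSONDecodeError:
        return None  # not repairable yet; wait for more chunks


# Chunks mirroring the PersonRecord cell above.
chunks = ['{"name": "Ti', 'mo", ', '"birth_year": 1986}']
buffer = ""
snapshots = []
for chunk in chunks:
    buffer += chunk
    snapshots.append(best_effort_parse(buffer))

print(snapshots)
# [{'name': 'Ti'}, {'name': 'Timo'}, {'name': 'Timo', 'birth_year': 1986}]
```

One caveat of this heuristic: a number cut off mid-digits (for example "birth_year": 19) parses as a shorter, valid number, so intermediate snapshots are only suitable for display, not for validation.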