Streaming results with Streamer
Streamer (from kavalai import Streamer) is Kaval.AI’s async streaming
primitive. You obtain one or more value streamers from it (each with a
name), push partial chunks as they arrive, and mark each stream complete.
The Streamer itself is an async iterator that yields StreamContent messages
(type, name, value) until every active value streamer has completed.
This notebook walks through the four common patterns. Cells use top-level
await (supported in Jupyter), so there’s no asyncio.run(...) wrapper.
from kavalai import Streamer
1. Basic streaming
Push partial chunks, then complete. By default each partial message carries the
cumulative value so far, and complete carries the final value.
streamer = Streamer()
result = streamer.get_value_streamer("result")
await result.stream_partial("Hello,")
await result.stream_partial(" world!")
await result.stream_complete() # marks the "result" stream complete
async for message in streamer:
    print(message.model_dump_json())
Expected output:
{"type":"partial","name":"result","value":"Hello,"}
{"type":"partial","name":"result","value":"Hello, world!"}
{"type":"complete","name":"result","value":"Hello, world!"}
2. Producing from a background task
The producer and the consumer can run concurrently: start the producing coroutine as a task and iterate the streamer to drain messages as they arrive.
import asyncio
streamer = Streamer()
result = streamer.get_value_streamer("result")
async def produce():
    await result.stream_partial("Hello,")
    await result.stream_partial(" world!")
    await result.stream_complete()
task = asyncio.create_task(produce())
async for message in streamer:
    print(message.model_dump_json())
await task  # ensure the producer finished cleanly
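The same producer/consumer shape can be illustrated without Kaval.AI at all. The following is a minimal sketch using a plain asyncio.Queue; it is not how Streamer is implemented, and the None sentinel and the function names are assumptions made for illustration. It is written as a standalone script, so it uses asyncio.run(...); in a notebook cell you would write `received = await main()` instead.

```python
import asyncio


async def produce(queue: asyncio.Queue) -> None:
    # Push chunks one at a time, yielding control so the consumer can run.
    for chunk in ("Hello,", " world!"):
        await queue.put(chunk)
        await asyncio.sleep(0)
    # Hypothetical end-of-stream marker (plays the role of stream_complete()).
    await queue.put(None)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(produce(queue))  # producer runs in the background
    received = []
    # Drain chunks as they arrive until the sentinel shows up.
    while (chunk := await queue.get()) is not None:
        received.append(chunk)
    await task  # surface any exception raised by the producer
    return received


received = asyncio.run(main())
print(received)
```

Awaiting the task after the loop matters for the same reason as in the cell above: if the producer raised, the exception is re-raised here instead of being silently dropped.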
3. Delta mode
With stream_delta=True, each partial carries only the new delta (not the
cumulative value), and complete carries no value. Use this when the consumer
appends deltas itself.
streamer = Streamer(stream_delta=True)
result = streamer.get_value_streamer("result")
await result.stream_partial("Hello,")
await result.stream_partial(" world!")
await result.stream_complete()
async for message in streamer:
    print(message.model_dump_json())
Expected output (note the per-chunk deltas and the null final value):
{"type":"partial","name":"result","value":"Hello,"}
{"type":"partial","name":"result","value":" world!"}
{"type":"complete","name":"result","value":null}
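On the consuming side, delta mode means the receiver has to rebuild the value itself. The sketch below shows one way to do that, assuming the messages arrive as the JSON lines shown in the expected output; the accumulate helper is hypothetical and not part of kavalai.

```python
import json

# The three messages from the expected output above, as raw JSON lines.
lines = [
    '{"type":"partial","name":"result","value":"Hello,"}',
    '{"type":"partial","name":"result","value":" world!"}',
    '{"type":"complete","name":"result","value":null}',
]


def accumulate(lines):
    """Rebuild each named stream by appending its deltas (hypothetical helper)."""
    buffers = {}   # name -> text received so far
    finished = {}  # name -> final text, filled in on "complete"
    for line in lines:
        message = json.loads(line)
        name = message["name"]
        if message["type"] == "partial":
            buffers[name] = buffers.get(name, "") + message["value"]
        elif message["type"] == "complete":
            # The complete message carries no value in delta mode,
            # so the final result is whatever was accumulated.
            finished[name] = buffers.pop(name, "")
    return finished


final = accumulate(lines)
print(final)
```

Keying the buffers by name keeps the helper usable when several value streamers are interleaved on the same Streamer.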
4. Streaming a structured (Pydantic) value
Pass a response_model and the value streamer safely parses the partial JSON
as it accumulates, so each partial is the best-effort JSON parsed so far. This
is handy for progressively rendering a structured result while the model is
still emitting it.
from pydantic import BaseModel
class PersonRecord(BaseModel):
    name: str
    birth_year: int
streamer = Streamer()
result = streamer.get_value_streamer("result", response_model=PersonRecord)
await result.stream_partial('{"name": "Ti')
await result.stream_partial('mo", ')
await result.stream_partial(' "birth_year": 1986}')
await result.stream_complete()
async for message in streamer:
    print("partial JSON:", message.value)
The final complete message holds the fully-formed JSON
{"name": "Timo", "birth_year": 1986}, which you can validate with
PersonRecord.model_validate_json(message.value).
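To give a feel for what best-effort parsing of incomplete JSON can look like, here is a standard-library sketch. It is an assumption-laden illustration, not Kaval.AI's parser: close_partial_json and best_effort_parse are hypothetical helpers that balance open strings and brackets, then fall back to the previous snapshot when the text still does not parse.

```python
import json


def close_partial_json(text: str) -> str:
    """Append the quote/brackets needed to balance a truncated JSON prefix."""
    closers = []       # stack of closing brackets still owed
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            closers.append("}" if ch == "{" else "]")
        elif ch in "}]" and closers:
            closers.pop()
    if in_string:
        text += '"'                          # close the dangling string
    else:
        text = text.rstrip().rstrip(",")     # drop a dangling trailing comma
    return text + "".join(reversed(closers))


def best_effort_parse(text: str, previous=None):
    """Return the parsed prefix, or the previous snapshot if it is unparseable."""
    try:
        return json.loads(close_partial_json(text))
    except json.JSONDecodeError:
        return previous


# Same three chunks as in the cell above.
buffer, latest, snapshots = "", None, []
for chunk in ['{"name": "Ti', 'mo", ', ' "birth_year": 1986}']:
    buffer += chunk
    latest = best_effort_parse(buffer, latest)
    snapshots.append(latest)

print(snapshots)
```

A sketch like this has obvious limits: a number cut off mid-digit still parses as a shorter number, and a key without a value falls back to the previous snapshot. That is one reason to validate only the final complete value against the response model.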