Agents API

The agent runtime lives in kavalai.agents. The headline class is Agent — a multi-step reasoning loop that calls tools through a FunctionKernel until it produces a final, optionally structured, answer.

Agent

class kavalai.agents.agent.ToolCall(*, name: str, literal_args: str = '{}', planner_context_args: str = '{}', input_args: str = '{}', call_id: str | None = None)[source]

Bases: BaseModel

Represents a tool call request.

Arguments are expected as JSON-encoded strings, which are easier for LLMs to produce.
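As a rough illustration of the JSON-encoded argument convention, the sketch below uses a plain dataclass stand-in with the same field names as ToolCall. ToolCallSketch and decode_args are hypothetical names, not part of kavalai, and how the three argument groups are combined at execution time is not stated here, so the sketch only decodes them.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToolCallSketch:
    """Hypothetical stand-in mirroring the ToolCall fields (not the library class)."""
    name: str
    literal_args: str = "{}"
    planner_context_args: str = "{}"
    input_args: str = "{}"
    call_id: Optional[str] = None


def decode_args(call: ToolCallSketch) -> dict:
    """Decode each JSON-encoded argument string into a dict."""
    return {
        "literal": json.loads(call.literal_args),
        "planner_context": json.loads(call.planner_context_args),
        "input": json.loads(call.input_args),
    }


# Arguments are serialized with json.dumps before being placed on the call.
call = ToolCallSketch(
    name="search",
    literal_args=json.dumps({"query": "weather in Tallinn", "limit": 3}),
)
decoded = decode_args(call)
```

Omitted argument groups keep their '{}' default and therefore decode to empty dicts.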

model_config : ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name : str
literal_args : str
planner_context_args : str
input_args : str
call_id : str | None
kavalai.agents.agent.get_step_output_type(ResponseModel=typing.Type[pydantic.main.BaseModel])[source]
class kavalai.agents.agent.Agent(llm_client: BaseLlmClient, *, kernel: FunctionKernel | None = None, run_context: RunContext | None = None, prompt_template: Template | None = None, debug: bool = False)[source]

Bases: object

async prompt(prompt: str, response_model: type[BaseModel] | None = None, max_steps: int = 10) str | BaseModel[source]

Run the agent loop, calling tools until it produces a final output.

The agent iterates up to max_steps times. On each step the LLM returns a StepOutput with optional tool_calls and an optional final output. Tool calls are executed through the FunctionKernel and their results are fed back into the prompt so the model can reason over them on the next step. The loop stops once the model returns an output without requesting further tool calls, or when max_steps is reached.

Parameters:
prompt: str

The task description for the agent.

response_model: type[BaseModel] | None = None

Optional Pydantic model describing the structured final output. When omitted, a plain string is returned.

max_steps: int = 10

Maximum number of reasoning/tool-calling iterations.

Returns:

An instance of response_model when one is provided, otherwise a string. None if no output was produced.
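The loop described above can be sketched with standard-library code only. This is an illustration of the stopping rule, not the library's implementation: StepSketch, run_loop and fake_llm are hypothetical names, the tools are plain callables rather than a FunctionKernel, and the model is a scripted stand-in.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class StepSketch:
    """Hypothetical stand-in for one LLM step: tool calls and/or an output."""
    tool_calls: list = field(default_factory=list)  # list of (tool_name, kwargs)
    output: Optional[str] = None


async def run_loop(
    llm_step: Callable[[str, list], Awaitable[StepSketch]],
    tools: dict,
    prompt: str,
    max_steps: int = 10,
) -> Optional[str]:
    """Iterate until a step returns an output with no tool calls."""
    tool_results: list = []
    for _ in range(max_steps):
        step = await llm_step(prompt, tool_results)
        if step.output is not None and not step.tool_calls:
            return step.output
        for tool_name, kwargs in step.tool_calls:
            # Feed each result back so the next step can reason over it.
            tool_results.append((tool_name, tools[tool_name](**kwargs)))
    return None  # max_steps reached without a final output


async def fake_llm(prompt: str, tool_results: list) -> StepSketch:
    """Scripted model: request one tool call, then answer from its result."""
    if not tool_results:
        return StepSketch(tool_calls=[("add", {"a": 2, "b": 3})])
    return StepSketch(output=f"The sum is {tool_results[-1][1]}")


tools = {"add": lambda a, b: a + b}
answer = asyncio.run(run_loop(fake_llm, tools, "Add 2 and 3"))
cut_short = asyncio.run(run_loop(fake_llm, tools, "Add 2 and 3", max_steps=1))
```

With max_steps=1 the scripted model only gets as far as requesting the tool, so the sketch returns None, matching the "no output was produced" case.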

Run context

Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

class kavalai.agents.run_context.RunContext(*, agent_id: UUID | None = None, session_id: UUID | None = None, run_id: UUID | None = None, data: dict = {}, templates: dict[str, str] = {}, agent_service: Any | None = None)[source]

Bases: BaseModel

Runtime data for a single interaction.

model_config : ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

agent_id : UUID | None
session_id : UUID | None
run_id : UUID | None
data : dict
templates : Dict[str, str]
agent_service : Any | None
resolve_context_value(path: str)[source]

Resolve a dotted path like ‘input.user_message’ from context data.
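Dotted-path resolution of this kind can be sketched as a walk over nested dictionaries. The helper name resolve_dotted is hypothetical, and returning None for a missing key is an assumption; the library may raise instead.

```python
from typing import Any


def resolve_dotted(data: dict, path: str) -> Any:
    """Walk a nested dict along a dotted path; return None if a key is missing."""
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # assumption: missing keys resolve to None
        current = current[key]
    return current


context = {"input": {"user_message": "Hello"}}
message = resolve_dotted(context, "input.user_message")
missing = resolve_dotted(context, "input.language")
```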

async resolve_history_value(path: str)[source]

Resolve a value from session history.

async resolve_template_value(name: str)[source]

Resolve a template value by name.

async render_prompt(prompt: str) str[source]

Render a prompt string by replacing {{ templates.NAME }}, {{ context.PATH }}, and {{ history.PATH }} with their resolved values.
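A minimal synchronous sketch of this placeholder substitution follows. It assumes all three sources are already available as nested dictionaries, whereas the real method is async and resolves history and templates through the run context. PLACEHOLDER, render_sketch and the sample data are hypothetical.

```python
import re

# Matches {{ templates.NAME }}, {{ context.PATH }} and {{ history.PATH }}.
PLACEHOLDER = re.compile(r"\{\{\s*(templates|context|history)\.([\w.]+)\s*\}\}")


def render_sketch(prompt: str, sources: dict) -> str:
    """Replace each placeholder with the value found in sources[kind]."""
    def substitute(match: re.Match) -> str:
        kind, path = match.group(1), match.group(2)
        value = sources[kind]
        for key in path.split("."):
            value = value[key]
        return str(value)

    return PLACEHOLDER.sub(substitute, prompt)


sources = {
    "templates": {"greeting": "You are a helpful assistant."},
    "context": {"input": {"user_message": "What is RAG?"}},
    "history": {"output": {"summary": "Earlier we discussed embeddings."}},
}
rendered = render_sketch(
    "{{ templates.greeting }} {{ history.output.summary }} "
    "User: {{ context.input.user_message }}",
    sources,
)
```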

async resolve_input_info(info: ArgumentInfo)[source]

Resolve an ArgumentInfo to its actual value.

async prepare_tool_inputs(task: Any) dict[source]

Resolve a task/node’s inputs mapping into plain values.

Workflow configuration models

Shared workflow building blocks reused across the codebase (input wiring, server/tool declarations and the workflow exception). The v2 workflow engine lives in kavalai.workflow.

exception kavalai.agents.workflow_model.WorkflowException[source]

Bases: Exception

Base exception for errors building, validating or running a workflow.

class kavalai.agents.workflow_model.ArgumentInfo(*, type: 'literal' | 'context' | 'history', value: BaseModel | str | int | float | bool | None = None, name: str | None = None)[source]

Bases: BaseModel

Describes input arguments in workflow YAML files.

The ‘type’ field describes where the input argument should be retrieved from:

  • ‘literal’ - use value as specified

  • ‘context’ - retrieve from agent run context

  • ‘history’ - retrieve from previous agent run contexts
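The three retrieval modes can be sketched as a simple dispatch on the type field. ArgumentInfoSketch and resolve_argument are hypothetical stand-ins, and the sketch assumes that for 'context' and 'history' the value field holds the key to look up, which the text above does not confirm.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ArgumentInfoSketch:
    """Hypothetical stand-in with the same three fields as ArgumentInfo."""
    type: str  # 'literal', 'context' or 'history'
    value: Any = None
    name: Optional[str] = None


def resolve_argument(info: ArgumentInfoSketch, context: dict, history: dict) -> Any:
    """Pick the value source according to info.type."""
    if info.type == "literal":
        return info.value
    if info.type == "context":
        return context.get(info.value)  # assumption: value is the lookup key
    if info.type == "history":
        return history.get(info.value)  # assumption: value is the lookup key
    raise ValueError(f"unknown argument type: {info.type!r}")


inputs = {
    "limit": ArgumentInfoSketch("literal", 5),
    "question": ArgumentInfoSketch("context", "user_message"),
    "previous": ArgumentInfoSketch("history", "search_results"),
}
run_context = {"user_message": "hi"}
run_history = {"search_results": ["a"]}
resolved = {
    key: resolve_argument(info, run_context, run_history)
    for key, info in inputs.items()
}
```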

type : Literal['literal', 'context', 'history']
value : BaseModel | str | int | float | bool | None
name : str | None
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.workflow_model.RestServer(*, name: str, url: str | None = None, url_env: str | None = None, username_env: str | None = None, password_env: str | None = None)[source]

Bases: BaseModel

Defines a REST server.

HTTP Basic Auth is also supported for REST server endpoints. The credentials are read from the environment variables named by username_env and password_env.

Note that url_env can also be read from the env file.
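The environment-based configuration can be sketched as follows. The helper names, the variable names (ORDERS_API_URL and friends) and the rule that an explicit url takes precedence over url_env are all assumptions made for illustration; only the Basic Auth header encoding follows the standard scheme.

```python
import base64
import os
from typing import Optional


def resolve_url(url: Optional[str], url_env: Optional[str]) -> str:
    """Prefer an explicit url; otherwise read it from the named variable."""
    if url:
        return url
    if url_env and os.environ.get(url_env):
        return os.environ[url_env]
    raise ValueError("either url or a populated url_env is required")


def basic_auth_header(username_env: Optional[str], password_env: Optional[str]) -> dict:
    """Build an Authorization header when both credential variables are set."""
    username = os.environ.get(username_env) if username_env else None
    password = os.environ.get(password_env) if password_env else None
    if username is None or password is None:
        return {}
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Hypothetical variable names, set here only so the sketch is self-contained.
os.environ["ORDERS_API_URL"] = "https://orders.example.com"
os.environ["ORDERS_API_USER"] = "alice"
os.environ["ORDERS_API_PASSWORD"] = "s3cret"

url = resolve_url(None, "ORDERS_API_URL")
headers = basic_auth_header("ORDERS_API_USER", "ORDERS_API_PASSWORD")
```

Keeping only variable names in the workflow file means the secrets themselves never appear in configuration.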

name : str
url : str | None
url_env : str | None
username_env : str | None
password_env : str | None
check_url_configs() RestServer[source]
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.workflow_model.McpServer(*, name: str, command: str | None = None, command_env: str | None = None, args: list[str] = [], env: dict[str, str] = {}, url: str | None = None, url_env: str | None = None)[source]

Bases: BaseModel

Defines an MCP server.

name : str
command : str | None
command_env : str | None
args : list[str]
env : dict[str, str]
url : str | None
url_env : str | None
check_configs() McpServer[source]
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.workflow_model.PythonFunction(*, name: str, path: str)[source]

Bases: BaseModel

Declares a Python tool available to a workflow.

Variables:
name : str

Name the tool is registered and addressed under (python://<name>).

path : str

Import path to the @kavalai.pythontool decorated function, e.g. my_package.my_module.my_func.

name : str
path : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
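Turning a dotted import path such as the one in PythonFunction.path into a callable is typically done with importlib. The sketch below shows that general technique; load_callable and the registry dict are hypothetical, json.dumps stands in for a decorated tool function, and how kavalai actually loads and registers tools is not shown here.

```python
import importlib
from typing import Callable


def load_callable(path: str) -> Callable:
    """Import 'package.module.func' and return the function object."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted import path, got {path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# json.dumps stands in for a tool function; the key follows python://<name>.
registry = {"python://to_json": load_callable("json.dumps")}
encoded = registry["python://to_json"]({"ok": True})
```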

class kavalai.agents.workflow_model.TemplateModel(*, name: str, value: str)[source]

Bases: BaseModel

A named, reusable text template referenced within a workflow.

Variables:
name : str

Identifier the template is referenced by.

value : str

The template text (e.g. a prompt) to interpolate at run time.

name : str
value : str
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Agent service & persistence

class kavalai.agents.agent_service.AgentService(session_maker: async_sessionmaker[AsyncSession])[source]

Bases: object

Provides common database operations for agents.

async get_or_create_agent(name: str, description: str | None = None, input_schema: dict | None = None, output_schema: dict | None = None, workflow: dict | None = None) Agent[source]

Finds an agent by name or creates a new one if not found.

async get_or_create_session(agent_id: UUID, session_id: UUID | None = None, external_id: UUID | None = None) Session | None[source]
async create_run(session_id: UUID, input_data: dict | None = None, context: dict | None = None) Run[source]

Creates a new run entry for a specific session.

async initialize_workflow_run(agent_name: str, agent_description: str | None = None, input_schema: dict | None = None, output_schema: dict | None = None, workflow: dict | None = None, session_id: UUID | None = None, external_id: str | None = None, input_data: dict | None = None) tuple[Agent, Session, Run][source]

Initialize agent, session, and run in a single database transaction.

This optimized batch operation replaces three database round trips with one, which improves performance, especially for remote databases.

Returns:

tuple of (agent, session, run)

async update_run(run_id: UUID, *, output_data: dict | None = None, context: dict | None = None) Run[source]

Updates an existing run with final output_data and/or context.

async add_task(session_id: UUID, run_id: UUID, name: str | None = None, agent_id: UUID | None = None, inputs: dict | None = None, output: dict | None = None, prompt: str | None = None, errors: list[str] | None = None, duration_seconds: float | None = None, node_type: str | None = None) Task[source]

Records a specific unit of work (Task) performed within a run.

async add_model_call_stats(stats: ModelCallStat, agent_id: UUID | None = None) ModelCallStat[source]

Records LLM/Embedding call statistics.

async get_history_value(session_id: UUID, key: str) Any | None[source]

Retrieves a value from the context of previous runs in the same session.

  • If key is a dotted path (e.g., “output.search_results”), resolves it as such.

  • If key is a plain name (e.g., “search_results”), searches recursively for the first matching key in the context dicts of previous runs (newest first).

Returns the most recent value found for the given key.
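The two lookup modes can be sketched over a list of previous run contexts ordered newest first. The function names and sample data are hypothetical, the real method reads contexts from the database, and this sketch searches nested dicts only (not lists).

```python
from typing import Any, Optional


def find_key(node: Any, key: str) -> Optional[Any]:
    """Depth-first search for the first occurrence of key in nested dicts."""
    if isinstance(node, dict):
        if key in node:
            return node[key]
        for child in node.values():
            found = find_key(child, key)
            if found is not None:
                return found
    return None


def resolve_path(node: Any, path: str) -> Optional[Any]:
    """Follow a dotted path through nested dicts."""
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def history_value(contexts_newest_first: list, key: str) -> Optional[Any]:
    """Return the most recent match across previous run contexts."""
    lookup = resolve_path if "." in key else find_key
    for context in contexts_newest_first:
        value = lookup(context, key)
        if value is not None:
            return value
    return None


runs = [
    {"output": {"search_results": ["new"]}},
    {"output": {"search_results": ["old"], "summary": "first run"}},
]
by_path = history_value(runs, "output.search_results")
by_name = history_value(runs, "summary")
```

Because the contexts are scanned newest first, a key present in several runs resolves to its latest value, while a key that only exists in an older run is still found.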

async get_chat_history(session_id: UUID, limit: int = 50) list[ChatMessage][source]

Retrieves the conversation history for a session, ordered from oldest to newest.

async add_chat_message(agent_id: UUID, session_id: UUID, role: str, content: str | None, run_id: UUID | None = None) ChatMessage[source]

Helper to append messages to the chat history.

async get_model_call_stats(call_type: str | None = None, limit: int = 50, offset: int = 0) list[ModelCallStat][source]

Retrieves paginated model call stats, optionally filtered by call type.

class kavalai.agents.sessions.SessionSummary(*, session_id: UUID, agent_id: UUID, agent_name: str, runs_count: int, tasks_count: int, messages_count: int, first_message: str | None, last_message: str | None, errors_count: int, created_at: datetime, updated_at: datetime)[source]

Bases: BaseModel

Row-level summary of a session for the Conversations list.

Aggregates a session’s owning agent, its run/task/message and error counts, and a preview of its first and last messages.

session_id : UUID
agent_id : UUID
agent_name : str
runs_count : int
tasks_count : int
messages_count : int
first_message : str | None
last_message : str | None
errors_count : int
created_at : datetime
updated_at : datetime
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.sessions.TaskSummary(*, id: UUID, agent_id: UUID | None, session_id: UUID, run_id: UUID, inputs: Any | None, output: Any | None, name: str | None = None, prompt: str | None = None, errors: list[str] | None = None, duration_seconds: float | None = None, created_at: datetime, updated_at: datetime)[source]

Bases: BaseModel

Summary of a single task (workflow-node execution) for the Tasks view.

Exposes the task’s inputs, output, name, prompt, any errors and duration.

model_config : ClassVar[ConfigDict] = {'from_attributes': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

id : UUID
agent_id : UUID | None
session_id : UUID
run_id : UUID
inputs : Any | None
output : Any | None
name : str | None
prompt : str | None
errors : list[str] | None
duration_seconds : float | None
created_at : datetime
updated_at : datetime
class kavalai.agents.sessions.RunSummary(*, id: UUID, session_id: UUID, input_data: Any | None, output_data: Any | None, context: Any | None, tasks_count: int, created_at: datetime, updated_at: datetime)[source]

Bases: BaseModel

Summary of a single workflow run for the Runs view.

Exposes the run’s input/output data, resolved context and the number of tasks it executed.

model_config : ClassVar[ConfigDict] = {'from_attributes': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

id : UUID
session_id : UUID
input_data : Any | None
output_data : Any | None
context : Any | None
tasks_count : int
created_at : datetime
updated_at : datetime
class kavalai.agents.sessions.ChatMessageSummary(*, id: UUID, agent_id: UUID, session_id: UUID, run_id: UUID | None, role: str, content: str, created_at: datetime, updated_at: datetime)[source]

Bases: BaseModel

Summary of a single chat message for the conversation transcript.

Exposes the message’s role, content and the run it is associated with.

model_config : ClassVar[ConfigDict] = {'from_attributes': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

id : UUID
agent_id : UUID
session_id : UUID
run_id : UUID | None
role : str
content : str
created_at : datetime
updated_at : datetime
class kavalai.agents.sessions.SessionDetails(*, session_id: UUID, messages: list[ChatMessageSummary], runs: list[RunSummary], tasks: list[TaskSummary])[source]

Bases: BaseModel

Full detail of one session: its messages, runs and tasks.

Powers the per-conversation detail view in the backoffice, bundling the session’s chat transcript together with all of its runs and tasks.

session_id : UUID
messages : list[ChatMessageSummary]
runs : list[RunSummary]
tasks : list[TaskSummary]
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.sessions.SessionsResponse[source]

Bases: TypedDict

sessions : list[SessionSummary]
total_count : int
async kavalai.agents.sessions.get_sessions_summary(session: AsyncSession, agent_id: UUID | None = None, search: str | None = None, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 50, offset: int = 0) SessionsResponse[source]
async kavalai.agents.sessions.get_session_details(session: AsyncSession, session_id: UUID) SessionDetails[source]

Remote agent client

class kavalai.agents.client.AgentClient(base_url: str, username: str | None = None, password: str | None = None, timeout: float = 60.0)[source]

Bases: object

Async HTTP client for invoking a remote Kaval.AI agent server.

Wraps the agent server’s /run_agent and /stream_agent endpoints, discovering the agent’s input/output schemas from its OpenAPI spec and transparently maintaining the conversation session_id across calls so successive invocations share the same session. Optional HTTP Basic Auth is used when both username and password are provided.

Parameters:
base_url: str

Base URL of the agent server.

username: str | None = None

Optional HTTP Basic Auth username.

password: str | None = None

Optional HTTP Basic Auth password.

timeout: float = 60.0

Per-request timeout in seconds.

async discover_schemas()[source]

Fetch the server’s OpenAPI spec and derive the agent’s schemas.

Populates self.input_schema and self.output_schema with the Pydantic models for the agent’s request and response payloads. Called automatically by run_agent() and stream_agent() on first use, but may be invoked directly to inspect the schemas up front.

async run_agent(data: BaseModel, external_id: str | None = None) BaseModel[source]

Run the agent once and return its complete response.

Sends data to the server’s /run_agent endpoint, blocking until the run finishes. Updates self.session_id from the response so the next call continues the same conversation.

Parameters:
data: BaseModel

The request payload (an instance matching the agent’s input schema).

external_id: str | None = None

Optional caller-side identifier to correlate the session with an external system.

Returns:

An instance of the agent’s output schema with the run’s result.

async stream_agent(data: BaseModel, external_id: str | None = None)[source]

Run the agent and stream its output incrementally.

Sends data to the server’s /stream_agent (Server-Sent Events) endpoint and yields each data: chunk as a string as it arrives, letting callers consume partial output before the run completes.

Parameters:
data: BaseModel

The request payload (an instance matching the agent’s input schema).

external_id: str | None = None

Optional caller-side identifier to correlate the session with an external system.

Yields:

str – Successive content chunks from the streamed response.
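Extracting data: chunks from a Server-Sent Events stream can be sketched as below. This is a simplified parser over an in-memory list of lines, not the client's actual implementation: iter_sse_data is a hypothetical helper, and it does not join multi-line events or handle other SSE fields.

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the payload of every 'data:' line in a Server-Sent Events stream."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            payload = line[len("data:"):]
            # The SSE format allows one optional space after the colon.
            yield payload[1:] if payload.startswith(" ") else payload


# Blank lines separate events; lines starting with ':' are comments.
stream = ["data: Hello\n", "\n", ": keep-alive comment\n", "data: world\n", "\n"]
chunks = list(iter_sse_data(stream))
```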

RAG service

class kavalai.agents.rag_service.RagServiceResult(*, id: UUID, model: str, collection_name: str, source_id: str, content: str | None = None, embedding_size: int, rag_metadata: dict, similarity: float, created_at: datetime | None = None, updated_at: datetime | None = None, query_index: int | None = None)[source]

Bases: BaseModel

Represents a single result from a RAG query.

Variables:
id : UUID

Unique identifier of the indexed item.

model : str

The embedding model used for this item.

collection_name : str

The name of the collection this item belongs to.

source_id : str

An external identifier for the source of this item.

content : Optional[str]

The original text content that was indexed.

embedding_size : int

The dimension of the embedding vector.

rag_metadata : dict

Additional metadata associated with the item.

similarity : float

The similarity score (1.0 - distance) relative to the query.

created_at : Optional[datetime]

Timestamp when the item was created.

updated_at : Optional[datetime]

Timestamp when the item was last updated.

query_index : Optional[int]

Index of the query in batch queries (for batch_query results).

id : UUID
model : str
collection_name : str
source_id : str
content : str | None
embedding_size : int
rag_metadata : dict
similarity : float
created_at : datetime | None
updated_at : datetime | None
query_index : int | None
model_config : ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class kavalai.agents.rag_service.RagService(session_maker: async_sessionmaker[AsyncSession] | Callable[[], AbstractAsyncContextManager[AsyncSession]], model: str, agent: Agent | None = None, normalizer: Normalizer | None = None)[source]

Bases: object

Service for indexing and querying text using embeddings (Retrieval-Augmented Generation).

This service provides methods to batch index text, delete items, query similarities, and compute similarity matrices against indexed content.

classmethod from_uri(uri: str, model: str, agent: Agent | None = None, normalizer: Normalizer | None = None) RagService[source]

Create a RagService from a database URI.

Parameters:
uri : str

Database URI.

model : str

The name of the embedding model to use.

agent : Optional[Agent]

Optional Agent object to associate with this service.

normalizer : Optional[Normalizer]

Optional normalizer to use for embeddings.

Returns:

A new instance of RagService.

Return type:

RagService

classmethod from_session_maker(session_maker: async_sessionmaker[AsyncSession], model: str, agent: Agent | None = None, normalizer: Normalizer | None = None) RagService[source]

Create a RagService from a session maker.

Parameters:
session_maker : async_sessionmaker[AsyncSession]

Async session maker for the database.

model : str

The name of the embedding model to use.

agent : Optional[Agent]

Optional Agent object to associate with this service.

normalizer : Optional[Normalizer]

Optional normalizer to use for embeddings.

Returns:

A new instance of RagService.

Return type:

RagService

async batch_index(*, texts: list[str], metadata_list: list[dict], source_ids: list[str] | None = None, collection_name: str = 'default') list[RagIndex][source]

Index multiple text items in a single batch.

Parameters:
texts : list[str]

List of text strings to index.

metadata_list : list[dict]

List of metadata dictionaries for each text.

source_ids : Optional[list[str]]

Optional list of source identifiers. If not provided, “default” is used.

collection_name : str

Name of the collection to add items to. Defaults to “default”.

Returns:

List of created RagIndex database objects.

Return type:

list[RagIndex]

Raises:

ValueError – If the lengths of texts, metadata_list, or source_ids do not match.
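The length check and the source_ids fallback can be sketched as a small validation helper. check_batch is a hypothetical name, and applying "default" to every item when source_ids is omitted is one reading of the parameter description above.

```python
from typing import Optional


def check_batch(texts: list, metadata_list: list, source_ids: Optional[list] = None) -> list:
    """Validate lengths and return the source ids to use for each text."""
    if len(metadata_list) != len(texts):
        raise ValueError("texts and metadata_list must have the same length")
    if source_ids is None:
        # Assumed fallback: every item gets the "default" source id.
        return ["default"] * len(texts)
    if len(source_ids) != len(texts):
        raise ValueError("texts and source_ids must have the same length")
    return source_ids


ids = check_batch(["first text", "second text"], [{"lang": "en"}, {"lang": "et"}])
```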

async delete_by_source_ids(collection_name: str, source_ids: list[str])[source]

Delete items from a collection by their source identifiers.

Parameters:
collection_name : str

The name of the collection.

source_ids : list[str]

List of source identifiers to delete.

async index(text: str, source_metadata: dict | None = None, collection_name: str = 'default', source_id: str = 'default')[source]

Index a single text blob with metadata.

Parameters:
text : str

The text content to index.

source_metadata : Optional[dict]

Metadata to associate with the text.

collection_name : str

Name of the collection. Defaults to “default”.

source_id : str

Source identifier. Defaults to “default”.

Returns:

The created RagIndex database object.

Return type:

RagIndex

async query(text: str, top_k: int = 5, collection_name: str | None = None, source_ids: list[str] | None = None, keep_best: bool = False) list[RagServiceResult][source]

Query the indexed items for similarities to the input text.

Parameters:
text : str

The query text.

top_k : int

Number of top results to return. Defaults to 5.

collection_name : Optional[str]

If provided, filter by collection name.

source_ids : Optional[list[str]]

If provided, filter by source identifiers.

keep_best : bool

If True, only the best result per source_id is returned. Useful when a single source is split into multiple indexed items.

Returns:

List of results with similarity scores.

Return type:

list[RagServiceResult]
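The effect of keep_best can be sketched on plain dictionaries. keep_best_per_source is a hypothetical helper, and whether the library deduplicates before or after applying top_k is not stated above; this sketch deduplicates first.

```python
def keep_best_per_source(results: list, top_k: int) -> list:
    """Keep the highest-similarity hit per source_id, then take the top_k."""
    best: dict = {}
    for hit in results:
        current = best.get(hit["source_id"])
        if current is None or hit["similarity"] > current["similarity"]:
            best[hit["source_id"]] = hit
    ranked = sorted(best.values(), key=lambda h: h["similarity"], reverse=True)
    return ranked[:top_k]


# doc-1 was split into two indexed chunks; only its best chunk should survive.
hits = [
    {"source_id": "doc-1", "content": "chunk A", "similarity": 0.91},
    {"source_id": "doc-1", "content": "chunk B", "similarity": 0.84},
    {"source_id": "doc-2", "content": "chunk C", "similarity": 0.88},
]
deduped = keep_best_per_source(hits, top_k=5)
```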

async batch_query(texts: list[str], top_k: int = 5, collection_name: str | None = None, source_ids: list[str] | None = None) list[list[RagServiceResult]][source]

Query the indexed items for similarities to multiple input texts in a single database call.

This method uses PostgreSQL CROSS JOIN LATERAL to efficiently process multiple queries in a single round trip to the database, significantly improving performance for batch operations.

Parameters:
texts : list[str]

List of query texts to search for.

top_k : int

Number of top results to return per query. Defaults to 5.

collection_name : Optional[str]

If provided, filter by collection name.

source_ids : Optional[list[str]]

If provided, filter by source identifiers.

Returns:

A list of result lists, where each inner list contains the top_k results for the corresponding query text.

Return type:

list[list[RagServiceResult]]
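Since a single database call returns rows for all queries at once, the rows have to be split back into one list per query. A sketch of that grouping step, using the query_index field from RagServiceResult, might look like this; group_by_query and the row dictionaries are hypothetical.

```python
def group_by_query(rows: list, query_count: int) -> list:
    """Split flat rows into one result list per query, using query_index."""
    grouped = [[] for _ in range(query_count)]
    for row in rows:
        grouped[row["query_index"]].append(row)
    return grouped


rows = [
    {"query_index": 0, "source_id": "doc-1", "similarity": 0.9},
    {"query_index": 1, "source_id": "doc-7", "similarity": 0.8},
    {"query_index": 0, "source_id": "doc-2", "similarity": 0.7},
]
per_query = group_by_query(rows, query_count=3)
```

Pre-allocating one list per query keeps the output aligned with the input texts even when a query has no matches.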

async compute_similarity_matrix(texts: list[str], source_ids: list[str], method: str = 'min') list[list[float]][source]

Compute a similarity matrix between multiple texts and multiple source identifiers.

This method generates embeddings for all input texts and performs a single database query to find similarities against all specified source_ids.

Parameters:
texts : list[str]

List of query texts (rows in the matrix).

source_ids : list[str]

List of source identifiers to compare against (columns in the matrix).

method : str

Aggregate method to use when multiple items exist for a source_id. “min” (default) uses the shortest distance (highest similarity). “avg” uses the average distance.

Returns:

A 2D matrix where matrix[i][j] is the similarity between texts[i] and source_ids[j].

Return type:

list[list[float]]
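The "min" and "avg" aggregation can be sketched on precomputed distances, using the similarity = 1.0 - distance relation given for RagServiceResult. The similarity_matrix helper and the distances mapping are hypothetical; the real method computes embeddings and queries the database.

```python
from statistics import mean


def similarity_matrix(distances: dict, n_texts: int, source_ids: list, method: str = "min") -> list:
    """Aggregate per-item distances into matrix[i][j] = 1.0 - aggregated distance."""
    aggregate = {"min": min, "avg": mean}[method]
    return [
        [1.0 - aggregate(distances[(i, source_id)]) for source_id in source_ids]
        for i in range(n_texts)
    ]


# distances[(text_index, source_id)] lists the distance to each indexed item.
distances = {
    (0, "doc-1"): [0.25, 0.75],
    (0, "doc-2"): [0.5],
    (1, "doc-1"): [0.5, 1.0],
    (1, "doc-2"): [0.25],
}
by_min = similarity_matrix(distances, 2, ["doc-1", "doc-2"], method="min")
by_avg = similarity_matrix(distances, 2, ["doc-1", "doc-2"], method="avg")
```

With "min", a source scores as well as its closest chunk; with "avg", weakly related chunks pull the score down.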

async learn_normalizer(collection_name: str | None = None) Normalizer[source]

Learns a normalizer from the current RAG index.