Source code for kavalai.agents.client

"""
Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import urllib.parse
from typing import Optional, Type

import httpx
from json_schema_to_pydantic import create_model
from pydantic import BaseModel

from kavalai.tools.openapi_spec_parser import OpenApiSpecParser


[docs] class AgentClient: """Async HTTP client for invoking a remote Kaval.AI agent server. Wraps the agent server's ``/run_agent`` and ``/stream_agent`` endpoints, discovering the agent's input/output schemas from its OpenAPI spec and transparently maintaining the conversation ``session_id`` across calls so successive invocations share the same session. Optional HTTP Basic Auth is used when both ``username`` and ``password`` are provided. Args: base_url: Base URL of the agent server. username: Optional HTTP Basic Auth username. password: Optional HTTP Basic Auth password. timeout: Per-request timeout in seconds. """ def __init__( self, base_url: str, username: Optional[str] = None, password: Optional[str] = None, timeout: float = 60.0, ): self.base_url = base_url.rstrip("/") self.auth = (username, password) if username and password else None self.timeout = timeout self.session_id: Optional[str] = None self.input_schema: Optional[Type[BaseModel]] = None self.output_schema: Optional[Type[BaseModel]] = None
[docs] async def discover_schemas(self): """Fetch the server's OpenAPI spec and derive the agent's schemas. Populates ``self.input_schema`` and ``self.output_schema`` with the Pydantic models for the agent's request and response payloads. Called automatically by :meth:`run_agent` and :meth:`stream_agent` on first use, but may be invoked directly to inspect the schemas up front. """ async with httpx.AsyncClient(auth=self.auth, timeout=self.timeout) as client: openapi_spec_url = urllib.parse.urljoin(self.base_url + "/", "openapi.json") resp = await client.get(openapi_spec_url) resp.raise_for_status() spec = resp.json() parser = OpenApiSpecParser(spec) self.input_schema = ( create_model(parser.get_path_request_schema("/run_agent", "POST")) .model_fields["data"] .annotation ) self.output_schema = ( create_model(parser.get_path_response_schema("/run_agent", "POST")) .model_fields["data"] .annotation )
[docs] async def run_agent( self, data: BaseModel, external_id: Optional[str] = None ) -> BaseModel: """Run the agent once and return its complete response. Sends ``data`` to the server's ``/run_agent`` endpoint, blocking until the run finishes. Updates ``self.session_id`` from the response so the next call continues the same conversation. Args: data: The request payload (an instance matching the agent's input schema). external_id: Optional caller-side identifier to correlate the session with an external system. Returns: An instance of the agent's output schema with the run's result. """ if self.input_schema is None or self.output_schema is None: await self.discover_schemas() payload = { "session_id": self.session_id, "external_id": external_id, "data": data.model_dump(), } url = f"{self.base_url}/run_agent" async with httpx.AsyncClient(auth=self.auth, timeout=self.timeout) as client: resp = await client.post(url, json=payload) resp.raise_for_status() response_json = resp.json() self.session_id = response_json.get("session_id") return self.output_schema(**response_json["data"])
[docs] async def stream_agent(self, data: BaseModel, external_id: Optional[str] = None): """Run the agent and stream its output incrementally. Sends ``data`` to the server's ``/stream_agent`` (Server-Sent Events) endpoint and yields each ``data:`` chunk as a string as it arrives, letting callers consume partial output before the run completes. Args: data: The request payload (an instance matching the agent's input schema). external_id: Optional caller-side identifier to correlate the session with an external system. Yields: str: Successive content chunks from the streamed response. """ if self.input_schema is None or self.output_schema is None: await self.discover_schemas() payload = { "session_id": self.session_id, "external_id": external_id, "data": data.model_dump(), } url = f"{self.base_url}/stream_agent" async with httpx.AsyncClient(auth=self.auth, timeout=self.timeout) as client: async with client.stream("POST", url, json=payload) as response: response.raise_for_status() async for line in response.aiter_lines(): async for chunk in self._process_stream_line(line): yield chunk
async def _process_stream_line(self, line: str): if line.startswith("data: "): yield line[6:]