"""
Copyright 2026 OÜ KAVAL AI (registry code 17393877)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict
from sqlalchemy import func, select, desc, asc, case
from sqlalchemy.ext.asyncio import AsyncSession
from kavalai.agents.db import Session, Run, Task, ChatMessage, Agent
from typing import TypedDict, Any
[docs]
class SessionSummary(BaseModel):
"""Row-level summary of a session for the Conversations list.
Aggregates a session's owning agent, its run/task/message and error counts,
and a preview of its first and last messages.
"""
session_id: UUID
agent_id: UUID
agent_name: str
runs_count: int
tasks_count: int
messages_count: int
first_message: str | None
last_message: str | None
errors_count: int
created_at: datetime
updated_at: datetime
[docs]
class TaskSummary(BaseModel):
"""Summary of a single task (workflow-node execution) for the Tasks view.
Exposes the task's inputs, output, name, prompt, any errors and duration.
"""
model_config = ConfigDict(from_attributes=True)
id: UUID
agent_id: UUID | None
session_id: UUID
run_id: UUID
inputs: Any | None
output: Any | None
name: str | None = None
prompt: str | None = None
errors: list[str] | None = None
duration_seconds: float | None = None
created_at: datetime
updated_at: datetime
[docs]
class RunSummary(BaseModel):
"""Summary of a single workflow run for the Runs view.
Exposes the run's input/output data, resolved context and the number of
tasks it executed.
"""
model_config = ConfigDict(from_attributes=True)
id: UUID
session_id: UUID
input_data: Any | None
output_data: Any | None
context: Any | None
tasks_count: int
created_at: datetime
updated_at: datetime
[docs]
class ChatMessageSummary(BaseModel):
"""Summary of a single chat message for the conversation transcript.
Exposes the message's role, content and the run it is associated with.
"""
model_config = ConfigDict(from_attributes=True)
id: UUID
agent_id: UUID
session_id: UUID
run_id: UUID | None
role: str
content: str
created_at: datetime
updated_at: datetime
[docs]
class SessionDetails(BaseModel):
"""Full detail of one session: its messages, runs and tasks.
Powers the per-conversation detail view in the backoffice, bundling the
session's chat transcript together with all of its runs and tasks.
"""
session_id: UUID
messages: list[ChatMessageSummary]
runs: list[RunSummary]
tasks: list[TaskSummary]
[docs]
class SessionsResponse(TypedDict):
sessions: list[SessionSummary]
total_count: int
[docs]
async def get_sessions_summary(
session: AsyncSession,
agent_id: UUID | None = None,
search: str | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
limit: int = 50,
offset: int = 0,
) -> SessionsResponse:
# Get total count first
# We need to apply filters to count as well
# Subquery to identify sessions that match the search criteria in messages
session_filter_stmt = select(Session.id)
if agent_id:
session_filter_stmt = session_filter_stmt.where(Session.agent_id == agent_id)
if start_date:
session_filter_stmt = session_filter_stmt.where(
Session.created_at >= start_date
)
if end_date:
session_filter_stmt = session_filter_stmt.where(Session.created_at <= end_date)
if search:
# Search in ChatMessage content
search_subq = (
select(ChatMessage.session_id)
.where(ChatMessage.content.ilike(f"%{search}%"))
.distinct()
.subquery()
)
session_filter_stmt = session_filter_stmt.where(
Session.id.in_(select(search_subq.c.session_id))
)
count_stmt = select(func.count()).select_from(session_filter_stmt.subquery())
total_count_res = await session.execute(count_stmt)
total_count = total_count_res.scalar() or 0
# Subquery for counts
runs_count_sub = (
select(Run.session_id, func.count(Run.id).label("count"))
.group_by(Run.session_id)
.subquery()
)
tasks_count_sub = (
select(Task.session_id, func.count(Task.id).label("count"))
.group_by(Task.session_id)
.subquery()
)
messages_count_sub = (
select(ChatMessage.session_id, func.count(ChatMessage.id).label("count"))
.group_by(ChatMessage.session_id)
.subquery()
)
errors_count_sub = (
select(Task.session_id, func.count(Task.id).label("count"))
.where(Task.errors.is_not(None))
.where(
case(
(
func.jsonb_typeof(Task.errors) == "array",
func.jsonb_array_length(Task.errors) > 0,
),
else_=func.jsonb_typeof(Task.errors) != "null",
)
)
.group_by(Task.session_id)
.subquery()
)
# Subqueries for first and last messages
# Using window functions might be more efficient but let's try a common approach
# Or just fetch them in the main query if possible.
# Actually, let's use a more direct approach for first/last messages to keep it simple and readable.
# We'll use lateral joins or window functions if supported,
# but for compatibility let's try separate subqueries or a combined one.
stmt = (
select(
Session.id.label("session_id"),
Session.agent_id,
Agent.name.label("agent_name"),
func.coalesce(runs_count_sub.c.count, 0).label("runs_count"),
func.coalesce(tasks_count_sub.c.count, 0).label("tasks_count"),
func.coalesce(messages_count_sub.c.count, 0).label("messages_count"),
func.coalesce(errors_count_sub.c.count, 0).label("errors_count"),
Session.created_at,
Session.updated_at,
)
.join(Agent, Session.agent_id == Agent.id)
.outerjoin(runs_count_sub, Session.id == runs_count_sub.c.session_id)
.outerjoin(tasks_count_sub, Session.id == tasks_count_sub.c.session_id)
.outerjoin(messages_count_sub, Session.id == messages_count_sub.c.session_id)
.outerjoin(errors_count_sub, Session.id == errors_count_sub.c.session_id)
)
if agent_id:
stmt = stmt.where(Session.agent_id == agent_id)
if start_date:
stmt = stmt.where(Session.created_at >= start_date)
if end_date:
stmt = stmt.where(Session.created_at <= end_date)
if search:
# We already have search_subq defined earlier
stmt = stmt.where(Session.id.in_(select(search_subq.c.session_id)))
stmt = stmt.order_by(desc(Session.updated_at)).limit(limit).offset(offset)
result = await session.execute(stmt)
sessions_data = result.all()
summaries = []
for row in sessions_data:
# Fetch first and last message for each session
# This is N+1 but for a limited list it might be okay for now.
# Alternatively we can do it in one big query with window functions.
first_msg_stmt = (
select(ChatMessage.content)
.where(ChatMessage.session_id == row.session_id)
.order_by(ChatMessage.created_at.asc())
.limit(1)
)
last_msg_stmt = (
select(ChatMessage.content)
.where(ChatMessage.session_id == row.session_id)
.order_by(ChatMessage.created_at.desc())
.limit(1)
)
first_msg = (await session.execute(first_msg_stmt)).scalar_one_or_none()
last_msg = (await session.execute(last_msg_stmt)).scalar_one_or_none()
summaries.append(
SessionSummary(
session_id=row.session_id,
agent_id=row.agent_id,
agent_name=row.agent_name,
runs_count=row.runs_count,
tasks_count=row.tasks_count,
messages_count=row.messages_count,
errors_count=row.errors_count,
first_message=first_msg,
last_message=last_msg,
created_at=row.created_at,
updated_at=row.updated_at,
)
)
return {"sessions": summaries, "total_count": total_count}
[docs]
async def get_session_details(
session: AsyncSession,
session_id: UUID,
) -> SessionDetails:
# Fetch messages
msg_stmt = (
select(ChatMessage)
.where(ChatMessage.session_id == session_id)
.order_by(asc(ChatMessage.created_at))
)
msg_result = await session.execute(msg_stmt)
messages = [
ChatMessageSummary.model_validate(m) for m in msg_result.scalars().all()
]
# Fetch tasks count per run
tasks_count_sub = (
select(Task.run_id, func.count(Task.id).label("count"))
.where(Task.session_id == session_id)
.group_by(Task.run_id)
.subquery()
)
# Fetch runs
run_stmt = (
select(
Run.id,
Run.session_id,
Run.input_data,
Run.output_data,
Run.context,
func.coalesce(tasks_count_sub.c.count, 0).label("tasks_count"),
Run.created_at,
Run.updated_at,
)
.outerjoin(tasks_count_sub, Run.id == tasks_count_sub.c.run_id)
.where(Run.session_id == session_id)
.order_by(asc(Run.created_at))
)
run_result = await session.execute(run_stmt)
runs = [RunSummary.model_validate(r) for r in run_result.all()]
# Fetch tasks
task_stmt = (
select(Task).where(Task.session_id == session_id).order_by(asc(Task.created_at))
)
task_result = await session.execute(task_stmt)
tasks = [TaskSummary.model_validate(t) for t in task_result.scalars().all()]
return SessionDetails(
session_id=session_id, messages=messages, runs=runs, tasks=tasks
)