Source code for kavalai.backoffice.server

"""
Copyright 2026 OÜ KAVAL AI (registry code 17393877)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from loguru import logger
import os
from datetime import datetime
from uuid import UUID

import uvicorn
from authlib.integrations.starlette_client import OAuth
from fastapi import FastAPI, Request, HTTPException, status, Body
from sqlalchemy.exc import SQLAlchemyError
from kavalai.crud import insert, select, delete, update, get_one, get_all
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import JSONResponse, RedirectResponse, Response
from kavalai.backoffice import db
from kavalai.backoffice.db import is_owner, is_member
from kavalai.backoffice.project_service import ProjectService
from kavalai.agents.agent_service import AgentService
from kavalai.agents.db import db_manager, Agent
from kavalai.agents import stats as agent_stats
from kavalai.agents import sessions as agent_sessions
from kavalai.agents.rag_service import RagService
from kavalai.llm_clients.common import Streamer
from kavalai.backoffice.embedding_projector import train_pca
from sse_starlette.sse import EventSourceResponse
from contextlib import asynccontextmanager

# Set up the app logger
logger.propagate = True

app = FastAPI()


[docs] @asynccontextmanager async def get_backoffice_session(): """ Context manager to provide a session for the backoffice database. Handles connection errors gracefully. """ try: async with db.AsyncBackofficeSession() as session: yield session except HTTPException: # Intentional API errors raised inside the block (e.g. a 403/404 from an # access check) must propagate unchanged, not be masked as a 503. raise except SQLAlchemyError as e: logger.error(f"Failed to connect to backoffice database: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Backoffice database is not connected. Please check your database settings.", )
[docs] @asynccontextmanager async def get_project_session(project: db.Project): """ Context manager to provide a session for a specific project's agent database. Handles connection errors gracefully. """ sessionmaker = db_manager.get_sessionmaker( user=project.db_user, password=project.db_password, host=project.db_host, port=project.db_port, db_name=project.db_name, ) try: async with sessionmaker() as session: yield session except HTTPException: # Let intentional API errors from inside the block propagate unchanged. raise except SQLAlchemyError as e: logger.error(f"Failed to connect to project database for {project.name}: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Database is not connected for project '{project.name}'. Please check your database settings.", )
# OAuth setup oauth = OAuth() # Enable forwarding for proxy/Docker from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*") oauth.register( name="google", client_id=os.getenv("GOOGLE_OAUTH_CLIENT_ID"), client_secret=os.getenv("GOOGLE_OAUTH_CLIENT_SECRET"), server_metadata_url="https://accounts.google.com/.well-known/openid-configuration", client_kwargs={"scope": "openid email profile"}, ) # Add SessionMiddleware with a fixed secret key and appropriate cookie settings app.add_middleware( SessionMiddleware, secret_key=os.getenv("SESSION_SECRET_KEY", "fallback-secret-key-for-dev-only"), same_site="lax", https_only=False, # Set to True if you are using HTTPS domain=None, # Should be None for localhost/development path="/", )
[docs] async def authenticate_and_sync_user(user_info: dict): async with get_backoffice_session() as session: # Check if user exists in the database. email = user_info.get("email") stmt = select(db.User).where(db.User.email == email) result = await session.execute(stmt) user = result.scalars().first() # If not, raise exception. if not user: logger.warning(f"Unauthorized login attempt: {email}") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User not registered in the system.", ) # Update picture and name user.name = user_info.get("name", user.name) user.picture = user_info.get("picture", user.picture) await session.commit() await session.refresh(user) return user
[docs] def is_logged_in(request: Request): return request.session.get("user_info") is not None
[docs] def assert_logged_in(request: Request): if not is_logged_in(request): raise HTTPException(status_code=401, detail="Unauthorized.")
[docs] def assert_is_admin(request: Request): user_session = request.session.get("user_info") if not user_session.get("is_admin"): raise HTTPException( status_code=403, detail="Only administrators can create new projects." )
[docs] async def assert_is_owner(session: db.AsyncSession, request: Request, project_id: UUID): if not await is_owner( session, UUID(request.session.get("user_info")["id"]), project_id ): raise HTTPException( status_code=403, detail="Only administrators can create new projects." )
[docs] async def assert_is_member( session: db.AsyncSession, request: Request, project_id: UUID ): if not await is_member( session, UUID(request.session.get("user_info")["id"]), project_id ): raise HTTPException(status_code=403, detail="Must be a member of the project.")
[docs] async def get_project_and_assert_access( request: Request, project_id: UUID ) -> db.Project: """Fetch project and assert user is a member.""" async with get_backoffice_session() as session: await assert_is_member(session, request, project_id) project = await get_one(session, db.Project, project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") return project
[docs] @app.get("/login") async def login(request: Request): redirect_uri = request.url_for("google_auth_callback") return await oauth.google.authorize_redirect(request, redirect_uri)
[docs] @app.get("/logout") async def logout(request: Request): del request.session["user_info"] return JSONResponse({"logged_in": is_logged_in(request)})
[docs] @app.get("/user/get_details") async def user_details(request: Request): if not is_logged_in(request): raise HTTPException(status_code=401, detail="Unauthorized.") return request.session.get("user_info")
[docs] @app.get("/auth/google/callback") async def google_auth_callback(request: Request): try: token = await oauth.google.authorize_access_token(request) user_info = await oauth.google.userinfo(token=token) db_user = await authenticate_and_sync_user(user_info) # Store essential info in the session request.session["user_info"] = { "id": str(db_user.id), "email": db_user.email, "name": db_user.name, "picture": db_user.picture, "is_admin": db_user.is_admin, "active_project_id": str(db_user.active_project_id) if db_user.active_project_id else None, } # Clear OAuth state from session after successful login if "_google_authlib_state_" in request.session: del request.session["_google_authlib_state_"] return RedirectResponse(url=os.environ["FRONTEND_URL"]) except HTTPException as e: raise e except Exception as e: logger.error(f"Auth error: {e}") # If it's a mismatching state error, it might be due to stale session if "mismatching_state" in str(e): return RedirectResponse(url="/login") raise HTTPException(status_code=400, detail="Authentication failed.")
[docs] @app.post("/user/set_active_project/{project_id}") async def set_active_project(project_id: UUID, request: Request): assert_logged_in(request) user_id = UUID(request.session.get("user_info")["id"]) async with get_backoffice_session() as session: if not await db.is_member(session, user_id, project_id): raise HTTPException( status_code=403, detail="Must be a member of the project." ) await update(session, db.User, user_id, {"active_project_id": project_id}) # Update session user_info = request.session.get("user_info", {}) user_info["active_project_id"] = str(project_id) request.session["user_info"] = user_info return {"status": "ok", "active_project_id": project_id}
[docs] @app.post("/projects/create") async def projects_create(request: Request, data: dict = Body(...)): assert_logged_in(request) assert_is_admin(request) user_session = request.session.get("user_info") service = ProjectService(db.AsyncBackofficeSession) return await service.create_project(data, UUID(user_session["id"]))
[docs] @app.get("/projects/get/{project_id}") async def projects_get_by_id(project_id: UUID, request: Request): assert_logged_in(request) return await get_project_and_assert_access(request, project_id)
[docs] @app.get("/projects/all") async def projects_get_all(request: Request): assert_logged_in(request) user_id = UUID(request.session.get("user_info")["id"]) service = ProjectService(db.AsyncBackofficeSession) return await service.get_user_projects(user_id)
[docs] @app.put("/projects/update/{project_id}") async def projects_update(project_id: UUID, request: Request, data: dict = Body(...)): assert_logged_in(request) async with get_backoffice_session() as session: await assert_is_owner(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) updated = await service.update_project(project_id, data) if not updated: raise HTTPException(status_code=404, detail="Project not found.") return updated
[docs] @app.delete("/projects/delete/{project_id}") async def projects_delete(project_id: UUID, request: Request): assert_logged_in(request) async with get_backoffice_session() as session: await assert_is_owner(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) success = await service.delete_project(project_id) if not success: raise HTTPException(status_code=404, detail="Project not found") return {"status": "deleted"}
[docs] @app.get("/users/all") async def users_get_all(request: Request): assert_logged_in(request) assert_is_admin(request) async with get_backoffice_session() as session: return await get_all(session, db.User)
[docs] @app.post("/users/create") async def users_create(request: Request, data: dict = Body(...)): assert_logged_in(request) assert_is_admin(request) async with get_backoffice_session() as session: return await insert(session, db.User, data)
[docs] @app.put("/users/update/{user_id}") async def users_update(user_id: UUID, request: Request, data: dict = Body(...)): assert_logged_in(request) assert_is_admin(request) async with get_backoffice_session() as session: updated = await update(session, db.User, user_id, data) if not updated: raise HTTPException(status_code=404, detail="User not found.") return updated
[docs] @app.delete("/users/delete/{user_id}") async def users_delete(user_id: UUID, request: Request): assert_logged_in(request) assert_is_admin(request) if str(user_id) == request.session.get("user_info")["id"]: raise HTTPException(status_code=400, detail="You cannot delete yourself.") async with get_backoffice_session() as session: success = await delete(session, db.User, user_id) if not success: raise HTTPException(status_code=404, detail="User not found") return {"status": "deleted"}
[docs] @app.get("/agents/get/{project_id}/{agent_id}") async def agents_get_by_id(project_id: UUID, agent_id: UUID, request: Request): assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: agent = await get_one(session, Agent, agent_id) if not agent: raise HTTPException(status_code=404, detail="Agent not found") return agent
[docs] @app.post("/workflows/render-svg") async def workflows_render_svg(request: Request, data: dict = Body(...)): """Render a workflow graph to an SVG diagram. Accepts ``{"workflow": {...}}`` (or a bare workflow dict) and returns an ``image/svg+xml`` document. The agents page uses this to show an agent's workflow as a backend-generated diagram (replacing the client-side build). """ assert_logged_in(request) from kavalai.workflow import render_workflow_svg workflow = data.get("workflow", data) try: svg = render_workflow_svg(workflow) except Exception as exc: raise HTTPException(status_code=400, detail=f"Could not render workflow: {exc}") return Response(content=svg, media_type="image/svg+xml")
[docs] @app.get("/agents/all/{project_id}") async def agents_get_all(project_id: UUID, request: Request): """Fetch all agents belonging to a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: stmt = select(Agent) result = await session.execute(stmt) agents = result.scalars().all() return agents
[docs] @app.get("/agents/stats/{project_id}") async def agents_get_stats( project_id: UUID, request: Request, days: int = 7, agent_id: UUID | None = None ): """Fetch daily stats for agents in a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: return await agent_stats.get_daily_stats(session, days=days, agent_id=agent_id)
[docs] @app.get("/agents/summary-stats/{project_id}") async def agents_get_summary_stats( project_id: UUID, request: Request, agent_id: UUID | None = None ): """Fetch summary stats (last 30 days) for agents in a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: return await agent_stats.get_summary_stats(session, agent_id=agent_id)
[docs] @app.get("/agents/sessions/{project_id}") async def agents_get_sessions( project_id: UUID, request: Request, agent_id: UUID | None = None, search: str | None = None, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 50, offset: int = 0, ): """Fetch session summaries for a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: return await agent_sessions.get_sessions_summary( session, agent_id=agent_id, search=search, start_date=start_date, end_date=end_date, limit=limit, offset=offset, )
[docs] @app.get("/agents/sessions/{project_id}/{session_id}/details") async def agents_get_session_details( project_id: UUID, session_id: UUID, request: Request, ): """Fetch all details (messages, runs, tasks) for a specific session.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) async with get_project_session(project) as session: return await agent_sessions.get_session_details(session, session_id)
[docs] @app.get("/projects/{project_id}/llm-call-stats") async def projects_get_llm_call_stats( project_id: UUID, request: Request, call_type: str | None = None, limit: int = 50, offset: int = 0, ): """Fetch paginated LLM call stats for a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) def sessionmaker_factory(): return db_manager.get_sessionmaker( user=project.db_user, password=project.db_password, host=project.db_host, port=project.db_port, db_name=project.db_name, ) # Since AgentService manages its own sessions, we wrap the whole call to catch connection errors try: service = AgentService(sessionmaker_factory()) return await service.get_model_call_stats( call_type=call_type, limit=limit, offset=offset ) except Exception as e: logger.error(f"Failed to connect to project database for {project.name}: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Database is not connected for project '{project.name}'. Please check your database settings.", )
[docs] @app.post("/projects/{project_id}/rag/query") async def projects_rag_query( project_id: UUID, request: Request, query_data: dict = Body(...), ): """Execute a RAG query for a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) model = query_data.get("model") text = query_data.get("text") collection_name = query_data.get("collection_name") top_k = query_data.get("top_k", 5) source_ids = query_data.get("source_ids") keep_best = query_data.get("keep_best", False) normalizer_yaml = query_data.get("normalizer_yaml") if not model or not text: raise HTTPException(status_code=400, detail="model and text are required") # Connect to the project database normalizer = None if normalizer_yaml: from kavalai.normalizer import Normalizer try: normalizer = Normalizer.from_yaml(normalizer_yaml) except Exception as e: raise HTTPException( status_code=400, detail=f"Invalid normalizer YAML: {str(e)}" ) async with get_project_session(project) as session: from contextlib import asynccontextmanager @asynccontextmanager async def session_factory(): yield session rag_service = RagService(session_factory, model, normalizer=normalizer) results = await rag_service.query( text=text, top_k=top_k, collection_name=collection_name, source_ids=source_ids, keep_best=keep_best, ) # Check for precomputed PCA model pca_data = None if collection_name: from kavalai.backoffice.db import ProjectCache import pickle import base64 import json import numpy as np model_cache_name = f"pca_model_{collection_name}" stmt = select(ProjectCache).where( ProjectCache.project_id == project_id, ProjectCache.name == model_cache_name, ) async with get_backoffice_session() as bo_session: res = await bo_session.execute(stmt) cache_entry = res.scalar_one_or_none() if cache_entry: try: ipca = pickle.loads(base64.b64decode(cache_entry.value)) # nosec B301 # Get query embedding embeddings, _ = await rag_service.llm_client.compute_embeddings( texts=[text], normalizer=normalizer ) query_point = ipca.transform(np.array(embeddings))[0] query_point = [float(x) for x in query_point] # Get result embeddings (already fetched during query) # We need to fetch actual embeddings for results as RagServiceResult doesn't include them # Let's modify RagService or fetch them here. # Actually, RagServiceResult doesn't have embeddings. Let's fetch them. result_ids = [r.id for r in results] logger.debug( f"Fetching embeddings for result IDs: {result_ids}" ) from kavalai.agents.db import RagIndex stmt_embeddings = select(RagIndex.id, RagIndex.embedding).where( RagIndex.id.in_(result_ids) ) res_embeddings = await session.execute(stmt_embeddings) id_to_embedding = { row[0]: row[1] for row in res_embeddings.all() } logger.debug(f"Found {len(id_to_embedding)} embeddings") result_points = [] for r in results: emb = id_to_embedding.get(r.id) if emb is not None: try: pt = ipca.transform(np.array([emb]))[0] result_points.append( { "label": r.content[:100], "x": float(pt[0]), "y": float(pt[1]), } ) except Exception as e: logger.error( f"Failed to transform embedding for result {r.id}: {e}" ) # Get sample points sample_cache_name = f"pca_sample_train_data_{collection_name}" stmt_sample = select(ProjectCache).where( ProjectCache.project_id == project_id, ProjectCache.name == sample_cache_name, ) res_sample = await bo_session.execute(stmt_sample) sample_entry = res_sample.scalar_one_or_none() sample_points = ( json.loads(sample_entry.value) if sample_entry else [] ) pca_data = { "query": { "label": text, "x": query_point[0], "y": query_point[1], }, "results": result_points, "samples": sample_points, } except Exception as e: logger.error(f"Failed to process PCA data: {e}") return {"results": results, "pca_data": pca_data}
[docs] @app.get("/projects/{project_id}/rag/stats") async def projects_rag_stats(project_id: UUID, request: Request): """Fetch RAG statistics for a specific project.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) # Connect to the project database async with get_project_session(project) as session: from sqlalchemy import func from kavalai.agents.db import RagIndex # Total entries stmt_entries = select(func.count(RagIndex.id)) result_entries = await session.execute(stmt_entries) total_entries = result_entries.scalar() # Collections count stmt_collections_count = select( func.count(func.distinct(RagIndex.collection_name)) ) result_collections_count = await session.execute(stmt_collections_count) total_collections = result_collections_count.scalar() # Collection names stmt_names = select(func.distinct(RagIndex.collection_name)) result_names = await session.execute(stmt_names) collections = result_names.scalars().all() return { "total_entries": total_entries, "total_collections": total_collections, "collections": collections, }
[docs] @app.get("/projects/{project_id}/rag/train-pca") async def projects_train_pca(project_id: UUID, collection_name: str, request: Request): """Trigger PCA training for a specific project and collection.""" assert_logged_in(request) project = await get_project_and_assert_access(request, project_id) import asyncio queue = asyncio.Queue() streamer = Streamer("pca_streamer", queue) async def event_generator(): try: # Run train_pca in a task so we can stream from the queue pca_task = asyncio.create_task( train_pca( bo_session_maker=get_backoffice_session, agents_session_maker=lambda: get_project_session(project), project_name=project.name, collection_name=collection_name, streamer=streamer, ) ) while not (pca_task.done() and queue.empty()): try: # Wait for a message with a timeout to check if the task is done msg = await asyncio.wait_for(queue.get(), timeout=0.1) yield {"data": msg} except asyncio.TimeoutError: if pca_task.done(): # Check if task failed if pca_task.exception(): logger.error(f"PCA training failed: {pca_task.exception()}") import json yield { "data": json.dumps( { "status": "error", "value": str(pca_task.exception()), } ) } break continue except Exception as e: logger.error(f"Error in PCA event generator: {e}") import json yield {"data": json.dumps({"status": "error", "value": str(e)})} return EventSourceResponse(event_generator())
[docs] @app.post("/projects/test-connection/{project_id}") async def projects_test_connection( project_id: str, request: Request, data: dict = Body(default={}) ): """Test connection to the project database.""" assert_logged_in(request) if project_id == "new": project = db.Project(**data) else: project = await get_project_and_assert_access(request, UUID(project_id)) service = ProjectService(db.AsyncBackofficeSession) return await service.test_connection(project)
[docs] @app.get("/projects/{project_id}/members") async def projects_get_members(project_id: UUID, request: Request): assert_logged_in(request) async with get_backoffice_session() as session: await assert_is_member(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) return await service.get_members(project_id)
[docs] @app.post("/projects/{project_id}/members/add") async def projects_add_member( project_id: UUID, request: Request, data: dict = Body(...) ): assert_logged_in(request) user_id = UUID(data["user_id"]) role = db.ProjectRole(data["role"]) async with get_backoffice_session() as session: # Only owner or admin can add members is_admin = request.session.get("user_info").get("is_admin") if not is_admin: await assert_is_owner(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) await service.add_member(project_id, user_id, role) return {"status": "added"}
[docs] @app.put("/projects/{project_id}/members/update") async def projects_update_member( project_id: UUID, request: Request, data: dict = Body(...) ): assert_logged_in(request) user_id = UUID(data["user_id"]) new_role = db.ProjectRole(data["role"]) async with get_backoffice_session() as session: is_admin = request.session.get("user_info").get("is_admin") if not is_admin: await assert_is_owner(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) await service.update_member_role(project_id, user_id, new_role) return {"status": "updated"}
[docs] @app.delete("/projects/{project_id}/members/remove/{user_id}") async def projects_remove_member(project_id: UUID, user_id: UUID, request: Request): assert_logged_in(request) async with get_backoffice_session() as session: is_admin = request.session.get("user_info").get("is_admin") if not is_admin: await assert_is_owner(session, request, project_id) service = ProjectService(db.AsyncBackofficeSession) await service.remove_member(project_id, user_id) return {"status": "removed"}
if __name__ == "__main__": config = uvicorn.Config( "kavalai.backoffice.server:app", port=8000, log_level="info", reload=True, access_log=True, ) server = uvicorn.Server(config) server.run()