LLM Knowledge Assistant for Internal Systems
Python-based assistant that answers questions about internal documentation and database records.
- Built an ETL pipeline that pulls relational data and files, normalises them, and loads vector embeddings.
- Designed schemas and views to filter by tenant, source and freshness.
- Implemented FastAPI endpoints for chat, admin operations and data refresh.
- Added metrics: latency, sources returned, error counts with structured logging.
from __future__ import annotations
import asyncio
import logging
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field, constr
from app.services.chat import ChatService, RetrievalError, get_chat_service
router = APIRouter()
logger = logging.getLogger("app.chat")
class ChatRequest(BaseModel):
question: constr(min_length=3) = Field(..., description="User question")
user_id: str = Field(..., min_length=3)
trace_id: str | None = None
class ChatResponse(BaseModel):
answer: str
sources: list[str]
latency_ms: int
async def _with_timeout(coro, *, timeout: float):
return await asyncio.wait_for(coro, timeout=timeout)
@router.post("/chat", response_model=ChatResponse, status_code=status.HTTP_200_OK)
async def chat(
payload: ChatRequest,
svc: ChatService = Depends(get_chat_service),
) -> ChatResponse:
"""Answer a user question with retrieval + LLM while enforcing timeouts and structured logs."""
ctx = {"trace_id": payload.trace_id or str(uuid4()), "user_id": payload.user_id}
logger.info("chat.request", extra=ctx | {"question": payload.question})
try:
result = await _with_timeout(
svc.answer(question=payload.question, user_id=payload.user_id),
timeout=8.0,
)
except asyncio.TimeoutError:
logger.warning("chat.timeout", extra=ctx)
raise HTTPException(status_code=504, detail="Upstream LLM timeout")
except RetrievalError as exc:
logger.exception("chat.retrieval_failed", extra=ctx | {"error": str(exc)})
raise HTTPException(status_code=500, detail="Context retrieval failed")
logger.info(
"chat.success",
extra=ctx | {"sources": result.sources, "latency_ms": result.latency_ms},
)
return ChatResponse(
answer=result.answer,
sources=result.sources,
latency_ms=result.latency_ms,
)
from __future__ import annotations
from typing import Any
from uuid import uuid4
from sqlalchemy import JSON, Column, DateTime, Float, ForeignKey, Index, String, Text, func
from sqlalchemy.dialects.postgresql import ARRAY, UUID
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class Document(Base):
__tablename__ = "documents"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
tenant_id = Column(String(36), nullable=False)
title = Column(String(255), nullable=False)
source = Column(String(120), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
chunks = relationship("DocumentChunk", back_populates="document", lazy="selectin")
class DocumentChunk(Base):
__tablename__ = "document_chunks"
__table_args__ = (
Index("ix_chunks_tenant_doc", "tenant_id", "document_id"),
Index("ix_chunks_vector", "tenant_id", "embedding", postgresql_using="ivfflat"),
)
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
tenant_id = Column(String(36), nullable=False)
document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"), nullable=False)
content = Column(Text, nullable=False)
embedding = Column(ARRAY(Float), nullable=False)
position = Column(Float, nullable=False, default=0)
metadata = Column(JSON, default=dict)
created_at = Column(DateTime(timezone=True), server_default=func.now())
document = relationship("Document", back_populates="chunks", lazy="joined")
def as_context(self, score: float) -> dict[str, Any]:
return {
"id": str(self.id),
"score": round(score, 4),
"content": self.content,
"source": self.metadata.get("source", self.document.source),
}
Operational details: hard limits on upstream LLM calls with graceful degradation, structured logs shipped to an OpenTelemetry collector, and daily refresh of embeddings via a Celery worker. CI deploys the Docker image to Azure App Service with health checks and smoke tests against staging database snapshots.