AI Python Engineer / Software Engineer (LLM, RAG, Cloud)

I build LLM-powered products, RAG systems and reliable backend services.

I focus on Python, FastAPI, vector and relational databases, and cloud platforms (Azure/AWS) to turn business problems into production-ready systems. From prototypes to long-term maintenance, I care about data quality, observability and developer experience.

Python | FastAPI | LangChain/LangGraph | RAG | PostgreSQL | Azure & AWS
Available for remote roles

LLM Infra & Backend

  • LLM apps with retrieval, tools and evaluation
  • APIs with strong validation and observability
  • Data pipelines from SQL + file stores to vectors
  • CI/CD and automated regression tests
9+ years coding experience
RAG: LangChain, Chroma, Pinecone, Weaviate
Cloud: Azure Functions, App Service, AWS Lambda/ECS

About

Engineer with deep Python roots and a focus on LLM systems

I started coding in primary school, building simple games, websites and scripts to automate chores. That curiosity led me to C++ and Python, algorithms and debugging tools. I shipped early side projects for friends and local groups, learning quickly how to make code survive real usage.

Today I design and ship LLM systems: retrieval-augmented assistants, agentic workflows and evaluation loops that prove reliability. I am comfortable moving between prototypes and long-term maintenance: modeling data, designing APIs, instrumenting logs/metrics and keeping deployments healthy.

I like collaborating with product and ops teams to make sure AI features solve the right problem. My stack revolves around Python, FastAPI, SQLAlchemy, PostgreSQL/MySQL, vector databases (Pinecone, Weaviate, Chroma), LangChain/LangGraph, Azure/AWS, Docker and CI/CD with testing.

Skills

Systems thinking with strong Python, backend and cloud foundations

GenAI & LLM systems
  • LLMs: OpenAI, Anthropic, Azure OpenAI
  • Embeddings, tokenization, transformers basics
  • RAG pipelines, vector DBs (Pinecone, Weaviate, Chroma)
  • LangChain, LangGraph, CrewAI style orchestration
  • Prompt engineering, tools/agents, evaluation & monitoring
Python & backend engineering
  • Python, FastAPI, REST API design, background workers
  • ORMs: SQLAlchemy; PostgreSQL/MySQL schemas & migrations
  • Unit/integration tests with pytest, clean architecture
  • Validation, business rules, error handling & logging
Cloud & DevOps
  • Azure: Functions, App Service, Storage, Key Vault
  • AWS: Lambda, S3, ECS, IAM, Bedrock
  • Docker, basic Kubernetes, CI/CD (GitHub Actions, GitLab CI)
  • Logging, metrics, monitoring, pragmatic MLOps patterns
Developer experience & automation
  • Python automation scripts and bots
  • Workflow automation (n8n, Make, Zapier)
  • Internal tools: code review automation, IDE assistants, docs generators
Additional
  • C++, basic system administration
  • 3D modeling (Fusion 360), Adobe Suite, video editing

Projects

Recent AI + backend builds with real engineering depth

Advanced snippets below are production-oriented: typed Python, FastAPI, SQLAlchemy, tests and automation. Each project includes architecture visuals and deeper notes.

LLM Knowledge Assistant for Internal Systems

Python based assistant that answers questions about internal documentation and database records.

Python | FastAPI | PostgreSQL | SQLAlchemy | LangChain | OpenAI | Azure | Docker
  • Built ETL that pulls relational data and files, normalises them and loads vectors.
  • Designed schemas and views to filter by tenant, source and freshness.
  • Implemented FastAPI endpoints for chat, admin operations and data refresh.
  • Added metrics: latency, sources returned, error counts with structured logging.
from __future__ import annotations

import asyncio
import logging
from uuid import uuid4

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field, constr

from app.services.chat import ChatService, RetrievalError, get_chat_service

router = APIRouter()
logger = logging.getLogger("app.chat")


class ChatRequest(BaseModel):
    question: constr(min_length=3) = Field(..., description="User question")
    user_id: str = Field(..., min_length=3)
    trace_id: str | None = None


class ChatResponse(BaseModel):
    answer: str
    sources: list[str]
    latency_ms: int


async def _with_timeout(coro, *, timeout: float):
    return await asyncio.wait_for(coro, timeout=timeout)


@router.post("/chat", response_model=ChatResponse, status_code=status.HTTP_200_OK)
async def chat(
    payload: ChatRequest,
    svc: ChatService = Depends(get_chat_service),
) -> ChatResponse:
    """Answer a user question with retrieval + LLM while enforcing timeouts and structured logs."""
    ctx = {"trace_id": payload.trace_id or str(uuid4()), "user_id": payload.user_id}
    logger.info("chat.request", extra=ctx | {"question": payload.question})
    try:
        result = await _with_timeout(
            svc.answer(question=payload.question, user_id=payload.user_id),
            timeout=8.0,
        )
    except asyncio.TimeoutError:
        logger.warning("chat.timeout", extra=ctx)
        raise HTTPException(status_code=504, detail="Upstream LLM timeout")
    except RetrievalError as exc:
        logger.exception("chat.retrieval_failed", extra=ctx | {"error": str(exc)})
        raise HTTPException(status_code=500, detail="Context retrieval failed")
    logger.info(
        "chat.success",
        extra=ctx | {"sources": result.sources, "latency_ms": result.latency_ms},
    )
    return ChatResponse(
        answer=result.answer,
        sources=result.sources,
        latency_ms=result.latency_ms,
    )
from __future__ import annotations

from typing import Any
from uuid import uuid4

from sqlalchemy import JSON, Column, DateTime, Float, ForeignKey, Index, String, Text, func
from sqlalchemy.dialects.postgresql import ARRAY, UUID
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    tenant_id = Column(String(36), nullable=False)
    title = Column(String(255), nullable=False)
    source = Column(String(120), nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    chunks = relationship("DocumentChunk", back_populates="document", lazy="selectin")


class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    __table_args__ = (
        Index("ix_chunks_tenant_doc", "tenant_id", "document_id"),
        Index("ix_chunks_vector", "tenant_id", "embedding", postgresql_using="ivfflat"),
    )

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    tenant_id = Column(String(36), nullable=False)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(ARRAY(Float), nullable=False)
    position = Column(Float, nullable=False, default=0)
    # "metadata" is reserved on declarative models, so the Python attribute is
    # renamed while the database column keeps its original name.
    meta = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    document = relationship("Document", back_populates="chunks", lazy="joined")

    def as_context(self, score: float) -> dict[str, Any]:
        return {
            "id": str(self.id),
            "score": round(score, 4),
            "content": self.content,
            "source": self.meta.get("source", self.document.source),
        }
graph LR
    User((User)) -->|question| API[FastAPI service]
    API --> Retriever[Retrieval layer]
    Retriever --> VectorDB[(Vector DB)]
    Retriever --> PG[(PostgreSQL)]
    Retriever --> LLM[LLM Provider]
    LLM --> API
    API -->|answer + sources| User

Hard limits on upstream LLM calls with graceful degradation, structured logs shipped to OpenTelemetry collector, and daily refresh of embeddings via Celery worker. CI deploys Docker image to Azure App Service with health checks and smoke tests against staging database snapshots.
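The graceful-degradation pattern above can be sketched as a thin wrapper around the upstream call: enforce a hard deadline and serve a canned answer instead of a 5xx when the model stalls. A minimal illustration; `answer_with_fallback` and `FALLBACK_ANSWER` are hypothetical names, not the project's actual API.

```python
import asyncio

# Canned response served when the upstream LLM misses its deadline.
FALLBACK_ANSWER = (
    "I could not reach the model in time. Please retry or check the linked sources."
)


async def answer_with_fallback(llm_call, *, timeout: float = 8.0) -> tuple[str, bool]:
    """Run an LLM coroutine factory with a hard deadline.

    Returns (answer, degraded) so callers can count degraded responses
    separately in their metrics instead of surfacing an error to the user.
    """
    try:
        return await asyncio.wait_for(llm_call(), timeout=timeout), False
    except asyncio.TimeoutError:
        return FALLBACK_ANSWER, True


async def _demo() -> tuple[str, bool]:
    async def slow_llm() -> str:
        await asyncio.sleep(1.0)  # simulates a stalled upstream call
        return "real answer"

    # Deadline far shorter than the simulated latency, so degradation triggers.
    return await answer_with_fallback(slow_llm, timeout=0.05)


answer, degraded = asyncio.run(_demo())
```

Here `degraded` comes back `True` and the canned answer is returned, which a caller would log as a degraded (but successful) response rather than an error.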

RAG System for Customer Support

RAG pipeline that helps support teams answer tickets using a knowledge base and historical conversations.

Python | LangChain | Chroma | Azure OpenAI | Redis | pytest
  • Designed ingest -> chunk -> embed -> store -> retrieve -> answer flow with adapters per source.
  • Normalized FAQ, markdown docs and call transcripts before chunking.
  • Added evaluation harness and prompt A/B tests with simple heuristics.
  • Unit and integration tests with pytest plus ephemeral Chroma instances.
from __future__ import annotations

import logging
from collections import Counter
from dataclasses import dataclass, field
from typing import Sequence

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import AzureOpenAIEmbeddings

logger = logging.getLogger("rag.pipeline")


@dataclass
class RawDocument:
    content: str
    source: str
    topic: str


@dataclass
class PipelineStats:
    chunks: int = 0
    tokens_est: int = 0
    # Mutable defaults need a factory; a bare Counter() raises at class creation.
    sources: Counter[str] = field(default_factory=Counter)

    def log(self) -> None:
        logger.info(
            "rag.ingest_completed",
            extra={
                "chunks": self.chunks,
                "tokens_est": self.tokens_est,
                "top_sources": self.sources.most_common(3),
            },
        )


def build_rag_index(
    docs: Sequence[RawDocument],
    collection: Chroma,
    embedder: AzureOpenAIEmbeddings,
) -> PipelineStats:
    """Normalize, chunk, embed and persist documents with basic ingest metrics."""
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=80)
    stats = PipelineStats()
    for doc in docs:
        normalized = doc.content.strip().replace("\r\n", "\n")
        for chunk in splitter.split_text(normalized):
            emb = embedder.embed_query(chunk)
            collection.add_texts(
                texts=[chunk],
                embeddings=[emb],
                metadatas=[{"source": doc.source, "topic": doc.topic}],
            )
            stats.chunks += 1
            stats.tokens_est += max(len(chunk) // 4, 1)
            stats.sources[doc.source] += 1
    stats.log()
    return stats
import pytest
from langchain.vectorstores import Chroma

from rag.answer import answer_question
from rag.pipeline import RawDocument, build_rag_index


def fake_embeddings():
    class _Fake:
        def embed_query(self, text: str):
            return [hash(text) % 1000 / 1000 for _ in range(1536)]

    return _Fake()


@pytest.fixture(scope="module")
def small_kb(tmp_path_factory):
    path = tmp_path_factory.mktemp("kb")
    store = Chroma(collection_name="support", persist_directory=str(path))
    docs = [
        RawDocument("Reset password via Settings -> Security", source="faq", topic="auth"),
        RawDocument("Refunds are processed within 7 days", source="policy", topic="billing"),
    ]
    build_rag_index(docs, collection=store, embedder=fake_embeddings())
    yield store
    store.delete_collection()


def test_retrieval_and_answer_quality(small_kb):
    question = "How long do refunds take?"
    answer, meta = answer_question(question, store=small_kb)
    assert meta.similarity >= 0.68
    assert "7 days" in answer.lower()
    assert meta.source in {"faq", "policy"}
flowchart LR
    Ingest[Ingest adapters] --> Normalize[Normalize per source]
    Normalize --> Chunk[Smart chunking]
    Chunk --> Embed[Embeddings]
    Embed --> Chroma[(Chroma DB)]
    UserQ[User ticket] --> Retrieve[Retriever]
    Retrieve --> Chroma
    Retrieve --> Compose[LLM composer]
    Compose --> Answer[Answer + citations]
Prompt variant | Precision | Recall | Avg. latency (ms)
v1-short | 0.74 | 0.69 | 820
v2-grounded | 0.81 | 0.76 | 910
v3-safety | 0.78 | 0.80 | 1010

Cached embeddings in Redis, used semantic filters by topic and source, and added nightly evaluation jobs to compare prompt variants. Alerts fire when similarity drops or latency spikes, using simple thresholds in Prometheus-compatible metrics.
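Threshold-based alerting like this can be expressed as a pure check over the nightly metrics. A minimal sketch; `Thresholds` and `check_alerts` are hypothetical names, and real values would be read from the Prometheus-compatible metrics rather than passed in directly.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    # Illustrative defaults; tuned per deployment in practice.
    min_similarity: float = 0.65
    max_p95_latency_ms: int = 1500


def check_alerts(
    similarity: float,
    p95_latency_ms: int,
    t: Thresholds = Thresholds(),
) -> list[str]:
    """Return alert names for any metric outside its threshold; empty means healthy."""
    alerts: list[str] = []
    if similarity < t.min_similarity:
        alerts.append("similarity_drop")
    if p95_latency_ms > t.max_p95_latency_ms:
        alerts.append("latency_spike")
    return alerts
```

For example, `check_alerts(0.58, 1800)` flags both conditions, while `check_alerts(0.80, 900)` returns an empty list; a nightly job would map non-empty results to notifications.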

Agentic Workflow for Data Cleaning

AI agent that cleans and normalises tabular data before analytics, escalating ambiguous fixes.

Python | Pandas | NumPy | LangGraph/CrewAI | Azure Functions | PostgreSQL
  • Detected missing data, anomalies and inconsistencies across ingestion batches.
  • Agent proposed fixes and mappings; escalated uncertain cases for review.
  • Audit logs stored in PostgreSQL with before/after values per change.
  • Documented architecture and sequence for handoffs.
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import numpy as np
import pandas as pd


@dataclass
class Issue:
    row_id: Any
    column: str
    issue: str
    suggested_fix: str


def detect_anomalies(frame: pd.DataFrame, z_threshold: float = 3.2) -> list[Issue]:
    """Detect outliers, missing values and ambiguous textual delimiters."""
    issues: list[Issue] = []
    numeric_cols = frame.select_dtypes(include=["number"]).columns
    for col in numeric_cols:
        series = frame[col]
        # Guard against zero std to avoid division by zero.
        z_scores = np.abs((series - series.mean()) / (series.std() or 1))
        for idx in series.index[z_scores > z_threshold]:
            issues.append(
                Issue(
                    row_id=idx,
                    column=col,
                    issue="outlier",
                    suggested_fix=f"clip to p99 ({series.quantile(0.99):.2f})",
                )
            )
    for col in frame.columns:
        missing = frame[col].isna()
        if missing.any():
            issues.append(
                Issue(
                    row_id="*",
                    column=col,
                    issue="missing_values",
                    suggested_fix="impute with median or forward fill",
                )
            )
        if frame[col].dtype == "object":
            # na=False keeps the boolean mask valid when the column contains NaNs.
            ambiguous = frame[col].str.contains(";|,", na=False)
            for idx in frame[ambiguous].index:
                issues.append(
                    Issue(
                        row_id=idx,
                        column=col,
                        issue="ambiguous_delimiter",
                        suggested_fix="split field and normalise",
                    )
                )
    return issues
from __future__ import annotations

from langgraph.graph import END, StateGraph


class AuditLogRepository:
    async def write(self, *, row_id: str, column: str, action: str, detail: dict) -> None:
        ...


class State(dict):
    issues: list
    accepted: list
    pending_review: list


def build_data_cleaning_graph(audit_repo: AuditLogRepository):
    """Orchestrate auto-fixes vs escalations and write audit entries for every step."""
    graph = StateGraph(State)

    async def classify(state: State):
        state.setdefault("accepted", [])
        state.setdefault("pending_review", [])
        for issue in state["issues"]:
            if issue.issue == "outlier" and "clip" in issue.suggested_fix:
                state["accepted"].append(issue)
            else:
                state["pending_review"].append(issue)
        return state

    async def apply_fixes(state: State):
        for issue in state["accepted"]:
            await audit_repo.write(
                row_id=str(issue.row_id),
                column=issue.column,
                action="auto_fix",
                detail={"fix": issue.suggested_fix},
            )
        return state

    async def escalate(state: State):
        for issue in state["pending_review"]:
            await audit_repo.write(
                row_id=str(issue.row_id),
                column=issue.column,
                action="needs_confirmation",
                detail={"reason": issue.issue},
            )
        return state

    graph.add_node("classify", classify)
    graph.add_node("apply_fixes", apply_fixes)
    graph.add_node("escalate", escalate)
    graph.set_entry_point("classify")
    # Fan out: both branches run after classification, each auditing its share.
    graph.add_edge("classify", "apply_fixes")
    graph.add_edge("classify", "escalate")
    graph.add_edge("apply_fixes", END)
    graph.add_edge("escalate", END)
    return graph.compile()
sequenceDiagram
    participant Ingest
    participant Agent
    participant Reviewer
    participant Audit
    Ingest->>Agent: anomalies list
    Agent->>Agent: auto-fix eligible?
    Agent-->>Audit: write auto_fix
    Agent-->>Reviewer: request confirmation
    Reviewer-->>Audit: approve/override
Field | Before | Suggested | After
Price | 1,000,000 | Clip to 820,000 | 820,000
Country | PL; DE | Split + pick mode | PL
Signup date | - | Forward fill | 2024-10-02

Azure Function triggers the pipeline per batch, writes anomalies to PostgreSQL and publishes review tasks to Teams via webhook. Maintainers can replay decisions; every change is audit-tracked with before/after snapshots for compliance.
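Publishing a review task to Teams amounts to posting a small JSON card to the incoming-webhook URL. A minimal sketch of the payload builder, assuming a MessageCard-style body (`build_review_card` is a hypothetical helper; the project's actual card layout may differ):

```python
import json


def build_review_card(field: str, issue: str, suggested_fix: str) -> str:
    """Render a minimal MessageCard payload for a Teams incoming webhook."""
    card = {
        "@type": "MessageCard",
        "@context": "https://schema.org/extensions",
        "title": f"Review needed: {field}",
        "text": f"Issue '{issue}' detected. Suggested fix: {suggested_fix}",
    }
    return json.dumps(card)


# The Azure Function would POST this body to the configured webhook URL,
# e.g. with an HTTP client:
#   httpx.post(webhook_url, content=build_review_card("Price", "outlier", "clip to p99"),
#              headers={"Content-Type": "application/json"})
```

Keeping the builder pure makes it trivial to unit-test the card contents without hitting the webhook.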

Evaluation Dashboard for LLM Apps

Internal tool to evaluate and monitor prompts and model configurations over time.

Python | FastAPI | PostgreSQL | SQLAlchemy | Plotly | GitHub Actions
  • Backend stores requests, responses, metadata and user ratings.
  • API endpoints expose grouped stats for dashboards and reports.
  • Scheduled batch tests run via GitHub Actions and push results.
  • Charts show success rate and latency trends per model/prompt.
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, Query
from sqlalchemy import case, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_session
from app.models import RequestLog

router = APIRouter()


@router.get("/metrics/requests")
async def request_metrics(
    model: Annotated[str | None, Query()] = None,
    prompt_version: Annotated[str | None, Query(alias="prompt")] = None,
    start: Annotated[datetime | None, Query()] = None,
    end: Annotated[datetime | None, Query()] = None,
    session: AsyncSession = Depends(get_session),
):
    """Return aggregated success rates and latency percentiles per model/prompt."""
    # Default window: last 7 days, computed in Python to keep the SQL portable.
    window_start = start or datetime.utcnow() - timedelta(days=7)
    stmt = (
        select(
            RequestLog.model,
            RequestLog.prompt_version,
            func.count().label("total"),
            func.sum(case((RequestLog.success.is_(True), 1), else_=0)).label("success"),
            func.percentile_cont(0.5).within_group(RequestLog.latency_ms).label("p50"),
            func.percentile_cont(0.95).within_group(RequestLog.latency_ms).label("p95"),
        )
        .where(RequestLog.created_at >= window_start)
        .group_by(RequestLog.model, RequestLog.prompt_version)
    )
    if model:
        stmt = stmt.where(RequestLog.model == model)
    if prompt_version:
        stmt = stmt.where(RequestLog.prompt_version == prompt_version)
    if end:
        stmt = stmt.where(RequestLog.created_at <= end)
    rows = (await session.execute(stmt)).all()
    return [
        {
            "model": r.model,
            "prompt_version": r.prompt_version,
            "success_rate": round(r.success / r.total, 3),
            "p50_ms": int(r.p50 or 0),
            "p95_ms": int(r.p95 or 0),
        }
        for r in rows
    ]
name: nightly-evals

on:
  schedule:
    - cron: "0 2 * * *"
  workflow_dispatch:

jobs:
  evaluate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install
        run: pip install -r requirements.txt
      - name: Run evaluations
        env:
          EVAL_API_BASE: ${{ secrets.EVAL_API_BASE }}
          EVAL_API_TOKEN: ${{ secrets.EVAL_API_TOKEN }}
        run: |
          python scripts/run_evals.py --prompt latest --limit 50 --output metrics.json
      - name: Upload results
        # env does not carry across steps, so the secrets are declared again here.
        env:
          EVAL_API_BASE: ${{ secrets.EVAL_API_BASE }}
          EVAL_API_TOKEN: ${{ secrets.EVAL_API_TOKEN }}
        run: |
          curl -H "Authorization: Bearer $EVAL_API_TOKEN" \
            -H "Content-Type: application/json" \
            -d @metrics.json "$EVAL_API_BASE/metrics/import"

Data stored in PostgreSQL with partitions per month. Aggregations use materialized views refreshed via cron. Frontend widgets load via light JS, keeping GitHub Pages friendly while pulling live data from the API.
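Monthly partitioning like this is usually automated by rendering one DDL statement per month ahead of time. A minimal sketch assuming PostgreSQL declarative range partitioning; `monthly_partition_ddl` and the `request_logs` table name are illustrative, not the project's actual schema.

```python
from datetime import date


def monthly_partition_ddl(table: str, month: date) -> str:
    """Render CREATE TABLE ... PARTITION OF DDL for one monthly range partition."""
    start = month.replace(day=1)
    # First day of the following month; bool(True) adds 1 to the year for December.
    next_month = date(start.year + (start.month == 12), start.month % 12 + 1, 1)
    name = f"{table}_{start:%Y_%m}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {table} "
        f"FOR VALUES FROM ('{start}') TO ('{next_month}');"
    )
```

A scheduled job can run this a month ahead so inserts never land on a missing partition; the rendered statement for December 2024 covers `FROM ('2024-12-01') TO ('2025-01-01')`.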

Timeline

Career highlights

Early years

Primary & secondary school: built games, websites and scripts; learned to debug and ship things classmates actually used.

Self study & side projects

Deepened algorithms, C++ and Python; kept shipping side projects and tools for friends and NGOs.

2022 - now

Freelance Software Engineer & Automation Developer: backend services, internal tools, integrations.

2023 - now

AI Python Engineer: LLM applications, RAG systems, agents, evaluations and production integrations.

Contact

Let's talk