# RAG Ingestor – Design Stand: 2026-05-04 ## Zweck Microservice der neue/geänderte/gelöschte Dateien aus einem Nextcloud-Ordner automatisch in eine Qdrant-Collection einliest. Trigger sind Nextcloud-Webhooks oder ein manueller Bulk-Import-Endpoint. Embeddings werden via Ollama erzeugt. ## Scope - **Beobachteter Root:** `Documents/THB/Studium/` (konfigurierbar via Env). Alles außerhalb wird mit 200 OK ignoriert. - **Erwartete Pfadstruktur:** `Documents/THB/Studium/.Semester//[]/` - `.Semester` matcht Regex `^\d+\.Semester$` - `` = direkter Kindordner des Semesters - Tiefere Pfadsegmente sind erlaubt; das erste Segment darunter wird als optionales Feld `typ` mitgeführt (z. B. `Vorlesungen`, `Uebungen`) - **Unterstützte Dateitypen:** `.pdf`, `.md`, `.docx`, `.xlsx`. Andere → skip + log. - **XLSX-Sonderfall:** Inhalt wird **nicht** extrahiert. Stattdessen wird ein Pseudo-Text der Form `Tabelle: ` indexiert, damit die Datei als „existiert" auffindbar ist. ## Architektur ``` Nextcloud ──webhook──▶ FastAPI ──BackgroundTask──▶ Pipeline │ │ ▼ ▼ Auth (X-Webhook-Secret) WebDAV download │ ▼ Extract (pdf/md/docx/xlsx) │ ▼ Chunk (≈500w/50w) │ ▼ Embed (Ollama) │ ▼ Qdrant: delete_by_filter(file_path) + upsert points ``` Verarbeitung **asynchron im Prozess** über `fastapi.BackgroundTasks`. Kein externer Job-Broker. Bei Service-Crash gehen In-Flight-Jobs verloren; Recovery erfolgt durch manuellen Bulk-Import. ## Modulstruktur ``` app/ main.py # FastAPI-App, lifespan (Qdrant-Collection ensure) config.py # pydantic-settings, alle Env-Vars webhook/ handler.py # POST /webhook auth.py # Shared-Secret-Verifikation models.py # NextcloudEvent (created/updated/deleted) ingest/ pipeline.py # process_file(path, event) — orchestriert Stages webdav.py # download_file(path) → bytes (httpx Basic Auth) extractors.py # PDF (pymupdf), MD (utf-8), DOCX (python-docx), XLSX chunker.py # word-based mit Sentence-Boundary look-back embedder.py # Ollama /api/embeddings + Retry metadata.py # parse_path → {semester, fach, typ?} qdrant_store.py # ensure_collection, upsert_chunks, delete_by_path bulk.py # POST /bulk-import — recursive walk + dispatch logging_setup.py tests/ fixtures/ # Mini-Sample-Files (sample.pdf, sample.docx, ...) test_metadata.py test_chunker.py test_extractors.py test_webhook.py docker/ Dockerfile .env.example pyproject.toml README.md ``` ## HTTP-Endpoints | Method | Pfad | Auth | Zweck | |--------|----------------|-----------------------|-----------------------------------------| | POST | `/webhook` | `X-Webhook-Secret` | Nextcloud-Event-Empfang | | POST | `/bulk-import` | `X-Webhook-Secret` | `{path}` rekursiv ingestieren | | GET | `/health` | — | Liveness-Probe (Coolify) | ### Webhook Request (NextcloudEvent) ```json { "event_type": "created" | "updated" | "deleted", "file_path": "Documents/THB/Studium/2.Semester/Databases/Vorlesungen/DBS1_02.pdf", "file_name": "DBS1_02.pdf" } ``` Response: `202 Accepted` sobald dispatched (Auth + Validierung haben gegriffen). Bei `401`/`422` kein Dispatch. ### Bulk-Import Request ```json { "path": "Documents/THB/Studium/2.Semester/Databases" } ``` Walks rekursiv, dispatched pro Datei einen `created`-Event-Job. Einzel-Failures unterbrechen den Walk nicht. ## Pipeline-Logik (pro Datei) 1. **Pfad-Filter:** Pfad muss mit `INGEST_ROOT` beginnen → sonst skip + log. 2. **Event-Branching:** - `deleted` → `qdrant.delete_by_filter(file_path=...)`. Fertig. - `created` / `updated` → weitermachen. 3. **Extension-Whitelist:** `.pdf`, `.md`, `.docx`, `.xlsx`. Sonst → skip. 4. **WebDAV-Download:** `httpx.AsyncClient` mit Basic Auth (User + App-Passwort), 60 s Timeout. 5. **Extraktion** (gibt Liste `[(page_num, text)]` zurück): - **PDF**: `pymupdf` Seite für Seite - **MD**: bytes → `utf-8` decode, `page=1` - **DOCX**: `python-docx` Paragraphs joined mit `\n\n`, `page=1` - **XLSX**: kein Inhalt — Pseudo-Text `"Tabelle: "`, `page=1` 6. **Chunking:** ~500 Wörter pro Chunk, ~50 Wörter Overlap. Sliding window mit Look-back zum nächsten Satzende (`. ! ?`) im letzten ~20 % des Chunks. Pro Chunk Felder: `text`, `page` (vom Extractor mitgegeben), `chunk_index` (aufsteigend pro Datei, beginnend bei 0). 7. **Embedding:** Ollama `POST /api/embeddings` pro Chunk, Modell aus Env. Retry **3×** mit Exponential Backoff (1 s, 2 s, 4 s) bei HTTP-Fehlern. 8. **Qdrant-Write:** Erst `delete_by_filter(file_path)` (idempotent für Webhook-Duplikate und für Updates), dann `upsert` aller neuen Points (UUIDs als IDs, Payload mit Metadaten). ## Qdrant-Schema - **Collection:** `rag_thb_studium` (aus Env) - **Vector-Dimension:** zur Boot-Zeit über Ollama `POST /api/show` ermittelt. Wenn Ollama oder Qdrant beim Start nicht erreichbar sind → Service crasht (fail-fast, Coolify startet neu). Falls Collection noch nicht existiert → wird neu erstellt. Falls Collection mit **anderer** Vektordimension existiert → Service crasht beim Start mit klarer Fehlermeldung; Operator muss Collection manuell droppen + Bulk-Import laufen lassen. - **Distance:** `Cosine` - **Payload-Indexes (beim Ensure):** - `file_path` (keyword, für `delete_by_filter`) - `semester` (keyword, für Query-Filter) - `fach` (keyword, für Query-Filter) - **Payload-Schema pro Chunk:** ```json { "file_path": "Documents/THB/Studium/2.Semester/Databases/Vorlesungen/DBS1_02.pdf", "file_name": "DBS1_02.pdf", "file_type": "pdf", "semester": "2.Semester", "fach": "Databases", "typ": "Vorlesungen", "page": 4, "chunk_index": 17, "text": "...", "ingested_at": "2026-05-04T20:58:00Z" } ``` `typ` ist `null`, wenn die Datei direkt im Fachordner liegt. ## Konfiguration Alle Werte über Env-Vars (`pydantic-settings`): ``` NEXTCLOUD_WEBDAV_URL=https://nc.example.com/remote.php/dav/files/ NEXTCLOUD_USER= NEXTCLOUD_APP_PASSWORD=... OLLAMA_URL=http://ollama:11434 OLLAMA_EMBED_MODEL=qwen3-embedding:0.6b QDRANT_URL=http://qdrant:6333 QDRANT_COLLECTION=rag_thb_studium WEBHOOK_SECRET= INGEST_ROOT=Documents/THB/Studium CHUNK_SIZE_WORDS=500 CHUNK_OVERLAP_WORDS=50 LOG_LEVEL=INFO ``` `.env.example` wird mit Dummy-Werten committet. ## Fehlerbehandlung - **Webhook-Layer:** - Auth-Fail → `401` - Invalides Payload → `422` - Pfad außerhalb `INGEST_ROOT` → `202` + Skip-Log (nicht als Fehler werten, Nextcloud kann beliebige Events senden) - **Pipeline (im BackgroundTask):** `try/except` um jede Stage. Fehler werden geloggt mit `file_path`-Kontext, Job wird abgebrochen. **Kein Retry auf Pipeline-Ebene** (außer dem internen Embed-Retry). - **Crash während In-Flight:** akzeptiert; Recovery via Bulk-Import. - **Webhook-Duplikate:** Implizit idempotent durch `delete_by_filter` + frischer Insert. - **Bulk-Import:** Einzel-Failures unterbrechen den Walk nicht (nur Log + continue). ## Logging - Stdlib `logging` → stdout. - Default-Format: human-readable Key=Value, z. B. `INFO event=download file=Documents/THB/Studium/2.Semester/Databases/DBS1.pdf duration_ms=842 status=ok` - Default-Level: `INFO`. `DEBUG` per Env. - Pro Pipeline-Stage ein Log-Event mit `file_path`, `event` (= Stage-Name), `status`, ggf. `duration_ms` und `error`. - Kein JSON-Log-Format zum Start (Coolify-Logs sollen lesbar bleiben). Erweiterbar via Env-Flag, falls später nötig. ## Tests - `pytest` + `pytest-asyncio`. - **Unit-Fokus** auf Pure-Logic — keine Integration-Tests gegen echte Ollama-/Qdrant-Instanzen. - `test_metadata.py`: Pfad-Parser für valide Pfade (alle Semester/Fach-Varianten), invalide Pfade (Studienbescheinigung, außerhalb-Root, Fach ohne Subordner) - `test_chunker.py`: Chunk-Größen, Overlap, Satz-Boundary-Look-Back, kurze Texte (< 1 Chunk), edge cases - `test_extractors.py`: Mini-Sample-Files (sehr kleine, eingecheckte Fixtures) für jeden unterstützten Typ - `test_webhook.py`: Auth-Header-Validierung, Payload-Schema, Skip-Verhalten für Pfade außerhalb `INGEST_ROOT` ## Deployment - `Dockerfile` (Base: `python:3.12-slim`, Deps via `uv`) - `docker-compose.yml` als Beispiel mit Service-Stub neben Qdrant + Ollama (für lokale Entwicklung) - In Coolify wird der Container neben den existierenden Qdrant- und Ollama-Diensten deployed. Webhook-URL ist nur intern erreichbar. ## Tech-Stack-Pin - Python 3.12 - FastAPI - httpx (WebDAV + Ollama) - pymupdf, python-docx - qdrant-client - pydantic, pydantic-settings - pytest, pytest-asyncio - uv (Dependency-Management) ## Bewusste Auslassungen (YAGNI) - **Sidecar-Metadaten** (`.rag-meta.json` o. ä.) — geparkt, bis konkreter Filter-Bedarf entsteht. - **Job-Queue** (Redis/Celery/Arq) — BackgroundTasks reichen für aktuelles Volumen. - **Persistenter Job-State** — Recovery via Bulk-Import statt eigener State-Store. - **Sub-Path-Watching** unter `Studium/` — alle Pfade unter dem Root werden ingestiert. - **DOCX-Bilder/-Tabellen-Extraktion** — nur Paragraphs. - **XLSX-Inhaltsextraktion** — nur Filename als Pseudo-Indexeintrag. - **Authentifizierte WebDAV-User-Differenzierung** — ein einziger Service-User, App-Passwort. - **Reindex-Migrationen** bei Modell-/Dimension-Änderung — manueller Drop + Bulk-Import. ## Voraussetzungen außerhalb des Service - 1. Semester wird vom Nutzer von „nach Dozent gruppiert" auf „nach Fach gruppiert" umstrukturiert (passend zur 2. Semester-Konvention). - Studieninhalte werden in einen neuen Unterordner `Documents/THB/Studium/` verschoben. Lose Dateien wie `Studienbescheinigung.pdf` bleiben außerhalb und werden nicht ingestiert. - Optional: Tippfehler `Algorythm` → `Algorithms` korrigieren (kein Code-Impact, nur Convention). - Nextcloud-App-Passwort für den Service-User wird angelegt. - Webhook-Konfiguration in Nextcloud zeigt auf den Service-Endpoint mit korrektem `X-Webhook-Secret`.