diff --git a/docs/superpowers/plans/2026-05-04-rag-ingestor.md b/docs/superpowers/plans/2026-05-04-rag-ingestor.md
new file mode 100644
index 0000000..d67aca4
--- /dev/null
+++ b/docs/superpowers/plans/2026-05-04-rag-ingestor.md
@@ -0,0 +1,2201 @@
+# RAG Ingestor Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Build a FastAPI microservice that ingests Nextcloud files into Qdrant with Ollama-generated embeddings, triggered by webhooks or manual bulk-import.
+
+**Architecture:** FastAPI receives Nextcloud webhooks and dispatches one background task per file. Each task downloads the file via WebDAV, extracts text per file type, chunks it with sentence-boundary look-back, embeds the chunks via Ollama, and upserts them into Qdrant after deleting any existing chunks for the same file path. Configuration comes from environment variables; there is no external job queue.
+
+**Tech Stack:** Python 3.12, FastAPI, httpx, pymupdf, python-docx, ollama (Python lib), qdrant-client, pydantic, pydantic-settings, pytest, pytest-asyncio, respx, uv.
+
+**Spec:** `docs/superpowers/specs/2026-05-04-rag-ingestor-design.md`
+
+---
+
+## File Structure (created in this plan)
+
+```
+app/
+  __init__.py
+  main.py              # FastAPI app + lifespan
+  config.py            # pydantic-settings
+  logging_setup.py     # structlog-light setup
+  webhook/
+    __init__.py
+    models.py          # NextcloudEvent
+    auth.py            # verify_secret
+    handler.py         # POST /webhook
+  ingest/
+    __init__.py
+    metadata.py        # parse_path → {semester, fach, typ}
+    chunker.py         # chunk_text
+    extractors.py      # PDF/MD/DOCX/XLSX
+    webdav.py          # download_file
+    embedder.py        # embed_texts (with retry)
+    pipeline.py        # process_file orchestrator
+  qdrant_store.py      # ensure_collection, upsert, delete_by_path
+  bulk.py              # POST /bulk-import
+tests/
+  __init__.py
+  conftest.py          # fixtures (sample files)
+  test_config.py
+  test_metadata.py
+  test_chunker.py
+  test_extractors.py
+  test_webdav.py
+  test_embedder.py
+  test_qdrant_store.py
+  test_webhook.py
+  test_pipeline.py
+  test_bulk.py
+docker/
+  Dockerfile
+docker-compose.yml
+.env.example
+.gitignore
+pyproject.toml
+README.md
+```
+
+---
+
+## Task 1: Project Scaffolding
+
+**Files:**
+- Create: `pyproject.toml`
+- Create: `.gitignore`
+- Create: `.env.example`
+- Create: `app/__init__.py` (empty)
+- Create: `app/webhook/__init__.py` (empty)
+- Create: `app/ingest/__init__.py` (empty)
+- Create: `tests/__init__.py` (empty)
+
+- [ ] **Step 1: Create `pyproject.toml`**
+
+```toml
+[project]
+name = "rag-ingestor"
+version = "0.1.0"
+description = "Nextcloud → Qdrant RAG ingestion service"
+requires-python = ">=3.12"
+dependencies = [
+    "fastapi>=0.115",
+    "uvicorn[standard]>=0.32",
+    "httpx>=0.28",
+    "pydantic>=2.10",
+    "pydantic-settings>=2.7",
+    "pymupdf>=1.25",
+    "python-docx>=1.1",
+    "ollama>=0.4",
+    "qdrant-client>=1.12",
+]
+
+[dependency-groups]
+dev = [
+    "pytest>=8.3",
+    "pytest-asyncio>=0.25",
+    "respx>=0.22",
+]
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["tests"]
+
+[tool.uv]
+package = true
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["app"]
+```
+
+- [ ] **Step 2: Create `.gitignore`**
+
+```
+__pycache__/
+*.pyc
+.pytest_cache/
+.venv/
+venv/
+.env
+*.egg-info/
+dist/
+build/
+.coverage
+.idea/
+.vscode/
+```
+
+- [ ] **Step 3: Create `.env.example`**
+
+```
+NEXTCLOUD_WEBDAV_URL=https://nc.example.com/remote.php/dav/files/myuser
+NEXTCLOUD_USER=myuser
+NEXTCLOUD_APP_PASSWORD=changeme
+OLLAMA_URL=http://ollama:11434
+OLLAMA_EMBED_MODEL=qwen3-embedding:0.6b
+QDRANT_URL=http://qdrant:6333
+QDRANT_COLLECTION=rag_thb_studium
+WEBHOOK_SECRET=changeme
+INGEST_ROOT=Documents/THB/Studium
+CHUNK_SIZE_WORDS=500
+CHUNK_OVERLAP_WORDS=50
+LOG_LEVEL=INFO
+```
+
+- [ ] **Step 4: Create empty package files**
+
+The directories must exist before `touch` can create files inside them, so `mkdir -p` runs first.
+
+```bash
+mkdir -p app/webhook app/ingest tests
+touch app/__init__.py app/webhook/__init__.py app/ingest/__init__.py tests/__init__.py
+```
+
+- [ ] **Step 5: Install deps and verify**
+
+```bash
+uv sync
+```
+
+Expected: creates `.venv/`, installs all dependencies without errors.
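A quick importability check can complement the `uv sync` step. The sketch below is hypothetical: the helper name `missing_modules` is not part of the plan, and the list assumes the usual import names for the declared packages (for example `fitz` for pymupdf and `docx` for python-docx).

```python
import importlib.util

# Import names assumed for the dependencies declared in pyproject.toml.
EXPECTED_MODULES = [
    "fastapi", "uvicorn", "httpx", "pydantic", "pydantic_settings",
    "fitz", "docx", "ollama", "qdrant_client",
]


def missing_modules(names: list[str]) -> list[str]:
    """Return the subset of top-level module names that cannot be located."""
    return [n for n in names if importlib.util.find_spec(n) is None]


if __name__ == "__main__":
    missing = missing_modules(EXPECTED_MODULES)
    print("missing:", ", ".join(missing) if missing else "none")
```

Run inside the project environment (for instance via `uv run python`), an empty result suggests the environment matches `pyproject.toml`.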
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add pyproject.toml .gitignore .env.example app/ tests/
+git commit -m "chore: project scaffolding with uv and pyproject.toml"
+```
+
+---
+
+## Task 2: Configuration (Settings)
+
+**Files:**
+- Create: `app/config.py`
+- Test: `tests/test_config.py`
+
+- [ ] **Step 1: Write the failing test**
+
+```python
+# tests/test_config.py
+import pytest
+from app.config import Settings
+
+
+def test_settings_loads_all_required_fields(monkeypatch):
+    monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "https://nc/remote.php/dav/files/u")
+    monkeypatch.setenv("NEXTCLOUD_USER", "u")
+    monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "pw")
+    monkeypatch.setenv("OLLAMA_URL", "http://ollama:11434")
+    monkeypatch.setenv("OLLAMA_EMBED_MODEL", "qwen3-embedding:0.6b")
+    monkeypatch.setenv("QDRANT_URL", "http://qdrant:6333")
+    monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
+    monkeypatch.setenv("WEBHOOK_SECRET", "secret")
+
+    # _env_file=None keeps a developer's local .env from leaking into the test
+    s = Settings(_env_file=None)
+
+    assert s.nextcloud_user == "u"
+    assert s.qdrant_collection == "rag_test"
+    assert s.ingest_root == "Documents/THB/Studium"  # default
+    assert s.chunk_size_words == 500
+    assert s.chunk_overlap_words == 50
+    assert s.log_level == "INFO"
+
+
+def test_settings_overrides_defaults(monkeypatch):
+    for k, v in {
+        "NEXTCLOUD_WEBDAV_URL": "x", "NEXTCLOUD_USER": "x",
+        "NEXTCLOUD_APP_PASSWORD": "x", "OLLAMA_URL": "x",
+        "OLLAMA_EMBED_MODEL": "x", "QDRANT_URL": "x",
+        "QDRANT_COLLECTION": "x", "WEBHOOK_SECRET": "x",
+        "INGEST_ROOT": "Other/Path",
+        "CHUNK_SIZE_WORDS": "300",
+        "CHUNK_OVERLAP_WORDS": "30",
+    }.items():
+        monkeypatch.setenv(k, v)
+
+    s = Settings(_env_file=None)
+
+    assert s.ingest_root == "Other/Path"
+    assert s.chunk_size_words == 300
+    assert s.chunk_overlap_words == 30
+
+
+def test_settings_missing_required_raises(monkeypatch):
+    for k in ["NEXTCLOUD_WEBDAV_URL", "NEXTCLOUD_USER", "NEXTCLOUD_APP_PASSWORD",
+              "OLLAMA_URL", "OLLAMA_EMBED_MODEL", "QDRANT_URL",
+              "QDRANT_COLLECTION", "WEBHOOK_SECRET"]:
+        monkeypatch.delenv(k, raising=False)
+
+    # Without _env_file=None a populated local .env would satisfy the required fields
+    with pytest.raises(Exception):
+        Settings(_env_file=None)
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+```bash
+uv run pytest tests/test_config.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError: No module named 'app.config'`.
+
+- [ ] **Step 3: Implement `app/config.py`**
+
+```python
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
+
+    nextcloud_webdav_url: str
+    nextcloud_user: str
+    nextcloud_app_password: str
+
+    ollama_url: str
+    ollama_embed_model: str
+
+    qdrant_url: str
+    qdrant_collection: str
+
+    webhook_secret: str
+
+    ingest_root: str = "Documents/THB/Studium"
+    chunk_size_words: int = 500
+    chunk_overlap_words: int = 50
+    log_level: str = "INFO"
+
+
+_settings: Settings | None = None
+
+
+def get_settings() -> Settings:
+    global _settings
+    if _settings is None:
+        _settings = Settings()
+    return _settings
+```
+
+- [ ] **Step 4: Run test to verify it passes**
+
+```bash
+uv run pytest tests/test_config.py -v
+```
+
+Expected: 3 PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app/config.py tests/test_config.py
+git commit -m "feat: pydantic-settings config with all env vars"
+```
+
+---
+
+## Task 3: Logging Setup
+
+**Files:**
+- Create: `app/logging_setup.py`
+
+- [ ] **Step 1: Implement `app/logging_setup.py`**
+
+No tests: the formatter is a thin wrapper around stdlib logging. Visual validation only.
+
+```python
+import logging
+import sys
+
+
+class KeyValueFormatter(logging.Formatter):
+    """Formats records as `LEVEL msg key1=val1 key2=val2`."""
+
+    def format(self, record: logging.LogRecord) -> str:
+        base = f"{record.levelname} {record.getMessage()}"
+        # extras land in record.__dict__ but mixed with stdlib keys; filter to known-extra keys
+        reserved = {
+            "name", "msg", "args", "levelname", "levelno", "pathname", "filename",
+            "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
+            "created", "msecs", "relativeCreated", "thread", "threadName",
+            "processName", "process", "message", "taskName",
+        }
+        extras = {k: v for k, v in record.__dict__.items() if k not in reserved}
+        if extras:
+            base += " " + " ".join(f"{k}={v}" for k, v in extras.items())
+        if record.exc_info:
+            base += "\n" + self.formatException(record.exc_info)
+        return base
+
+
+def setup_logging(level: str = "INFO") -> None:
+    handler = logging.StreamHandler(sys.stdout)
+    handler.setFormatter(KeyValueFormatter())
+    root = logging.getLogger()
+    root.handlers.clear()
+    root.addHandler(handler)
+    root.setLevel(level.upper())
+```
+
+- [ ] **Step 2: Manual verification**
+
+```bash
+uv run python -c "
+from app.logging_setup import setup_logging
+import logging
+setup_logging('INFO')
+logging.info('test event', extra={'event': 'startup', 'status': 'ok'})
+"
+```
+
+Expected stdout: `INFO test event event=startup status=ok`.
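Should an automated check be wanted later, the manual verification above could be expressed roughly as follows. This is a sketch under assumptions: it carries a trimmed copy of the formatter so it runs standalone (a real test would import it from `app.logging_setup`), it derives the reserved-key set from an empty `LogRecord` instead of the hard-coded set in the plan, and `format_event` is a hypothetical helper.

```python
import logging

# Variation on the plan's hard-coded set: take the stdlib attribute names
# from an empty record, plus the names that may be added later.
_RESERVED = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime", "taskName"}


class KeyValueFormatter(logging.Formatter):
    """Trimmed copy of the plan's formatter (no exception handling)."""

    def format(self, record: logging.LogRecord) -> str:
        base = f"{record.levelname} {record.getMessage()}"
        extras = {k: v for k, v in record.__dict__.items() if k not in _RESERVED}
        if extras:
            base += " " + " ".join(f"{k}={v}" for k, v in extras.items())
        return base


def format_event(msg: str, **extras: object) -> str:
    """Build a record carrying extra attributes and run it through the formatter."""
    record = logging.makeLogRecord({"levelname": "INFO", "msg": msg, **extras})
    return KeyValueFormatter().format(record)


def test_formatter_appends_extras() -> None:
    formatted = format_event("test event", event="startup", status="ok")
    assert formatted == "INFO test event event=startup status=ok"
```

Constructing the record directly avoids capturing stdout, which keeps such a test independent of handler configuration.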
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add app/logging_setup.py
+git commit -m "feat: key=value logging formatter"
+```
+
+---
+
+## Task 4: Path-Metadata Parser
+
+**Files:**
+- Create: `app/ingest/metadata.py`
+- Test: `tests/test_metadata.py`
+
+- [ ] **Step 1: Write the failing tests**
+
+```python
+# tests/test_metadata.py
+from app.ingest.metadata import parse_path, PathMetadata
+
+
+ROOT = "Documents/THB/Studium"
+
+
+def test_parse_path_with_typ():
+    md = parse_path("Documents/THB/Studium/2.Semester/Databases/Vorlesungen/DBS1.pdf", ROOT)
+    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ="Vorlesungen")
+
+
+def test_parse_path_without_typ():
+    md = parse_path("Documents/THB/Studium/2.Semester/Databases/DBS1.pdf", ROOT)
+    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ=None)
+
+
+def test_parse_path_deep_nested_keeps_first_subdir_as_typ():
+    md = parse_path("Documents/THB/Studium/2.Semester/Databases/Uebungen/01/sheet.pdf", ROOT)
+    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ="Uebungen")
+
+
+def test_parse_path_outside_root_returns_none():
+    assert parse_path("Documents/Other/file.pdf", ROOT) is None
+
+
+def test_parse_path_loose_file_in_thb_returns_none():
+    assert parse_path("Documents/THB/Studienbescheinigung.pdf", ROOT) is None
+
+
+def test_parse_path_loose_file_under_root_returns_none():
+    # File directly under Studium with no semester folder
+    assert parse_path("Documents/THB/Studium/readme.txt", ROOT) is None
+
+
+def test_parse_path_invalid_semester_pattern_returns_none():
+    assert parse_path("Documents/THB/Studium/Sommersemester/Databases/x.pdf", ROOT) is None
+
+
+def test_parse_path_no_fach_returns_none():
+    # File directly in Semester folder with no fach subdir
+    assert parse_path("Documents/THB/Studium/2.Semester/loose.pdf", ROOT) is None
+
+
+def test_parse_path_with_leading_slash_normalizes():
+    md = parse_path("/Documents/THB/Studium/2.Semester/Databases/x.pdf", ROOT)
+    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ=None)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_metadata.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError: No module named 'app.ingest.metadata'`.
+
+- [ ] **Step 3: Implement `app/ingest/metadata.py`**
+
+```python
+import re
+from dataclasses import dataclass
+from pathlib import PurePosixPath
+
+
+SEMESTER_RE = re.compile(r"^\d+\.Semester$")
+
+
+@dataclass(frozen=True)
+class PathMetadata:
+    semester: str
+    fach: str
+    typ: str | None
+
+
+def parse_path(file_path: str, ingest_root: str) -> PathMetadata | None:
+    """Parse a Nextcloud file path into structured metadata.
+
+    Returns None when the path is outside the ingest root or does not match
+    the expected `<root>/<N>.Semester/<fach>/[<typ>/...]/<file>` pattern.
+    """
+    norm_path = file_path.lstrip("/")
+    norm_root = ingest_root.strip("/")
+
+    if not norm_path.startswith(norm_root + "/"):
+        return None
+
+    relative = norm_path[len(norm_root) + 1:]
+    parts = PurePosixPath(relative).parts
+
+    # Need at least: semester / fach / file.ext → 3 parts
+    if len(parts) < 3:
+        return None
+
+    semester, fach = parts[0], parts[1]
+
+    if not SEMESTER_RE.match(semester):
+        return None
+
+    # parts[-1] is the filename. Anything between fach and filename is "deeper".
+    # The first deeper segment becomes `typ`. None if file lives directly in fach.
+    typ = parts[2] if len(parts) > 3 else None
+
+    return PathMetadata(semester=semester, fach=fach, typ=typ)
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+uv run pytest tests/test_metadata.py -v
+```
+
+Expected: 9 PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app/ingest/metadata.py tests/test_metadata.py
+git commit -m "feat: path metadata parser with semester/fach/typ"
+```
+
+---
+
+## Task 5: Chunker
+
+**Files:**
+- Create: `app/ingest/chunker.py`
+- Test: `tests/test_chunker.py`
+
+- [ ] **Step 1: Write the failing tests**
+
+```python
+# tests/test_chunker.py
+from app.ingest.chunker import chunk_text, Chunk
+
+
+def test_chunk_short_text_single_chunk():
+    text = "Das ist ein kurzer Text mit wenigen Worten."
+    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
+    assert len(chunks) == 1
+    assert chunks[0].text == text
+    assert chunks[0].page == 1
+
+
+def test_chunk_size_and_overlap():
+    words = [f"w{i}" for i in range(1200)]
+    text = " ".join(words)
+    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
+
+    # 1200 words, size 500, overlap 50 → step 450 → starts at 0, 450, 900 → 3 chunks
+    assert len(chunks) == 3
+    # First chunk has up to 500 words
+    assert len(chunks[0].text.split()) <= 500
+    # Overlap: last 50 words of chunk 0 are first 50 words of chunk 1
+    last_50_of_first = chunks[0].text.split()[-50:]
+    first_50_of_second = chunks[1].text.split()[:50]
+    assert last_50_of_first == first_50_of_second
+
+
+def test_chunk_respects_sentence_boundary_in_lookback_window():
+    # 600 words, with a sentence ending around word 480 (within last 20% = words 400-500)
+    words = [f"w{i}" for i in range(600)]
+    words[479] = "ende."
+    text = " ".join(words)
+    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
+
+    # First chunk should end at the sentence boundary, not at word 500
+    first_chunk_words = chunks[0].text.split()
+    assert first_chunk_words[-1] == "ende."
+    assert len(first_chunk_words) == 480
+
+
+def test_chunk_no_sentence_boundary_in_window_falls_back_to_word_count():
+    words = [f"w{i}" for i in range(600)]
+    text = " ".join(words)
+    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
+    # No sentence-end → exactly 500 words in first chunk
+    assert len(chunks[0].text.split()) == 500
+
+
+def test_chunk_empty_text_returns_empty_list():
+    assert chunk_text("", size_words=500, overlap_words=50, page=1) == []
+
+
+def test_chunk_carries_page_number():
+    chunks = chunk_text("hallo welt", size_words=500, overlap_words=50, page=7)
+    assert chunks[0].page == 7
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_chunker.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError`.
+
+- [ ] **Step 3: Implement `app/ingest/chunker.py`**
+
+```python
+from dataclasses import dataclass
+
+
+SENTENCE_END_CHARS = (".", "!", "?")
+
+
+@dataclass(frozen=True)
+class Chunk:
+    text: str
+    page: int
+
+
+def _find_sentence_boundary(words: list[str], window_start: int) -> int | None:
+    """Return index of last word ending with a sentence terminator within
+    [window_start, len(words)), or None if no boundary found.
+
+    The returned index is the inclusive end-index of the sentence: the chunk
+    will include words[: idx + 1].
+    """
+    for i in range(len(words) - 1, window_start - 1, -1):
+        if words[i].endswith(SENTENCE_END_CHARS):
+            return i
+    return None
+
+
+def chunk_text(text: str, size_words: int, overlap_words: int, page: int) -> list[Chunk]:
+    """Split text into ≤size_words chunks with overlap_words overlap.
+
+    Each chunk ends at the last sentence boundary in the final 20% of the
+    `size_words` window when possible; otherwise it ends at exactly `size_words`.
+    """
+    if not text.strip():
+        return []
+
+    words = text.split()
+    if len(words) <= size_words:
+        return [Chunk(text=" ".join(words), page=page)]
+
+    chunks: list[Chunk] = []
+    start = 0
+    lookback_window = max(1, int(size_words * 0.2))
+
+    while start < len(words):
+        hard_end = min(start + size_words, len(words))
+        # Search for sentence boundary in last 20% of the window
+        if hard_end - start == size_words:
+            boundary_search_start = hard_end - lookback_window
+            boundary = _find_sentence_boundary(words[:hard_end], boundary_search_start)
+            end = boundary + 1 if boundary is not None else hard_end
+        else:
+            end = hard_end
+
+        chunks.append(Chunk(text=" ".join(words[start:end]), page=page))
+
+        if end >= len(words):
+            break
+
+        # Step forward: end - overlap, but never less than start + 1
+        next_start = max(end - overlap_words, start + 1)
+        start = next_start
+
+    return chunks
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+uv run pytest tests/test_chunker.py -v
+```
+
+Expected: 6 PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app/ingest/chunker.py tests/test_chunker.py
+git commit -m "feat: word-based chunker with sentence-boundary look-back"
+```
+
+---
+
+## Task 6: Extractors + Test Fixtures
+
+**Files:**
+- Create: `app/ingest/extractors.py`
+- Create: `tests/conftest.py`
+- Test: `tests/test_extractors.py`
+
+- [ ] **Step 1: Create test fixtures via conftest**
+
+Sample files are generated dynamically so we don't commit binaries.
+
+```python
+# tests/conftest.py
+import io
+import pytest
+import fitz  # pymupdf
+from docx import Document
+
+
+@pytest.fixture
+def sample_pdf_bytes() -> bytes:
+    doc = fitz.open()
+    p1 = doc.new_page()
+    p1.insert_text((72, 72), "Seite eins enthaelt Lorem Ipsum.")
+    p2 = doc.new_page()
+    p2.insert_text((72, 72), "Seite zwei enthaelt mehr Text.")
+    data = doc.tobytes()
+    doc.close()
+    return data
+
+
+@pytest.fixture
+def sample_docx_bytes() -> bytes:
+    doc = Document()
+    doc.add_paragraph("Erster Absatz.")
+    doc.add_paragraph("Zweiter Absatz mit mehr Inhalt.")
+    buf = io.BytesIO()
+    doc.save(buf)
+    return buf.getvalue()
+
+
+@pytest.fixture
+def sample_md_bytes() -> bytes:
+    return "# Title\n\nFirst paragraph.\n\nSecond paragraph.\n".encode("utf-8")
+
+
+@pytest.fixture
+def sample_xlsx_bytes() -> bytes:
+    # Minimal placeholder; extractor doesn't read content for xlsx
+    return b"PK\x03\x04dummy"
+```
+
+- [ ] **Step 2: Write the failing tests**
+
+```python
+# tests/test_extractors.py
+import pytest
+from app.ingest.extractors import extract, ExtractedPage, UnsupportedFileType
+
+
+def test_extract_pdf_returns_pages(sample_pdf_bytes):
+    pages = extract(sample_pdf_bytes, "pdf", filename="x.pdf")
+    assert len(pages) == 2
+    assert pages[0].page == 1
+    assert "Seite eins" in pages[0].text
+    assert pages[1].page == 2
+    assert "Seite zwei" in pages[1].text
+
+
+def test_extract_docx_returns_single_page(sample_docx_bytes):
+    pages = extract(sample_docx_bytes, "docx", filename="x.docx")
+    assert len(pages) == 1
+    assert pages[0].page == 1
+    assert "Erster Absatz." in pages[0].text
+    assert "Zweiter Absatz" in pages[0].text
+
+
+def test_extract_md_returns_single_page(sample_md_bytes):
+    pages = extract(sample_md_bytes, "md", filename="x.md")
+    assert len(pages) == 1
+    assert pages[0].page == 1
+    assert "First paragraph." in pages[0].text
+
+
+def test_extract_xlsx_returns_filename_pseudo_text(sample_xlsx_bytes):
+    pages = extract(sample_xlsx_bytes, "xlsx", filename="my-sheet.xlsx")
+    assert len(pages) == 1
+    assert pages[0].page == 1
+    assert pages[0].text == "Tabelle: my-sheet.xlsx"
+
+
+def test_extract_unsupported_raises():
+    with pytest.raises(UnsupportedFileType):
+        extract(b"data", "txt", filename="x.txt")
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_extractors.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError`.
+
+- [ ] **Step 4: Implement `app/ingest/extractors.py`**
+
+```python
+import io
+from dataclasses import dataclass
+
+import fitz  # pymupdf
+from docx import Document
+
+
+SUPPORTED_TYPES = {"pdf", "md", "docx", "xlsx"}
+
+
+class UnsupportedFileType(Exception):
+    pass
+
+
+@dataclass(frozen=True)
+class ExtractedPage:
+    page: int
+    text: str
+
+
+def extract(data: bytes, file_type: str, filename: str) -> list[ExtractedPage]:
+    file_type = file_type.lower().lstrip(".")
+    if file_type not in SUPPORTED_TYPES:
+        raise UnsupportedFileType(file_type)
+
+    if file_type == "pdf":
+        return _extract_pdf(data)
+    if file_type == "md":
+        return _extract_md(data)
+    if file_type == "docx":
+        return _extract_docx(data)
+    if file_type == "xlsx":
+        return [ExtractedPage(page=1, text=f"Tabelle: {filename}")]
+
+    raise UnsupportedFileType(file_type)  # unreachable
+
+
+def _extract_pdf(data: bytes) -> list[ExtractedPage]:
+    pages: list[ExtractedPage] = []
+    with fitz.open(stream=data, filetype="pdf") as doc:
+        for i, page in enumerate(doc, start=1):
+            pages.append(ExtractedPage(page=i, text=page.get_text() or ""))
+    return pages
+
+
+def _extract_md(data: bytes) -> list[ExtractedPage]:
+    return [ExtractedPage(page=1, text=data.decode("utf-8", errors="replace"))]
+
+
+def _extract_docx(data: bytes) -> list[ExtractedPage]:
+    doc = Document(io.BytesIO(data))
+    text = "\n\n".join(p.text for p in doc.paragraphs if p.text)
+    return [ExtractedPage(page=1, text=text)]
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+```bash
+uv run pytest tests/test_extractors.py -v
+```
+
+Expected: 5 PASS.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add app/ingest/extractors.py tests/conftest.py tests/test_extractors.py
+git commit -m "feat: extractors for pdf/md/docx/xlsx with dynamic fixtures"
+```
+
+---
+
+## Task 7: WebDAV Client
+
+**Files:**
+- Create: `app/ingest/webdav.py`
+- Test: `tests/test_webdav.py`
+
+- [ ] **Step 1: Write the failing tests**
+
+```python
+# tests/test_webdav.py
+import pytest
+import httpx
+import respx
+
+from app.ingest.webdav import download_file, WebDAVError
+
+
+@pytest.mark.asyncio
+async def test_download_file_returns_bytes():
+    base = "https://nc.example.com/remote.php/dav/files/u"
+    file_path = "Documents/THB/Studium/2.Semester/Databases/x.pdf"
+    expected_url = f"{base}/{file_path}"
+
+    with respx.mock(base_url=base) as mock:
+        mock.get(expected_url).mock(return_value=httpx.Response(200, content=b"PDFBYTES"))
+        data = await download_file(base, "u", "pw", file_path)
+        assert data == b"PDFBYTES"
+
+
+@pytest.mark.asyncio
+async def test_download_file_404_raises():
+    base = "https://nc.example.com/remote.php/dav/files/u"
+    file_path = "missing.pdf"
+    expected_url = f"{base}/{file_path}"
+
+    with respx.mock(base_url=base) as mock:
+        mock.get(expected_url).mock(return_value=httpx.Response(404))
+        with pytest.raises(WebDAVError):
+            await download_file(base, "u", "pw", file_path)
+
+
+@pytest.mark.asyncio
+async def test_download_file_handles_url_encoding():
+    base = "https://nc.example.com/remote.php/dav/files/u"
+    file_path = "Documents/Folder With Space/file.pdf"
+
+    with respx.mock(base_url=base) as mock:
+        # httpx will percent-encode the spaces
+        route = mock.get(url__regex=r".*/Folder%20With%20Space/file\.pdf").mock(
+            return_value=httpx.Response(200, content=b"OK")
+        )
+        data = await download_file(base, "u", "pw", file_path)
+        assert data == b"OK"
+        assert route.called
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_webdav.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError`.
+
+- [ ] **Step 3: Implement `app/ingest/webdav.py`**
+
+```python
+import httpx
+
+
+class WebDAVError(Exception):
+    pass
+
+
+async def download_file(base_url: str, user: str, password: str, file_path: str, *, timeout: float = 60.0) -> bytes:
+    """Fetch a file from Nextcloud WebDAV. Returns the raw bytes."""
+    base = base_url.rstrip("/")
+    rel = file_path.lstrip("/")
+    url = f"{base}/{rel}"
+
+    async with httpx.AsyncClient(auth=(user, password), timeout=timeout) as client:
+        response = await client.get(url)
+
+    if response.status_code != 200:
+        raise WebDAVError(
+            f"WebDAV GET {file_path} failed: status={response.status_code}"
+        )
+    return response.content
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+uv run pytest tests/test_webdav.py -v
+```
+
+Expected: 3 PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app/ingest/webdav.py tests/test_webdav.py
+git commit -m "feat: webdav download via httpx with basic-auth"
+```
+
+---
+
+## Task 8: Ollama Embedder
+
+**Files:**
+- Create: `app/ingest/embedder.py`
+- Test: `tests/test_embedder.py`
+
+- [ ] **Step 1: Write the failing tests**
+
+```python
+# tests/test_embedder.py
+import asyncio
+from unittest.mock import AsyncMock, MagicMock, patch
+import pytest
+
+from app.ingest.embedder import embed_texts, EmbeddingError
+
+
+@pytest.mark.asyncio
+async def test_embed_texts_returns_vectors():
+    fake_client = MagicMock()
+    fake_client.embeddings = AsyncMock(side_effect=[
+        {"embedding": [0.1, 0.2, 0.3]},
+        {"embedding": [0.4, 0.5, 0.6]},
+    ])
+
+    with patch("app.ingest.embedder._client", return_value=fake_client):
+        vectors = await embed_texts(["hello", "world"], model="qwen3-embedding:0.6b")
+
+    assert vectors == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
+    assert fake_client.embeddings.call_count == 2
+
+
+@pytest.mark.asyncio
+async def test_embed_texts_retries_on_failure(monkeypatch):
+    monkeypatch.setattr("app.ingest.embedder._BACKOFF_SECONDS", [0, 0, 0])
+
+    fake_client = MagicMock()
+    fake_client.embeddings = AsyncMock(side_effect=[
+        Exception("connection refused"),
+        Exception("timeout"),
+        {"embedding": [1.0, 2.0]},
+    ])
+
+    with patch("app.ingest.embedder._client", return_value=fake_client):
+        vectors = await embed_texts(["hi"], model="m")
+
+    assert vectors == [[1.0, 2.0]]
+    assert fake_client.embeddings.call_count == 3
+
+
+@pytest.mark.asyncio
+async def test_embed_texts_raises_after_max_retries(monkeypatch):
+    monkeypatch.setattr("app.ingest.embedder._BACKOFF_SECONDS", [0, 0, 0])
+
+    fake_client = MagicMock()
+    fake_client.embeddings = AsyncMock(side_effect=Exception("nope"))
+
+    with patch("app.ingest.embedder._client", return_value=fake_client):
+        with pytest.raises(EmbeddingError):
+            await embed_texts(["hi"], model="m")
+
+    assert fake_client.embeddings.call_count == 4  # initial + 3 retries
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_embedder.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError`.
+
+- [ ] **Step 3: Implement `app/ingest/embedder.py`**
+
+```python
+import asyncio
+import logging
+from functools import lru_cache
+
+from ollama import AsyncClient
+
+from app.config import get_settings
+
+
+logger = logging.getLogger(__name__)
+
+
+_BACKOFF_SECONDS = [1, 2, 4]
+
+
+class EmbeddingError(Exception):
+    pass
+
+
+@lru_cache(maxsize=1)
+def _client() -> AsyncClient:
+    return AsyncClient(host=get_settings().ollama_url)
+
+
+async def embed_texts(texts: list[str], model: str) -> list[list[float]]:
+    """Embed each text via Ollama.
+    Retries individual calls 3x with backoff."""
+    vectors: list[list[float]] = []
+    for text in texts:
+        vec = await _embed_one(text, model)
+        vectors.append(vec)
+    return vectors
+
+
+async def _embed_one(text: str, model: str) -> list[float]:
+    last_err: Exception | None = None
+    client = _client()
+    for attempt in range(len(_BACKOFF_SECONDS) + 1):
+        try:
+            response = await client.embeddings(model=model, prompt=text)
+            return list(response["embedding"])
+        except Exception as exc:
+            last_err = exc
+            if attempt < len(_BACKOFF_SECONDS):
+                wait = _BACKOFF_SECONDS[attempt]
+                logger.warning(
+                    "ollama embed retry",
+                    extra={"event": "embed_retry", "attempt": attempt + 1, "wait_s": wait, "error": str(exc)},
+                )
+                await asyncio.sleep(wait)
+    raise EmbeddingError(f"embed failed after retries: {last_err}")
+
+
+async def embedding_dimension(model: str) -> int:
+    """Probe a single embedding to discover the model's vector dimension."""
+    vec = await _embed_one("dimension probe", model)
+    return len(vec)
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+uv run pytest tests/test_embedder.py -v
+```
+
+Expected: 3 PASS.
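The retry schedule used by `_embed_one` (one initial attempt plus one retry per backoff entry) can be isolated into a generic helper. The sketch below is not part of the plan; `retry_async` and `make_flaky` are hypothetical names, and the helper raises a plain `RuntimeError` where the plan raises `EmbeddingError`.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def retry_async(call: Callable[[], Awaitable[T]], backoff_seconds: list[float]) -> T:
    """Try `call` once, then once more after each wait in `backoff_seconds`."""
    last_err: Exception | None = None
    for attempt in range(len(backoff_seconds) + 1):
        try:
            return await call()
        except Exception as exc:  # broad on purpose, mirroring the embedder above
            last_err = exc
            if attempt < len(backoff_seconds):
                await asyncio.sleep(backoff_seconds[attempt])
    raise RuntimeError(f"gave up after {len(backoff_seconds) + 1} attempts") from last_err


def make_flaky(failures: int) -> tuple[Callable[[], Awaitable[str]], list[int]]:
    """Return a coroutine function that fails `failures` times, plus a call counter."""
    calls = [0]

    async def flaky() -> str:
        calls[0] += 1
        if calls[0] <= failures:
            raise ConnectionError("simulated outage")
        return "ok"

    return flaky, calls


if __name__ == "__main__":
    flaky, calls = make_flaky(failures=2)
    result = asyncio.run(retry_async(flaky, [0, 0, 0]))
    print(result, "after", calls[0], "calls")
```

With the plan's schedule of `[1, 2, 4]`, a text that never embeds costs four attempts and seven seconds of waiting before the error surfaces, which is worth keeping in mind for bulk imports with many chunks.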
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app/ingest/embedder.py tests/test_embedder.py
+git commit -m "feat: ollama embedder with exponential backoff retry"
+```
+
+---
+
+## Task 9: Qdrant Store
+
+**Files:**
+- Create: `app/qdrant_store.py`
+- Test: `tests/test_qdrant_store.py`
+
+- [ ] **Step 1: Write the failing tests**
+
+```python
+# tests/test_qdrant_store.py
+from unittest.mock import MagicMock, patch
+import pytest
+
+from app.qdrant_store import (
+    ensure_collection,
+    upsert_chunks,
+    delete_by_path,
+    ChunkPoint,
+)
+
+
+def test_ensure_collection_creates_when_missing():
+    fake_client = MagicMock()
+    fake_client.collection_exists.return_value = False
+
+    ensure_collection(fake_client, "rag_test", vector_size=1024)
+
+    fake_client.create_collection.assert_called_once()
+    args, kwargs = fake_client.create_collection.call_args
+    assert kwargs["collection_name"] == "rag_test"
+    # Payload indexes get created
+    assert fake_client.create_payload_index.call_count == 3
+
+
+def test_ensure_collection_skips_when_exists_with_matching_dim():
+    fake_client = MagicMock()
+    fake_client.collection_exists.return_value = True
+    info = MagicMock()
+    info.config.params.vectors.size = 1024
+    fake_client.get_collection.return_value = info
+
+    ensure_collection(fake_client, "rag_test", vector_size=1024)
+
+    fake_client.create_collection.assert_not_called()
+
+
+def test_ensure_collection_raises_on_dim_mismatch():
+    fake_client = MagicMock()
+    fake_client.collection_exists.return_value = True
+    info = MagicMock()
+    info.config.params.vectors.size = 768
+    fake_client.get_collection.return_value = info
+
+    with pytest.raises(RuntimeError, match="dimension mismatch"):
+        ensure_collection(fake_client, "rag_test", vector_size=1024)
+
+
+def test_upsert_chunks_calls_client_upsert():
+    fake_client = MagicMock()
+    points = [
+        ChunkPoint(vector=[0.1] * 4, payload={"file_path": "a", "chunk_index": 0}),
+        ChunkPoint(vector=[0.2] * 4, payload={"file_path": "a", "chunk_index": 1}),
+    ]
+
+    upsert_chunks(fake_client, "rag_test", points)
+
+    fake_client.upsert.assert_called_once()
+    kwargs = fake_client.upsert.call_args.kwargs
+    assert kwargs["collection_name"] == "rag_test"
+    assert len(kwargs["points"]) == 2
+
+
+def test_delete_by_path_uses_filter():
+    fake_client = MagicMock()
+    delete_by_path(fake_client, "rag_test", "Documents/x.pdf")
+
+    fake_client.delete.assert_called_once()
+    kwargs = fake_client.delete.call_args.kwargs
+    assert kwargs["collection_name"] == "rag_test"
+    # The filter should target file_path
+    selector = kwargs["points_selector"]
+    # Inspect the FilterSelector → Filter → must → FieldCondition
+    assert selector.filter.must[0].key == "file_path"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+uv run pytest tests/test_qdrant_store.py -v
+```
+
+Expected: FAIL with `ModuleNotFoundError`.
+
+- [ ] **Step 3: Implement `app/qdrant_store.py`**
+
+```python
+import uuid
+from dataclasses import dataclass
+from typing import Any
+
+from qdrant_client import QdrantClient
+from qdrant_client.http import models as qm
+
+
+@dataclass(frozen=True)
+class ChunkPoint:
+    vector: list[float]
+    payload: dict[str, Any]
+
+
+def ensure_collection(client: QdrantClient, name: str, vector_size: int) -> None:
+    """Create the collection if missing. Crash if it exists with wrong dim."""
+    if not client.collection_exists(name):
+        client.create_collection(
+            collection_name=name,
+            vectors_config=qm.VectorParams(size=vector_size, distance=qm.Distance.COSINE),
+        )
+        for field in ("file_path", "semester", "fach"):
+            client.create_payload_index(
+                collection_name=name,
+                field_name=field,
+                field_schema=qm.PayloadSchemaType.KEYWORD,
+            )
+        return
+
+    info = client.get_collection(name)
+    existing = info.config.params.vectors.size
+    if existing != vector_size:
+        raise RuntimeError(
+            f"qdrant collection '{name}' dimension mismatch: "
+            f"existing={existing}, model={vector_size}. "
+            "Drop the collection manually and run a bulk import."
+ ) + + +def upsert_chunks(client: QdrantClient, name: str, chunks: list[ChunkPoint]) -> None: + points = [ + qm.PointStruct(id=str(uuid.uuid4()), vector=c.vector, payload=c.payload) + for c in chunks + ] + client.upsert(collection_name=name, points=points) + + +def delete_by_path(client: QdrantClient, name: str, file_path: str) -> None: + selector = qm.FilterSelector( + filter=qm.Filter( + must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))] + ) + ) + client.delete(collection_name=name, points_selector=selector) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_qdrant_store.py -v +``` + +Expected: 5 PASS. + +- [ ] **Step 5: Commit** + +```bash +git add app/qdrant_store.py tests/test_qdrant_store.py +git commit -m "feat: qdrant store mit ensure/upsert/delete-by-path" +``` + +--- + +## Task 10: Webhook Models + Auth + +**Files:** +- Create: `app/webhook/models.py` +- Create: `app/webhook/auth.py` +- Test: `tests/test_webhook.py` (auth + model parts only here; full handler test in Task 12) + +- [ ] **Step 1: Write the failing tests** + +```python +# tests/test_webhook.py +import pytest +from fastapi import HTTPException + +from app.webhook.models import NextcloudEvent, EventType +from app.webhook.auth import verify_secret + + +def test_event_parses_created(): + evt = NextcloudEvent(event_type="created", file_path="a/b.pdf", file_name="b.pdf") + assert evt.event_type == EventType.CREATED + + +def test_event_invalid_type_raises(): + with pytest.raises(Exception): + NextcloudEvent(event_type="exploded", file_path="a", file_name="a") + + +def test_verify_secret_pass(): + verify_secret(provided="abc", expected="abc") # no exception + + +def test_verify_secret_fail(): + with pytest.raises(HTTPException) as exc_info: + verify_secret(provided="wrong", expected="abc") + assert exc_info.value.status_code == 401 + + +def test_verify_secret_missing_fail(): + with pytest.raises(HTTPException) as exc_info: + 
verify_secret(provided=None, expected="abc") + assert exc_info.value.status_code == 401 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_webhook.py -v +``` + +Expected: FAIL with `ModuleNotFoundError`. + +- [ ] **Step 3: Implement `app/webhook/models.py`** + +```python +from enum import Enum +from pydantic import BaseModel + + +class EventType(str, Enum): + CREATED = "created" + UPDATED = "updated" + DELETED = "deleted" + + +class NextcloudEvent(BaseModel): + event_type: EventType + file_path: str + file_name: str +``` + +- [ ] **Step 4: Implement `app/webhook/auth.py`** + +```python +import hmac + +from fastapi import HTTPException + + +def verify_secret(provided: str | None, expected: str) -> None: + """Constant-time comparison of the shared secret. + + Raises HTTPException(401) on mismatch or missing header. + """ + # Compare as bytes: hmac.compare_digest raises TypeError for non-ASCII str input. + if provided is None or not hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8")): + raise HTTPException(status_code=401, detail="invalid or missing secret") +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +uv run pytest tests/test_webhook.py -v +``` + +Expected: 5 PASS.
+ +- [ ] **Step 6: Commit** + +```bash +git add app/webhook/models.py app/webhook/auth.py tests/test_webhook.py +git commit -m "feat: webhook event-model und shared-secret auth" +``` + +--- + +## Task 11: Pipeline Orchestration + +**Files:** +- Create: `app/ingest/pipeline.py` +- Test: `tests/test_pipeline.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# tests/test_pipeline.py +from unittest.mock import AsyncMock, MagicMock, patch +import pytest + +from app.webhook.models import EventType +from app.ingest.pipeline import process_file + + +@pytest.mark.asyncio +async def test_process_deleted_event_calls_delete_only(monkeypatch): + qdrant = MagicMock() + monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant) + + download_mock = AsyncMock() + monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock) + + await process_file( + file_path="Documents/THB/Studium/2.Semester/Databases/x.pdf", + event_type=EventType.DELETED, + ) + + download_mock.assert_not_called() + qdrant.delete.assert_called_once() + + +@pytest.mark.asyncio +async def test_process_outside_root_skips(monkeypatch): + qdrant = MagicMock() + monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant) + + download_mock = AsyncMock() + monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock) + + await process_file( + file_path="Documents/Other/x.pdf", + event_type=EventType.CREATED, + ) + + download_mock.assert_not_called() + qdrant.delete.assert_not_called() + qdrant.upsert.assert_not_called() + + +@pytest.mark.asyncio +async def test_process_unsupported_extension_skips(monkeypatch): + qdrant = MagicMock() + monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant) + monkeypatch.setattr("app.ingest.pipeline.download_file", AsyncMock()) + + await process_file( + file_path="Documents/THB/Studium/2.Semester/Databases/notes.txt", + event_type=EventType.CREATED, + ) + + qdrant.upsert.assert_not_called() + + 
+@pytest.mark.asyncio +async def test_process_created_full_flow(monkeypatch, sample_pdf_bytes): + qdrant = MagicMock() + monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant) + + monkeypatch.setattr( + "app.ingest.pipeline.download_file", + AsyncMock(return_value=sample_pdf_bytes), + ) + monkeypatch.setattr( + "app.ingest.pipeline.embed_texts", + AsyncMock(return_value=[[0.1] * 4, [0.2] * 4]), + ) + + await process_file( + file_path="Documents/THB/Studium/2.Semester/Databases/Vorlesungen/x.pdf", + event_type=EventType.CREATED, + ) + + # delete called first (idempotency), upsert called after + qdrant.delete.assert_called_once() + qdrant.upsert.assert_called_once() + + upserted_points = qdrant.upsert.call_args.kwargs["points"] + assert len(upserted_points) >= 1 + payload = upserted_points[0].payload + assert payload["semester"] == "2.Semester" + assert payload["fach"] == "Databases" + assert payload["typ"] == "Vorlesungen" + assert payload["file_type"] == "pdf" + assert payload["chunk_index"] == 0 + assert "ingested_at" in payload +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_pipeline.py -v +``` + +Expected: FAIL with `ModuleNotFoundError`. 
+ +- [ ] **Step 3: Implement `app/ingest/pipeline.py`** + +```python +import logging +from datetime import datetime, timezone +from functools import lru_cache +from pathlib import PurePosixPath + +from qdrant_client import QdrantClient + +from app.config import get_settings +from app.ingest.chunker import chunk_text +from app.ingest.embedder import embed_texts +from app.ingest.extractors import extract, UnsupportedFileType, SUPPORTED_TYPES +from app.ingest.metadata import parse_path +from app.ingest.webdav import download_file +from app.qdrant_store import upsert_chunks, delete_by_path, ChunkPoint +from app.webhook.models import EventType + + +logger = logging.getLogger(__name__) + + +@lru_cache(maxsize=1) +def _qdrant_client() -> QdrantClient: + return QdrantClient(url=get_settings().qdrant_url) + + +async def process_file(file_path: str, event_type: EventType) -> None: + """End-to-end pipeline for one file event.""" + settings = get_settings() + file_path = file_path.lstrip("/") + + metadata = parse_path(file_path, settings.ingest_root) + if metadata is None: + logger.info( + "skip outside ingest root", + extra={"event": "skip", "reason": "outside_root", "file": file_path}, + ) + return + + if event_type == EventType.DELETED: + delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path) + logger.info("deleted", extra={"event": "delete", "file": file_path}) + return + + extension = PurePosixPath(file_path).suffix.lstrip(".").lower() + if extension not in SUPPORTED_TYPES: + logger.info( + "skip unsupported type", + extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension}, + ) + return + + try: + data = await download_file( + settings.nextcloud_webdav_url, + settings.nextcloud_user, + settings.nextcloud_app_password, + file_path, + ) + except Exception as exc: + logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)}) + return + + try: + pages = extract(data, 
extension, filename=PurePosixPath(file_path).name) + except UnsupportedFileType: + logger.info("unsupported", extra={"event": "skip", "file": file_path}) + return + except Exception as exc: + logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)}) + return + + chunks: list[tuple[str, int, int]] = [] # (text, page, chunk_index) + chunk_index = 0 + for page in pages: + for chunk in chunk_text( + page.text, + size_words=settings.chunk_size_words, + overlap_words=settings.chunk_overlap_words, + page=page.page, + ): + chunks.append((chunk.text, chunk.page, chunk_index)) + chunk_index += 1 + + if not chunks: + logger.info("no chunks", extra={"event": "skip", "reason": "empty_text", "file": file_path}) + # Still delete any prior data for this path + delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path) + return + + try: + vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model) + except Exception as exc: + logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)}) + return + + now_iso = datetime.now(timezone.utc).isoformat() + file_name = PurePosixPath(file_path).name + points = [ + ChunkPoint( + vector=vec, + payload={ + "file_path": file_path, + "file_name": file_name, + "file_type": extension, + "semester": metadata.semester, + "fach": metadata.fach, + "typ": metadata.typ, + "page": page, + "chunk_index": idx, + "text": text, + "ingested_at": now_iso, + }, + ) + for vec, (text, page, idx) in zip(vectors, chunks) + ] + + qdrant = _qdrant_client() + delete_by_path(qdrant, settings.qdrant_collection, file_path) + upsert_chunks(qdrant, settings.qdrant_collection, points) + + logger.info( + "ingested", + extra={"event": "ingest_done", "file": file_path, "chunks": len(points)}, + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_pipeline.py -v +``` + +Expected: 4 PASS. 
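Because `upsert_chunks` assigns a fresh `uuid4` ID to every point, upserting alone would pile up duplicates when a file is ingested again; the pipeline therefore deletes by `file_path` first. The sketch below illustrates that delete-then-upsert pattern with a hypothetical in-memory stand-in (`InMemoryStore` and `reingest` are illustrative names, not part of the plan or of qdrant-client).

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for a Qdrant collection: a flat list of payloads."""

    points: list[dict] = field(default_factory=list)

    def delete_by_path(self, file_path: str) -> None:
        # Drop every point that belongs to the given file.
        self.points = [p for p in self.points if p["file_path"] != file_path]

    def upsert(self, payloads: list[dict]) -> None:
        self.points.extend(payloads)


def reingest(store: InMemoryStore, file_path: str, chunks: list[str]) -> None:
    """Replace all chunks of one file: delete first, then insert the new set."""
    store.delete_by_path(file_path)
    store.upsert(
        [{"file_path": file_path, "chunk_index": i, "text": t} for i, t in enumerate(chunks)]
    )


store = InMemoryStore()
reingest(store, "a/x.pdf", ["one", "two", "three"])
reingest(store, "a/x.pdf", ["one", "two"])  # the updated file has fewer chunks
reingest(store, "a/y.pdf", ["other"])
print(len(store.points))
```

After the second call only the two new chunks of `a/x.pdf` remain, so stale chunks from the longer first version do not linger.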
+ +- [ ] **Step 5: Commit** + +```bash +git add app/ingest/pipeline.py tests/test_pipeline.py +git commit -m "feat: pipeline-orchestrator fuer single-file ingest" +``` + +--- + +## Task 12: FastAPI App + Webhook Handler + +**Files:** +- Create: `app/main.py` +- Create: `app/webhook/handler.py` +- Test: extend `tests/test_webhook.py` + +- [ ] **Step 1: Add failing tests for the endpoint** + +Append to `tests/test_webhook.py`: + +```python +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi.testclient import TestClient + + +def _make_app(monkeypatch): + """Build the FastAPI app with all external clients stubbed.""" + monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc") + monkeypatch.setenv("NEXTCLOUD_USER", "u") + monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p") + monkeypatch.setenv("OLLAMA_URL", "http://ollama") + monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m") + monkeypatch.setenv("QDRANT_URL", "http://qdrant") + monkeypatch.setenv("QDRANT_COLLECTION", "rag_test") + monkeypatch.setenv("WEBHOOK_SECRET", "abc") + + # Reset cached settings/clients + import app.config + app.config._settings = None + import app.ingest.pipeline as pipe + pipe._qdrant_client.cache_clear() + + # Stub the lifespan startup so it doesn't try to talk to real services + monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock()) + + from app.main import app + return app + + +def test_health_endpoint_no_auth(monkeypatch): + app = _make_app(monkeypatch) + with TestClient(app) as client: + r = client.get("/health") + assert r.status_code == 200 + assert r.json() == {"status": "ok"} + + +def test_webhook_rejects_missing_secret(monkeypatch): + app = _make_app(monkeypatch) + with TestClient(app) as client: + r = client.post("/webhook", json={ + "event_type": "created", + "file_path": "a/b.pdf", + "file_name": "b.pdf", + }) + assert r.status_code == 401 + + +def test_webhook_rejects_wrong_secret(monkeypatch): + app = _make_app(monkeypatch) + with TestClient(app) as 
client: + r = client.post( + "/webhook", + json={"event_type": "created", "file_path": "a/b.pdf", "file_name": "b.pdf"}, + headers={"X-Webhook-Secret": "wrong"}, + ) + assert r.status_code == 401 + + +def test_webhook_dispatches_background_task(monkeypatch): + app = _make_app(monkeypatch) + + process_mock = AsyncMock() + monkeypatch.setattr("app.webhook.handler.process_file", process_mock) + + with TestClient(app) as client: + r = client.post( + "/webhook", + json={ + "event_type": "created", + "file_path": "Documents/THB/Studium/2.Semester/Databases/x.pdf", + "file_name": "x.pdf", + }, + headers={"X-Webhook-Secret": "abc"}, + ) + + assert r.status_code == 202 + process_mock.assert_awaited_once() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_webhook.py -v +``` + +Expected: FAIL on the new endpoint tests with import errors. + +- [ ] **Step 3: Implement `app/webhook/handler.py`** + +```python +from fastapi import APIRouter, BackgroundTasks, Header, status + +from app.config import get_settings +from app.ingest.pipeline import process_file +from app.webhook.auth import verify_secret +from app.webhook.models import NextcloudEvent + + +router = APIRouter() + + +@router.post("/webhook", status_code=status.HTTP_202_ACCEPTED) +async def webhook( + event: NextcloudEvent, + background: BackgroundTasks, + x_webhook_secret: str | None = Header(default=None), +): + verify_secret(x_webhook_secret, get_settings().webhook_secret) + background.add_task(process_file, event.file_path, event.event_type) + return {"status": "accepted"} +``` + +- [ ] **Step 4: Implement `app/main.py`** + +```python +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from qdrant_client import QdrantClient + +from app.config import get_settings +from app.ingest.embedder import embedding_dimension +from app.logging_setup import setup_logging +from app.qdrant_store import ensure_collection +from app.webhook.handler import 
router as webhook_router + + +logger = logging.getLogger(__name__) + + +async def _startup_ensure_collection() -> None: + settings = get_settings() + dim = await embedding_dimension(settings.ollama_embed_model) + client = QdrantClient(url=settings.qdrant_url) + ensure_collection(client, settings.qdrant_collection, vector_size=dim) + logger.info( + "qdrant collection ready", + extra={"event": "startup", "collection": settings.qdrant_collection, "dim": dim}, + ) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + setup_logging(get_settings().log_level) + await _startup_ensure_collection() + yield + + +app = FastAPI(title="rag-ingestor", lifespan=lifespan) +app.include_router(webhook_router) + + +@app.get("/health") +async def health(): + return {"status": "ok"} +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +uv run pytest tests/test_webhook.py -v +``` + +Expected: 9 PASS (5 from earlier + 4 new). + +- [ ] **Step 6: Commit** + +```bash +git add app/main.py app/webhook/handler.py tests/test_webhook.py +git commit -m "feat: fastapi app mit lifespan, webhook handler und /health" +``` + +--- + +## Task 13: Bulk Import Endpoint + +**Files:** +- Create: `app/bulk.py` +- Modify: `app/main.py` (register router) +- Test: `tests/test_bulk.py` + +The bulk endpoint walks a folder via WebDAV `PROPFIND`, then dispatches one background task per supported file. 
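To make the `PROPFIND` walk concrete, here is a minimal sketch of turning a `207 Multi-Status` body into relative file paths using only the standard library. The sample XML is hand-written for illustration, and `files_from_multistatus` is a hypothetical helper; a real Nextcloud response carries more properties than shown here.

```python
import xml.etree.ElementTree as ET
from urllib.parse import unquote

DAV_NS = {"d": "DAV:"}

# Hypothetical, hand-written response body for illustration only.
SAMPLE_MULTISTATUS = """<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:">
  <d:response>
    <d:href>/remote.php/dav/files/u/Documents/</d:href>
    <d:propstat><d:prop><d:resourcetype><d:collection/></d:resourcetype></d:prop></d:propstat>
  </d:response>
  <d:response>
    <d:href>/remote.php/dav/files/u/Documents/My%20Notes.pdf</d:href>
    <d:propstat><d:prop><d:resourcetype/></d:prop></d:propstat>
  </d:response>
</d:multistatus>
"""


def files_from_multistatus(xml_text: str, base_path: str) -> list[str]:
    """Return file paths relative to base_path, skipping collections (folders)."""
    root = ET.fromstring(xml_text)
    files: list[str] = []
    for resp in root.findall("d:response", DAV_NS):
        # Folders carry a d:collection element inside d:resourcetype.
        if resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS) is not None:
            continue
        href = unquote(resp.findtext("d:href", default="", namespaces=DAV_NS))
        if href.startswith(base_path):
            href = href[len(base_path):]
        href = href.lstrip("/")
        if href:
            files.append(href)
    return files


print(files_from_multistatus(SAMPLE_MULTISTATUS, "/remote.php/dav/files/u"))
```

Percent-decoding the `href` before stripping the prefix keeps paths with spaces or umlauts identical to the `file_path` values the webhook delivers.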
+ +- [ ] **Step 1: Write the failing tests** + +```python +# tests/test_bulk.py +from unittest.mock import AsyncMock +from fastapi.testclient import TestClient + + +def _make_app(monkeypatch): + monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc") + monkeypatch.setenv("NEXTCLOUD_USER", "u") + monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p") + monkeypatch.setenv("OLLAMA_URL", "http://ollama") + monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m") + monkeypatch.setenv("QDRANT_URL", "http://qdrant") + monkeypatch.setenv("QDRANT_COLLECTION", "rag_test") + monkeypatch.setenv("WEBHOOK_SECRET", "abc") + import app.config + app.config._settings = None + monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock()) + from app.main import app + return app + + +def test_bulk_import_lists_and_dispatches(monkeypatch): + app = _make_app(monkeypatch) + + listed = [ + "Documents/THB/Studium/2.Semester/Databases/a.pdf", + "Documents/THB/Studium/2.Semester/Databases/b.docx", + "Documents/THB/Studium/2.Semester/Databases/.rag-meta.json", # ignored + ] + monkeypatch.setattr("app.bulk.list_files_recursive", AsyncMock(return_value=listed)) + + process_mock = AsyncMock() + monkeypatch.setattr("app.bulk.process_file", process_mock) + + with TestClient(app) as client: + r = client.post( + "/bulk-import", + json={"path": "Documents/THB/Studium/2.Semester/Databases"}, + headers={"X-Webhook-Secret": "abc"}, + ) + + assert r.status_code == 202 + body = r.json() + assert body["dispatched"] == 2 # only .pdf and .docx, not the json sidecar + # Both background tasks have run once the client context exits + assert process_mock.await_count == 2 + + +def test_bulk_import_rejects_wrong_secret(monkeypatch): + app = _make_app(monkeypatch) + with TestClient(app) as client: + r = client.post("/bulk-import", json={"path": "x"}, headers={"X-Webhook-Secret": "nope"}) + assert r.status_code == 401 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_bulk.py -v +``` + +Expected: FAIL with 
`ModuleNotFoundError`. + +- [ ] **Step 3: Implement `app/bulk.py`** + +```python +import logging +import xml.etree.ElementTree as ET +from pathlib import PurePosixPath +from urllib.parse import unquote + +import httpx +from fastapi import APIRouter, BackgroundTasks, Header, status +from pydantic import BaseModel + +from app.config import get_settings +from app.ingest.extractors import SUPPORTED_TYPES +from app.ingest.pipeline import process_file +from app.webhook.auth import verify_secret +from app.webhook.models import EventType + + +logger = logging.getLogger(__name__) +router = APIRouter() + + +class BulkRequest(BaseModel): + path: str + + +# Minimal request body: asks only for resourcetype, which is used below to skip folders. +PROPFIND_BODY = """<?xml version="1.0" encoding="utf-8"?> +<d:propfind xmlns:d="DAV:"> + <d:prop> + <d:resourcetype/> + </d:prop> +</d:propfind> +""" + +DAV_NS = {"d": "DAV:"} + + +async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]: + """PROPFIND with Depth: infinity. Returns relative file paths (no folders).""" + base = base_url.rstrip("/") + rel = path.strip("/") + url = f"{base}/{rel}" + + async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client: + response = await client.request( + "PROPFIND", + url, + headers={"Depth": "infinity", "Content-Type": "application/xml"}, + content=PROPFIND_BODY, + ) + if response.status_code not in (200, 207): + raise RuntimeError(f"PROPFIND failed: status={response.status_code}") + + root = ET.fromstring(response.text) + base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix() # "/remote.php/dav/files/u" + out: list[str] = [] + for resp in root.findall("d:response", DAV_NS): + href = resp.findtext("d:href", default="", namespaces=DAV_NS) + decoded = unquote(href) + # Strip the WebDAV base prefix → leaves "Documents/.../file.pdf" + if decoded.startswith(base_path_segment): + decoded = decoded[len(base_path_segment):] + decoded = decoded.lstrip("/") + if not decoded or decoded.endswith("/"): + continue + # Skip directory entries (their resourcetype contains d:collection) + rt = resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS) + if rt is 
not None: + continue + out.append(decoded) + return out + + +@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED) +async def bulk_import( + body: BulkRequest, + background: BackgroundTasks, + x_webhook_secret: str | None = Header(default=None), +): + settings = get_settings() + verify_secret(x_webhook_secret, settings.webhook_secret) + + files = await list_files_recursive( + settings.nextcloud_webdav_url, + settings.nextcloud_user, + settings.nextcloud_app_password, + body.path, + ) + + dispatched = 0 + for f in files: + ext = PurePosixPath(f).suffix.lstrip(".").lower() + if ext not in SUPPORTED_TYPES: + continue + background.add_task(process_file, f, EventType.CREATED) + dispatched += 1 + + logger.info( + "bulk dispatch", + extra={"event": "bulk_dispatch", "path": body.path, "dispatched": dispatched, "total_listed": len(files)}, + ) + return {"status": "accepted", "dispatched": dispatched} +``` + +- [ ] **Step 4: Modify `app/main.py` to include the bulk router** + +Edit `app/main.py`. After the `from app.webhook.handler import ...` line, add: + +```python +from app.bulk import router as bulk_router +``` + +After `app.include_router(webhook_router)`, add: + +```python +app.include_router(bulk_router) +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +uv run pytest tests/test_bulk.py -v +``` + +Expected: 2 PASS. + +- [ ] **Step 6: Run full test suite** + +```bash +uv run pytest -v +``` + +Expected: All tests pass (≈37 tests across 9 test files). 
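The extension filter that `bulk_import` applies before dispatching can be checked in isolation. This is a small sketch under one assumption: the `SUPPORTED` set below mirrors the file types named in the README, whereas the real set is `SUPPORTED_TYPES` in `app/ingest/extractors.py`. The helper name `dispatchable` is hypothetical.

```python
from pathlib import PurePosixPath

# Assumption: mirrors the README's list; the real set lives in app.ingest.extractors.
SUPPORTED = {"pdf", "md", "docx", "xlsx"}


def dispatchable(files: list[str], supported: set[str] = SUPPORTED) -> list[str]:
    """Keep only paths whose lower-cased extension is in the supported set."""
    return [
        f for f in files
        if PurePosixPath(f).suffix.lstrip(".").lower() in supported
    ]


listed = [
    "Documents/THB/Studium/2.Semester/Databases/a.pdf",
    "Documents/THB/Studium/2.Semester/Databases/B.DOCX",
    "Documents/THB/Studium/2.Semester/Databases/.rag-meta.json",
    "Documents/THB/Studium/2.Semester/Databases/.gitignore",
]
print(dispatchable(listed))
```

Lower-casing the suffix means `B.DOCX` is treated like `b.docx`, and dotfiles without a second dot have an empty suffix and are skipped.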
+ +- [ ] **Step 7: Commit** + +```bash +git add app/bulk.py app/main.py tests/test_bulk.py +git commit -m "feat: bulk-import endpoint mit propfind walk" +``` + +--- + +## Task 14: Dockerfile + Compose + +**Files:** +- Create: `docker/Dockerfile` +- Create: `docker-compose.yml` + +- [ ] **Step 1: Create `docker/Dockerfile`** + +```dockerfile +FROM python:3.12-slim AS builder + +RUN pip install --no-cache-dir uv + +WORKDIR /app +COPY pyproject.toml uv.lock* ./ +RUN uv sync --frozen --no-dev --no-install-project || uv sync --no-dev --no-install-project + +COPY app ./app +RUN uv sync --frozen --no-dev || uv sync --no-dev + + +FROM python:3.12-slim + +WORKDIR /app +COPY --from=builder /app/.venv /app/.venv +COPY --from=builder /app/app /app/app + +ENV PATH="/app/.venv/bin:$PATH" +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +``` + +- [ ] **Step 2: Create `docker-compose.yml` (local example)** + +```yaml +services: + ingestor: + build: + context: . + dockerfile: docker/Dockerfile + env_file: .env + ports: + - "8000:8000" + depends_on: + - qdrant + - ollama + + qdrant: + image: qdrant/qdrant:latest + ports: + - "6333:6333" + volumes: + - qdrant_data:/qdrant/storage + + ollama: + image: ollama/ollama:latest + ports: + - "11434:11434" + volumes: + - ollama_data:/root/.ollama + +volumes: + qdrant_data: + ollama_data: +``` + +- [ ] **Step 3: Build the image** + +```bash +docker build -f docker/Dockerfile -t rag-ingestor:dev . +``` + +Expected: Build succeeds. + +- [ ] **Step 4: Commit** + +```bash +git add docker/Dockerfile docker-compose.yml +git commit -m "chore: dockerfile und compose-beispiel" +``` + +--- + +## Task 15: README + +**Files:** +- Create: `README.md` + +- [ ] **Step 1: Create `README.md`** + +```markdown +# rag-ingestor + +Microservice that indexes files from Nextcloud (`Documents/THB/Studium/`) into Qdrant. Embeddings are generated via Ollama.
+ +## Endpoints + +- `POST /webhook` (header `X-Webhook-Secret`): receives Nextcloud events (`created` / `updated` / `deleted`). +- `POST /bulk-import` (header `X-Webhook-Secret`): body `{"path": "..."}` → recursive re-index. +- `GET /health`: liveness probe. + +## Expected folder structure + +``` +Documents/THB/Studium/<N>.Semester/<fach>/[<typ>]/<file> +``` + +Supported file types: `.pdf`, `.md`, `.docx`, `.xlsx` (XLSX files are indexed by filename only, not by content). + +## Configuration + +See `.env.example`. All values are set via environment variables; there is no config file. + +## Local development + +```bash +uv sync +uv run pytest +uv run uvicorn app.main:app --reload +``` + +## Deployment + +Build the image and deploy it in Coolify alongside Qdrant and Ollama: + +```bash +docker build -f docker/Dockerfile -t rag-ingestor . +``` + +## Tests + +```bash +uv run pytest -v +``` + +The tests cover pure logic (metadata parser, chunker, extractors, auth, pipeline orchestration with mocked external services). There are no integration tests against real Ollama/Qdrant/WebDAV instances. +``` + +- [ ] **Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: readme mit endpoints, struktur und entwicklung" +``` + +--- + +## Final Verification + +- [ ] **Run the full test suite** + +```bash +uv run pytest -v +``` + +Expected: All tests green. + +- [ ] **Smoke test the running service** (requires running Qdrant + Ollama with the configured model) + +```bash +cp .env.example .env +# Edit .env with real values +uv run uvicorn app.main:app +``` + +In another shell: + +```bash +curl http://localhost:8000/health +# {"status":"ok"} +``` + +- [ ] **Verify push to develop branch is in good shape** + +```bash +git log --oneline +git status +``` + +Expected: clean working tree, sequence of feature commits ending with the README commit.
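Beyond the `/health` check in the smoke test, a webhook call can be prepared from Python with the standard library alone. This is a sketch only: `build_webhook_request` is a hypothetical helper, the base URL and the secret `abc` are placeholder values, and sending is left commented out because it needs the running service.

```python
import json
import urllib.request


def build_webhook_request(
    base_url: str, secret: str, file_path: str, event_type: str = "created"
) -> urllib.request.Request:
    """Build (but do not send) a POST /webhook request for one file event."""
    body = {
        "event_type": event_type,
        "file_path": file_path,
        "file_name": file_path.rsplit("/", 1)[-1],
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/webhook",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Webhook-Secret": secret},
        method="POST",
    )


req = build_webhook_request(
    "http://localhost:8000",
    "abc",  # placeholder; use the WEBHOOK_SECRET from .env
    "Documents/THB/Studium/2.Semester/Databases/x.pdf",
)
# To send it against a running service:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
print(req.get_method(), req.full_url)
```

With a valid secret the handler shown in Task 12 answers with status 202 and queues the ingest as a background task.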