rag-ingestor/docs/superpowers/plans/2026-05-04-rag-ingestor.md
Jean-Luc Makiola 8746b187a7 docs: implementation plan with 15 tasks
Bite-sized TDD tasks with complete code in every step. The order is
bottom-up: pure-logic units first (metadata, chunker), then external
services (webdav, ollama, qdrant), then orchestration and API,
and finally Docker and README.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-04 21:36:00 +02:00

RAG Ingestor Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build a FastAPI microservice that ingests Nextcloud files into Qdrant with Ollama-generated embeddings, triggered by webhooks or manual bulk-import.

Architecture: FastAPI receives Nextcloud webhooks, dispatches background tasks per file. Each task downloads via WebDAV, extracts text per file type, chunks with sentence-boundary look-back, embeds via Ollama, and upserts into Qdrant after deleting any existing chunks for the same file path. Configuration via environment variables, no external job queue.

Tech Stack: Python 3.12, FastAPI, httpx, pymupdf, python-docx, ollama (Python lib), qdrant-client, pydantic, pydantic-settings, pytest, pytest-asyncio, uv.

Spec: docs/superpowers/specs/2026-05-04-rag-ingestor-design.md
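
The delete-before-upsert ordering in the architecture is what keeps a re-ingested file from leaving stale chunks behind. A minimal sketch of that ordering, assuming a hypothetical in-memory stand-in for Qdrant (`InMemoryStore` and `reingest` are illustrative names, not modules from this plan):

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for the vector store; holds payload dicts only."""
    points: list[dict] = field(default_factory=list)

    def delete_by_path(self, file_path: str) -> None:
        # Drop every point that belongs to the given file.
        self.points = [p for p in self.points if p["file_path"] != file_path]

    def upsert(self, new_points: list[dict]) -> None:
        self.points.extend(new_points)


def reingest(store: InMemoryStore, file_path: str, chunk_texts: list[str]) -> None:
    """Replace all chunks of one file: delete first, then insert the new set."""
    store.delete_by_path(file_path)
    store.upsert(
        [
            {"file_path": file_path, "chunk_index": i, "text": text}
            for i, text in enumerate(chunk_texts)
        ]
    )


store = InMemoryStore()
reingest(store, "a.pdf", ["c0", "c1", "c2"])   # first version: 3 chunks
reingest(store, "a.pdf", ["n0", "n1"])         # edited file: 2 chunks
print(len(store.points))
```

Without the delete step, the second call would leave five points for `a.pdf`, three of them outdated.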


File Structure (created in this plan)

app/
  __init__.py
  main.py                    # FastAPI app + lifespan
  config.py                  # pydantic-settings
  logging_setup.py           # key=value logging setup (stdlib only)
  webhook/
    __init__.py
    models.py                # NextcloudEvent
    auth.py                  # verify_secret
    handler.py               # POST /webhook
  ingest/
    __init__.py
    metadata.py              # parse_path → {semester, fach, typ}
    chunker.py               # chunk_text
    extractors.py            # PDF/MD/DOCX/XLSX
    webdav.py                # download_file
    embedder.py              # embed_texts (with retry)
    pipeline.py              # process_file orchestrator
  qdrant_store.py            # ensure_collection, upsert, delete_by_path
  bulk.py                    # POST /bulk-import
tests/
  __init__.py
  conftest.py                # fixtures (sample files)
  test_config.py
  test_metadata.py
  test_chunker.py
  test_extractors.py
  test_webdav.py
  test_embedder.py
  test_qdrant_store.py
  test_webhook.py
  test_pipeline.py
  test_bulk.py
docker/
  Dockerfile
docker-compose.yml
.env.example
.gitignore
pyproject.toml
README.md

Task 1: Project Scaffolding

Files:

  • Create: pyproject.toml

  • Create: .gitignore

  • Create: .env.example

  • Create: app/__init__.py (empty)

  • Create: app/webhook/__init__.py (empty)

  • Create: app/ingest/__init__.py (empty)

  • Create: tests/__init__.py (empty)

  • Step 1: Create pyproject.toml

[project]
name = "rag-ingestor"
version = "0.1.0"
description = "Nextcloud → Qdrant RAG ingestion service"
requires-python = ">=3.12"
dependencies = [
    "fastapi>=0.115",
    "uvicorn[standard]>=0.32",
    "httpx>=0.28",
    "pydantic>=2.10",
    "pydantic-settings>=2.7",
    "pymupdf>=1.25",
    "python-docx>=1.1",
    "ollama>=0.4",
    "qdrant-client>=1.12",
]

[dependency-groups]
dev = [
    "pytest>=8.3",
    "pytest-asyncio>=0.25",
    "respx>=0.22",
]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

[tool.uv]
package = true

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["app"]
  • Step 2: Create .gitignore
__pycache__/
*.pyc
.pytest_cache/
.venv/
venv/
.env
*.egg-info/
dist/
build/
.coverage
.idea/
.vscode/
  • Step 3: Create .env.example
NEXTCLOUD_WEBDAV_URL=https://nc.example.com/remote.php/dav/files/myuser
NEXTCLOUD_USER=myuser
NEXTCLOUD_APP_PASSWORD=changeme
OLLAMA_URL=http://ollama:11434
OLLAMA_EMBED_MODEL=qwen3-embedding:0.6b
QDRANT_URL=http://qdrant:6333
QDRANT_COLLECTION=rag_thb_studium
WEBHOOK_SECRET=changeme
INGEST_ROOT=Documents/THB/Studium
CHUNK_SIZE_WORDS=500
CHUNK_OVERLAP_WORDS=50
LOG_LEVEL=INFO
  • Step 4: Create empty package files
mkdir -p app/webhook app/ingest tests
touch app/__init__.py app/webhook/__init__.py app/ingest/__init__.py tests/__init__.py
  • Step 5: Install deps and verify
uv sync

Expected: creates .venv/, installs all dependencies without errors.

  • Step 6: Commit
git add pyproject.toml .gitignore .env.example app/ tests/
git commit -m "chore: project scaffolding with uv and pyproject.toml"

Task 2: Configuration (Settings)

Files:

  • Create: app/config.py

  • Test: tests/test_config.py

  • Step 1: Write the failing test

# tests/test_config.py
import pytest
from app.config import Settings


def test_settings_loads_all_required_fields(monkeypatch):
    monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "https://nc/remote.php/dav/files/u")
    monkeypatch.setenv("NEXTCLOUD_USER", "u")
    monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "pw")
    monkeypatch.setenv("OLLAMA_URL", "http://ollama:11434")
    monkeypatch.setenv("OLLAMA_EMBED_MODEL", "qwen3-embedding:0.6b")
    monkeypatch.setenv("QDRANT_URL", "http://qdrant:6333")
    monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
    monkeypatch.setenv("WEBHOOK_SECRET", "secret")

    s = Settings()

    assert s.nextcloud_user == "u"
    assert s.qdrant_collection == "rag_test"
    assert s.ingest_root == "Documents/THB/Studium"  # default
    assert s.chunk_size_words == 500
    assert s.chunk_overlap_words == 50
    assert s.log_level == "INFO"


def test_settings_overrides_defaults(monkeypatch):
    for k, v in {
        "NEXTCLOUD_WEBDAV_URL": "x", "NEXTCLOUD_USER": "x",
        "NEXTCLOUD_APP_PASSWORD": "x", "OLLAMA_URL": "x",
        "OLLAMA_EMBED_MODEL": "x", "QDRANT_URL": "x",
        "QDRANT_COLLECTION": "x", "WEBHOOK_SECRET": "x",
        "INGEST_ROOT": "Other/Path",
        "CHUNK_SIZE_WORDS": "300",
        "CHUNK_OVERLAP_WORDS": "30",
    }.items():
        monkeypatch.setenv(k, v)

    s = Settings()

    assert s.ingest_root == "Other/Path"
    assert s.chunk_size_words == 300
    assert s.chunk_overlap_words == 30


def test_settings_missing_required_raises(monkeypatch):
    for k in ["NEXTCLOUD_WEBDAV_URL", "NEXTCLOUD_USER", "NEXTCLOUD_APP_PASSWORD",
              "OLLAMA_URL", "OLLAMA_EMBED_MODEL", "QDRANT_URL",
              "QDRANT_COLLECTION", "WEBHOOK_SECRET"]:
        monkeypatch.delenv(k, raising=False)

    # _env_file=None keeps a local .env from supplying the missing values.
    with pytest.raises(Exception):
        Settings(_env_file=None)
  • Step 2: Run test to verify it fails
uv run pytest tests/test_config.py -v

Expected: FAIL with ModuleNotFoundError: No module named 'app.config'.

  • Step 3: Implement app/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")

    nextcloud_webdav_url: str
    nextcloud_user: str
    nextcloud_app_password: str

    ollama_url: str
    ollama_embed_model: str

    qdrant_url: str
    qdrant_collection: str

    webhook_secret: str

    ingest_root: str = "Documents/THB/Studium"
    chunk_size_words: int = 500
    chunk_overlap_words: int = 50
    log_level: str = "INFO"


_settings: Settings | None = None


def get_settings() -> Settings:
    global _settings
    if _settings is None:
        _settings = Settings()
    return _settings
  • Step 4: Run test to verify it passes
uv run pytest tests/test_config.py -v

Expected: 3 PASS.

  • Step 5: Commit
git add app/config.py tests/test_config.py
git commit -m "feat: pydantic-settings config with all env vars"

Task 3: Logging Setup

Files:

  • Create: app/logging_setup.py

  • Step 1: Implement app/logging_setup.py

No tests — formatter is a thin wrapper around stdlib. Visual validation only.

import logging
import sys


class KeyValueFormatter(logging.Formatter):
    """Formats records as `LEVEL msg key1=val1 key2=val2`."""

    def format(self, record: logging.LogRecord) -> str:
        base = f"{record.levelname} {record.getMessage()}"
        # extras land in record.__dict__ but mixed with stdlib keys; filter to known-extra keys
        reserved = {
            "name", "msg", "args", "levelname", "levelno", "pathname", "filename",
            "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
            "created", "msecs", "relativeCreated", "thread", "threadName",
            "processName", "process", "message", "taskName",
        }
        extras = {k: v for k, v in record.__dict__.items() if k not in reserved}
        if extras:
            base += " " + " ".join(f"{k}={v}" for k, v in extras.items())
        if record.exc_info:
            base += "\n" + self.formatException(record.exc_info)
        return base


def setup_logging(level: str = "INFO") -> None:
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(KeyValueFormatter())
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    root.setLevel(level.upper())
  • Step 2: Manual verification
uv run python -c "
from app.logging_setup import setup_logging
import logging
setup_logging('INFO')
logging.info('test event', extra={'event': 'startup', 'status': 'ok'})
"

Expected stdout: INFO test event event=startup status=ok.

  • Step 3: Commit
git add app/logging_setup.py
git commit -m "feat: key=value logging formatter"
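
The hard-coded `reserved` set in the formatter has to track whatever attributes the stdlib puts on a `LogRecord` (for example `taskName`, added in Python 3.12). One possible alternative, shown here only as a sketch, derives that set from a blank record. `RESERVED` and `extra_fields` are hypothetical names, not part of the plan's code:

```python
import logging

# A blank record carries exactly the attributes the stdlib sets itself.
_BLANK = logging.LogRecord("", logging.NOTSET, "", 0, "", (), None)
# "message" and "asctime" only appear once a stdlib Formatter has run.
RESERVED = frozenset(_BLANK.__dict__) | {"message", "asctime"}


def extra_fields(record: logging.LogRecord) -> dict[str, object]:
    """Return only the keys that were passed via `extra=`."""
    return {k: v for k, v in record.__dict__.items() if k not in RESERVED}


# Build a record the same way Logger.info(..., extra=...) would.
record = logging.getLogger("demo").makeRecord(
    "demo", logging.INFO, "demo.py", 1, "test event", (), None,
    extra={"event": "startup", "status": "ok"},
)
line = f"{record.levelname} {record.getMessage()} " + " ".join(
    f"{k}={v}" for k, v in extra_fields(record).items()
)
print(line)
```

The trade-off is a small amount of import-time work in exchange for not having to update the list when the stdlib adds a field.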

Task 4: Path-Metadata Parser

Files:

  • Create: app/ingest/metadata.py

  • Test: tests/test_metadata.py

  • Step 1: Write the failing tests

# tests/test_metadata.py
from app.ingest.metadata import parse_path, PathMetadata


ROOT = "Documents/THB/Studium"


def test_parse_path_with_typ():
    md = parse_path("Documents/THB/Studium/2.Semester/Databases/Vorlesungen/DBS1.pdf", ROOT)
    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ="Vorlesungen")


def test_parse_path_without_typ():
    md = parse_path("Documents/THB/Studium/2.Semester/Databases/DBS1.pdf", ROOT)
    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ=None)


def test_parse_path_deep_nested_keeps_first_subdir_as_typ():
    md = parse_path("Documents/THB/Studium/2.Semester/Databases/Uebungen/01/sheet.pdf", ROOT)
    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ="Uebungen")


def test_parse_path_outside_root_returns_none():
    assert parse_path("Documents/Other/file.pdf", ROOT) is None


def test_parse_path_loose_file_in_thb_returns_none():
    assert parse_path("Documents/THB/Studienbescheinigung.pdf", ROOT) is None


def test_parse_path_loose_file_under_root_returns_none():
    # File directly under Studium with no semester folder
    assert parse_path("Documents/THB/Studium/readme.txt", ROOT) is None


def test_parse_path_invalid_semester_pattern_returns_none():
    assert parse_path("Documents/THB/Studium/Sommersemester/Databases/x.pdf", ROOT) is None


def test_parse_path_no_fach_returns_none():
    # File directly in Semester folder with no fach subdir
    assert parse_path("Documents/THB/Studium/2.Semester/loose.pdf", ROOT) is None


def test_parse_path_with_leading_slash_normalizes():
    md = parse_path("/Documents/THB/Studium/2.Semester/Databases/x.pdf", ROOT)
    assert md == PathMetadata(semester="2.Semester", fach="Databases", typ=None)
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_metadata.py -v

Expected: FAIL with ModuleNotFoundError: No module named 'app.ingest.metadata'.

  • Step 3: Implement app/ingest/metadata.py
import re
from dataclasses import dataclass
from pathlib import PurePosixPath


SEMESTER_RE = re.compile(r"^\d+\.Semester$")


@dataclass(frozen=True)
class PathMetadata:
    semester: str
    fach: str
    typ: str | None


def parse_path(file_path: str, ingest_root: str) -> PathMetadata | None:
    """Parse a Nextcloud file path into structured metadata.

    Returns None when the path is outside the ingest root or does not match
    the expected `<root>/<N>.Semester/<Fach>/[<typ>/...]/<file>` pattern.
    """
    norm_path = file_path.lstrip("/")
    norm_root = ingest_root.strip("/")

    if not norm_path.startswith(norm_root + "/"):
        return None

    relative = norm_path[len(norm_root) + 1:]
    parts = PurePosixPath(relative).parts

    # Need at least: semester / fach / file.ext  → 3 parts
    if len(parts) < 3:
        return None

    semester, fach = parts[0], parts[1]

    if not SEMESTER_RE.match(semester):
        return None

    # parts[-1] is the filename. Anything between fach and filename is "deeper".
    # The first deeper segment becomes `typ`. None if file lives directly in fach.
    typ = parts[2] if len(parts) > 3 else None

    return PathMetadata(semester=semester, fach=fach, typ=typ)
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_metadata.py -v

Expected: 9 PASS.

  • Step 5: Commit
git add app/ingest/metadata.py tests/test_metadata.py
git commit -m "feat: path metadata parser with semester/fach/typ"
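
`parse_path` compares against `norm_root + "/"` rather than the bare root. The sketch below isolates that check to show why the trailing slash matters. `is_under_root`, `naive_is_under_root`, and the `Studium-Archiv` folder are made up for illustration:

```python
def is_under_root(file_path: str, ingest_root: str) -> bool:
    """Prefix check as used in parse_path: the root must end at a '/'."""
    norm_path = file_path.lstrip("/")
    norm_root = ingest_root.strip("/")
    return norm_path.startswith(norm_root + "/")


def naive_is_under_root(file_path: str, ingest_root: str) -> bool:
    """Same check without the trailing slash, for comparison only."""
    return file_path.lstrip("/").startswith(ingest_root.strip("/"))


ROOT = "Documents/THB/Studium"
sibling = "Documents/THB/Studium-Archiv/2.Semester/Databases/x.pdf"
inside = "/Documents/THB/Studium/2.Semester/Databases/x.pdf"

print(is_under_root(inside, ROOT), is_under_root(sibling, ROOT))
print(naive_is_under_root(sibling, ROOT))
```

The naive variant accepts the sibling folder because `Studium-Archiv` shares the string prefix `Studium`; the slash-terminated variant rejects it.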

Task 5: Chunker

Files:

  • Create: app/ingest/chunker.py

  • Test: tests/test_chunker.py

  • Step 1: Write the failing tests

# tests/test_chunker.py
from app.ingest.chunker import chunk_text, Chunk


def test_chunk_short_text_single_chunk():
    text = "Das ist ein kurzer Text mit wenigen Worten."
    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
    assert len(chunks) == 1
    assert chunks[0].text == text
    assert chunks[0].page == 1


def test_chunk_size_and_overlap():
    words = [f"w{i}" for i in range(1200)]
    text = " ".join(words)
    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)

    # 1200 words, size 500, overlap 50 → step 450 → starts at 0, 450, 900 → 3 chunks
    assert len(chunks) == 3
    # First chunk has up to 500 words
    assert len(chunks[0].text.split()) <= 500
    # Overlap: last 50 words of chunk 0 are first 50 words of chunk 1
    last_50_of_first = chunks[0].text.split()[-50:]
    first_50_of_second = chunks[1].text.split()[:50]
    assert last_50_of_first == first_50_of_second


def test_chunk_respects_sentence_boundary_in_lookback_window():
    # 600 words, with a sentence ending around word 480 (within last 20% = words 400-500)
    words = [f"w{i}" for i in range(600)]
    words[479] = "ende."
    text = " ".join(words)
    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)

    # First chunk should end at the sentence boundary, not at word 500
    first_chunk_words = chunks[0].text.split()
    assert first_chunk_words[-1] == "ende."
    assert len(first_chunk_words) == 480


def test_chunk_no_sentence_boundary_in_window_falls_back_to_word_count():
    words = [f"w{i}" for i in range(600)]
    text = " ".join(words)
    chunks = chunk_text(text, size_words=500, overlap_words=50, page=1)
    # No sentence-end → exactly 500 words in first chunk
    assert len(chunks[0].text.split()) == 500


def test_chunk_empty_text_returns_empty_list():
    assert chunk_text("", size_words=500, overlap_words=50, page=1) == []


def test_chunk_carries_page_number():
    chunks = chunk_text("hallo welt", size_words=500, overlap_words=50, page=7)
    assert chunks[0].page == 7
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_chunker.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/ingest/chunker.py
from dataclasses import dataclass


SENTENCE_END_CHARS = (".", "!", "?")


@dataclass(frozen=True)
class Chunk:
    text: str
    page: int


def _find_sentence_boundary(words: list[str], window_start: int) -> int | None:
    """Return index of last word ending with a sentence terminator within
    [window_start, len(words)), or None if no boundary found.

    The returned index is the inclusive end-index of the sentence: the chunk
    will include words[: idx + 1].
    """
    for i in range(len(words) - 1, window_start - 1, -1):
        if words[i].endswith(SENTENCE_END_CHARS):
            return i
    return None


def chunk_text(text: str, size_words: int, overlap_words: int, page: int) -> list[Chunk]:
    """Split text into ≤size_words chunks with overlap_words overlap.

    Each chunk ends at the last sentence boundary in the final 20% of the
    `size_words` window when possible; otherwise it ends at exactly `size_words`.
    """
    if not text.strip():
        return []

    words = text.split()
    if len(words) <= size_words:
        return [Chunk(text=" ".join(words), page=page)]

    chunks: list[Chunk] = []
    start = 0
    lookback_window = max(1, int(size_words * 0.2))

    while start < len(words):
        hard_end = min(start + size_words, len(words))
        # Search for sentence boundary in last 20% of the window
        if hard_end - start == size_words:
            boundary_search_start = hard_end - lookback_window
            boundary = _find_sentence_boundary(words[: hard_end], boundary_search_start)
            end = boundary + 1 if boundary is not None else hard_end
        else:
            end = hard_end

        chunks.append(Chunk(text=" ".join(words[start:end]), page=page))

        if end >= len(words):
            break

        # Step forward: end - overlap, but never less than start + 1
        next_start = max(end - overlap_words, start + 1)
        start = next_start

    return chunks
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_chunker.py -v

Expected: 6 PASS.

  • Step 5: Commit
git add app/ingest/chunker.py tests/test_chunker.py
git commit -m "feat: word-based chunker with sentence-boundary look-back"
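
The window arithmetic behind `test_chunk_size_and_overlap` can be checked in isolation. The sketch below mirrors the stepping loop of `chunk_text` but deliberately leaves out the sentence-boundary look-back, so it only describes text without sentence terminators. `chunk_spans` is a hypothetical helper name:

```python
def chunk_spans(total_words: int, size_words: int, overlap_words: int) -> list[tuple[int, int]]:
    """Return (start, end) word indices per chunk, end exclusive."""
    spans: list[tuple[int, int]] = []
    start = 0
    while start < total_words:
        end = min(start + size_words, total_words)
        spans.append((start, end))
        if end >= total_words:
            break
        # Step back by the overlap, but always advance by at least one word.
        start = max(end - overlap_words, start + 1)
    return spans


print(chunk_spans(1200, 500, 50))
```

For 1200 words this yields three spans starting at 0, 450, and 900, which matches the comment in the test. When a sentence boundary shortens a chunk, the next start moves back with it, so real chunk counts can be higher than this simplified calculation suggests.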

Task 6: Extractors + Test Fixtures

Files:

  • Create: app/ingest/extractors.py

  • Create: tests/conftest.py

  • Test: tests/test_extractors.py

  • Step 1: Create test fixtures via conftest

Sample files are generated dynamically so we don't commit binaries.

# tests/conftest.py
import io
import pytest
import fitz  # pymupdf
from docx import Document


@pytest.fixture
def sample_pdf_bytes() -> bytes:
    doc = fitz.open()
    p1 = doc.new_page()
    p1.insert_text((72, 72), "Seite eins enthaelt Lorem Ipsum.")
    p2 = doc.new_page()
    p2.insert_text((72, 72), "Seite zwei enthaelt mehr Text.")
    data = doc.tobytes()
    doc.close()
    return data


@pytest.fixture
def sample_docx_bytes() -> bytes:
    doc = Document()
    doc.add_paragraph("Erster Absatz.")
    doc.add_paragraph("Zweiter Absatz mit mehr Inhalt.")
    buf = io.BytesIO()
    doc.save(buf)
    return buf.getvalue()


@pytest.fixture
def sample_md_bytes() -> bytes:
    return "# Title\n\nFirst paragraph.\n\nSecond paragraph.\n".encode("utf-8")


@pytest.fixture
def sample_xlsx_bytes() -> bytes:
    # Minimal placeholder; extractor doesn't read content for xlsx
    return b"PK\x03\x04dummy"
  • Step 2: Write the failing tests
# tests/test_extractors.py
import pytest
from app.ingest.extractors import extract, ExtractedPage, UnsupportedFileType


def test_extract_pdf_returns_pages(sample_pdf_bytes):
    pages = extract(sample_pdf_bytes, "pdf", filename="x.pdf")
    assert len(pages) == 2
    assert pages[0].page == 1
    assert "Seite eins" in pages[0].text
    assert pages[1].page == 2
    assert "Seite zwei" in pages[1].text


def test_extract_docx_returns_single_page(sample_docx_bytes):
    pages = extract(sample_docx_bytes, "docx", filename="x.docx")
    assert len(pages) == 1
    assert pages[0].page == 1
    assert "Erster Absatz." in pages[0].text
    assert "Zweiter Absatz" in pages[0].text


def test_extract_md_returns_single_page(sample_md_bytes):
    pages = extract(sample_md_bytes, "md", filename="x.md")
    assert len(pages) == 1
    assert pages[0].page == 1
    assert "First paragraph." in pages[0].text


def test_extract_xlsx_returns_filename_pseudo_text(sample_xlsx_bytes):
    pages = extract(sample_xlsx_bytes, "xlsx", filename="my-sheet.xlsx")
    assert len(pages) == 1
    assert pages[0].page == 1
    assert pages[0].text == "Tabelle: my-sheet.xlsx"


def test_extract_unsupported_raises():
    with pytest.raises(UnsupportedFileType):
        extract(b"data", "txt", filename="x.txt")
  • Step 3: Run tests to verify they fail
uv run pytest tests/test_extractors.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 4: Implement app/ingest/extractors.py
import io
from dataclasses import dataclass

import fitz  # pymupdf
from docx import Document


SUPPORTED_TYPES = {"pdf", "md", "docx", "xlsx"}


class UnsupportedFileType(Exception):
    pass


@dataclass(frozen=True)
class ExtractedPage:
    page: int
    text: str


def extract(data: bytes, file_type: str, filename: str) -> list[ExtractedPage]:
    file_type = file_type.lower().lstrip(".")
    if file_type not in SUPPORTED_TYPES:
        raise UnsupportedFileType(file_type)

    if file_type == "pdf":
        return _extract_pdf(data)
    if file_type == "md":
        return _extract_md(data)
    if file_type == "docx":
        return _extract_docx(data)
    if file_type == "xlsx":
        return [ExtractedPage(page=1, text=f"Tabelle: {filename}")]

    raise UnsupportedFileType(file_type)  # unreachable


def _extract_pdf(data: bytes) -> list[ExtractedPage]:
    pages: list[ExtractedPage] = []
    with fitz.open(stream=data, filetype="pdf") as doc:
        for i, page in enumerate(doc, start=1):
            pages.append(ExtractedPage(page=i, text=page.get_text() or ""))
    return pages


def _extract_md(data: bytes) -> list[ExtractedPage]:
    return [ExtractedPage(page=1, text=data.decode("utf-8", errors="replace"))]


def _extract_docx(data: bytes) -> list[ExtractedPage]:
    doc = Document(io.BytesIO(data))
    text = "\n\n".join(p.text for p in doc.paragraphs if p.text)
    return [ExtractedPage(page=1, text=text)]
  • Step 5: Run tests to verify they pass
uv run pytest tests/test_extractors.py -v

Expected: 5 PASS.

  • Step 6: Commit
git add app/ingest/extractors.py tests/conftest.py tests/test_extractors.py
git commit -m "feat: extractors for pdf/md/docx/xlsx with dynamic fixtures"
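
`extract` expects a `file_type` string and normalizes it with `.lower().lstrip(".")`. How the caller derives that string is not shown in this task; one plausible way, sketched here under that assumption, is to take the suffix of the file name. `file_type_of` and `is_supported` are hypothetical helpers:

```python
from pathlib import PurePosixPath

SUPPORTED_TYPES = {"pdf", "md", "docx", "xlsx"}  # same set as in extractors.py


def file_type_of(filename: str) -> str:
    """Lower-cased extension without the dot; empty string if there is none."""
    return PurePosixPath(filename).suffix.lower().lstrip(".")


def is_supported(filename: str) -> bool:
    return file_type_of(filename) in SUPPORTED_TYPES


for name in ("DBS1.PDF", "notes.md", "archive.tar.gz", "README"):
    print(name, repr(file_type_of(name)), is_supported(name))
```

Checking `is_supported` before downloading would avoid fetching files that `extract` rejects with `UnsupportedFileType` anyway.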

Task 7: WebDAV Client

Files:

  • Create: app/ingest/webdav.py

  • Test: tests/test_webdav.py

  • Step 1: Write the failing tests

# tests/test_webdav.py
import pytest
import httpx
import respx

from app.ingest.webdav import download_file, WebDAVError


@pytest.mark.asyncio
async def test_download_file_returns_bytes():
    base = "https://nc.example.com/remote.php/dav/files/u"
    file_path = "Documents/THB/Studium/2.Semester/Databases/x.pdf"
    expected_url = f"{base}/{file_path}"

    with respx.mock(base_url=base) as mock:
        mock.get(expected_url).mock(return_value=httpx.Response(200, content=b"PDFBYTES"))
        data = await download_file(base, "u", "pw", file_path)
        assert data == b"PDFBYTES"


@pytest.mark.asyncio
async def test_download_file_404_raises():
    base = "https://nc.example.com/remote.php/dav/files/u"
    file_path = "missing.pdf"
    expected_url = f"{base}/{file_path}"

    with respx.mock(base_url=base) as mock:
        mock.get(expected_url).mock(return_value=httpx.Response(404))
        with pytest.raises(WebDAVError):
            await download_file(base, "u", "pw", file_path)


@pytest.mark.asyncio
async def test_download_file_handles_url_encoding():
    base = "https://nc.example.com/remote.php/dav/files/u"
    file_path = "Documents/Folder With Space/file.pdf"

    with respx.mock(base_url=base) as mock:
        # httpx will percent-encode the spaces
        route = mock.get(url__regex=r".*/Folder%20With%20Space/file\.pdf").mock(
            return_value=httpx.Response(200, content=b"OK")
        )
        data = await download_file(base, "u", "pw", file_path)
        assert data == b"OK"
        assert route.called
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_webdav.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/ingest/webdav.py
import httpx


class WebDAVError(Exception):
    pass


async def download_file(base_url: str, user: str, password: str, file_path: str, *, timeout: float = 60.0) -> bytes:
    """Fetch a file from Nextcloud WebDAV. Returns the raw bytes."""
    base = base_url.rstrip("/")
    rel = file_path.lstrip("/")
    url = f"{base}/{rel}"

    async with httpx.AsyncClient(auth=(user, password), timeout=timeout) as client:
        response = await client.get(url)

    if response.status_code != 200:
        raise WebDAVError(
            f"WebDAV GET {file_path} failed: status={response.status_code}"
        )
    return response.content
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_webdav.py -v

Expected: 3 PASS.

  • Step 5: Commit
git add app/ingest/webdav.py tests/test_webdav.py
git commit -m "feat: webdav download via httpx with basic auth"
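
The implementation relies on httpx to percent-encode spaces. File names that contain `#` or `?` are a different case, because in a URL string those characters start the fragment and the query. A possible hardening, shown as a sketch and not as part of the plan, is to quote the path explicitly before joining. `build_webdav_url` is a hypothetical helper, and how httpx treats already-encoded input is worth verifying against the version in use:

```python
from urllib.parse import quote


def build_webdav_url(base_url: str, file_path: str) -> str:
    """Join base and path, percent-encoding everything except '/'."""
    base = base_url.rstrip("/")
    rel = quote(file_path.lstrip("/"), safe="/")
    return f"{base}/{rel}"


url = build_webdav_url(
    "https://nc.example.com/remote.php/dav/files/u",
    "Documents/Folder With Space/Blatt #1?.pdf",
)
print(url)
```

With this helper the `#` and `?` become `%23` and `%3F`, so they stay part of the path instead of being split off.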

Task 8: Ollama Embedder

Files:

  • Create: app/ingest/embedder.py

  • Test: tests/test_embedder.py

  • Step 1: Write the failing tests

# tests/test_embedder.py
from unittest.mock import AsyncMock, MagicMock, patch
import pytest

from app.ingest.embedder import embed_texts, EmbeddingError


@pytest.mark.asyncio
async def test_embed_texts_returns_vectors():
    fake_client = MagicMock()
    fake_client.embeddings = AsyncMock(side_effect=[
        {"embedding": [0.1, 0.2, 0.3]},
        {"embedding": [0.4, 0.5, 0.6]},
    ])

    with patch("app.ingest.embedder._client", return_value=fake_client):
        vectors = await embed_texts(["hello", "world"], model="qwen3-embedding:0.6b")

    assert vectors == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
    assert fake_client.embeddings.call_count == 2


@pytest.mark.asyncio
async def test_embed_texts_retries_on_failure(monkeypatch):
    monkeypatch.setattr("app.ingest.embedder._BACKOFF_SECONDS", [0, 0, 0])

    fake_client = MagicMock()
    fake_client.embeddings = AsyncMock(side_effect=[
        Exception("connection refused"),
        Exception("timeout"),
        {"embedding": [1.0, 2.0]},
    ])

    with patch("app.ingest.embedder._client", return_value=fake_client):
        vectors = await embed_texts(["hi"], model="m")

    assert vectors == [[1.0, 2.0]]
    assert fake_client.embeddings.call_count == 3


@pytest.mark.asyncio
async def test_embed_texts_raises_after_max_retries(monkeypatch):
    monkeypatch.setattr("app.ingest.embedder._BACKOFF_SECONDS", [0, 0, 0])

    fake_client = MagicMock()
    fake_client.embeddings = AsyncMock(side_effect=Exception("nope"))

    with patch("app.ingest.embedder._client", return_value=fake_client):
        with pytest.raises(EmbeddingError):
            await embed_texts(["hi"], model="m")

    assert fake_client.embeddings.call_count == 4  # initial + 3 retries
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_embedder.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/ingest/embedder.py
import asyncio
import logging
from functools import lru_cache

from ollama import AsyncClient

from app.config import get_settings


logger = logging.getLogger(__name__)


_BACKOFF_SECONDS = [1, 2, 4]


class EmbeddingError(Exception):
    pass


@lru_cache(maxsize=1)
def _client() -> AsyncClient:
    return AsyncClient(host=get_settings().ollama_url)


async def embed_texts(texts: list[str], model: str) -> list[list[float]]:
    """Embed each text via Ollama. Retries individual calls 3x with backoff."""
    vectors: list[list[float]] = []
    for text in texts:
        vec = await _embed_one(text, model)
        vectors.append(vec)
    return vectors


async def _embed_one(text: str, model: str) -> list[float]:
    last_err: Exception | None = None
    client = _client()
    for attempt in range(len(_BACKOFF_SECONDS) + 1):
        try:
            response = await client.embeddings(model=model, prompt=text)
            return list(response["embedding"])
        except Exception as exc:
            last_err = exc
            if attempt < len(_BACKOFF_SECONDS):
                wait = _BACKOFF_SECONDS[attempt]
                logger.warning(
                    "ollama embed retry",
                    extra={"event": "embed_retry", "attempt": attempt + 1, "wait_s": wait, "error": str(exc)},
                )
                await asyncio.sleep(wait)
    raise EmbeddingError(f"embed failed after retries: {last_err}")


async def embedding_dimension(model: str) -> int:
    """Probe a single embedding to discover the model's vector dimension."""
    vec = await _embed_one("dimension probe", model)
    return len(vec)
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_embedder.py -v

Expected: 3 PASS.

  • Step 5: Commit
git add app/ingest/embedder.py tests/test_embedder.py
git commit -m "feat: ollama embedder with exponential backoff retry"
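
The retry loop in `_embed_one` makes `len(_BACKOFF_SECONDS) + 1` attempts, so with `[1, 2, 4]` a text that never embeds costs four calls and seven seconds of sleep. The sketch below reproduces just that control flow with a simulated flaky call. `retry_async` and `make_flaky` are illustrative names, and the backoff is set to zero so the example runs instantly:

```python
import asyncio


async def retry_async(call, backoff_seconds: list[float]):
    """Try once, then retry once per backoff entry; re-raise after the last."""
    last_err: Exception | None = None
    for attempt in range(len(backoff_seconds) + 1):
        try:
            return await call()
        except Exception as exc:
            last_err = exc
            if attempt < len(backoff_seconds):
                await asyncio.sleep(backoff_seconds[attempt])
    raise RuntimeError("all attempts failed") from last_err


def make_flaky(failures: int):
    """Build an async call that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def call() -> int:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated outage")
        return state["calls"]

    return call, state


call, state = make_flaky(failures=2)
result = asyncio.run(retry_async(call, [0, 0, 0]))
print(result, state["calls"])
```

Two failures followed by a success end on the third call, the same shape as `test_embed_texts_retries_on_failure`.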

Task 9: Qdrant Store

Files:

  • Create: app/qdrant_store.py

  • Test: tests/test_qdrant_store.py

  • Step 1: Write the failing tests

# tests/test_qdrant_store.py
from unittest.mock import MagicMock, patch
import pytest

from app.qdrant_store import (
    ensure_collection,
    upsert_chunks,
    delete_by_path,
    ChunkPoint,
)


def test_ensure_collection_creates_when_missing():
    fake_client = MagicMock()
    fake_client.collection_exists.return_value = False

    ensure_collection(fake_client, "rag_test", vector_size=1024)

    fake_client.create_collection.assert_called_once()
    args, kwargs = fake_client.create_collection.call_args
    assert kwargs["collection_name"] == "rag_test"
    # Payload indexes get created
    assert fake_client.create_payload_index.call_count == 3


def test_ensure_collection_skips_when_exists_with_matching_dim():
    fake_client = MagicMock()
    fake_client.collection_exists.return_value = True
    info = MagicMock()
    info.config.params.vectors.size = 1024
    fake_client.get_collection.return_value = info

    ensure_collection(fake_client, "rag_test", vector_size=1024)

    fake_client.create_collection.assert_not_called()


def test_ensure_collection_raises_on_dim_mismatch():
    fake_client = MagicMock()
    fake_client.collection_exists.return_value = True
    info = MagicMock()
    info.config.params.vectors.size = 768
    fake_client.get_collection.return_value = info

    with pytest.raises(RuntimeError, match="dimension mismatch"):
        ensure_collection(fake_client, "rag_test", vector_size=1024)


def test_upsert_chunks_calls_client_upsert():
    fake_client = MagicMock()
    points = [
        ChunkPoint(vector=[0.1] * 4, payload={"file_path": "a", "chunk_index": 0}),
        ChunkPoint(vector=[0.2] * 4, payload={"file_path": "a", "chunk_index": 1}),
    ]

    upsert_chunks(fake_client, "rag_test", points)

    fake_client.upsert.assert_called_once()
    kwargs = fake_client.upsert.call_args.kwargs
    assert kwargs["collection_name"] == "rag_test"
    assert len(kwargs["points"]) == 2


def test_delete_by_path_uses_filter():
    fake_client = MagicMock()
    delete_by_path(fake_client, "rag_test", "Documents/x.pdf")

    fake_client.delete.assert_called_once()
    kwargs = fake_client.delete.call_args.kwargs
    assert kwargs["collection_name"] == "rag_test"
    # The filter should target file_path
    selector = kwargs["points_selector"]
    # Inspect the FilterSelector → Filter → must → FieldCondition
    assert selector.filter.must[0].key == "file_path"
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_qdrant_store.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/qdrant_store.py
import uuid
from dataclasses import dataclass
from typing import Any

from qdrant_client import QdrantClient
from qdrant_client.http import models as qm


@dataclass(frozen=True)
class ChunkPoint:
    vector: list[float]
    payload: dict[str, Any]


def ensure_collection(client: QdrantClient, name: str, vector_size: int) -> None:
    """Create the collection if missing. Crash if it exists with wrong dim."""
    if not client.collection_exists(name):
        client.create_collection(
            collection_name=name,
            vectors_config=qm.VectorParams(size=vector_size, distance=qm.Distance.COSINE),
        )
        for field in ("file_path", "semester", "fach"):
            client.create_payload_index(
                collection_name=name,
                field_name=field,
                field_schema=qm.PayloadSchemaType.KEYWORD,
            )
        return

    info = client.get_collection(name)
    existing = info.config.params.vectors.size
    if existing != vector_size:
        raise RuntimeError(
            f"qdrant collection '{name}' dimension mismatch: "
            f"existing={existing}, model={vector_size}. "
            "Drop the collection manually and run a bulk import."
        )


def upsert_chunks(client: QdrantClient, name: str, chunks: list[ChunkPoint]) -> None:
    points = [
        qm.PointStruct(id=str(uuid.uuid4()), vector=c.vector, payload=c.payload)
        for c in chunks
    ]
    client.upsert(collection_name=name, points=points)


def delete_by_path(client: QdrantClient, name: str, file_path: str) -> None:
    selector = qm.FilterSelector(
        filter=qm.Filter(
            must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))]
        )
    )
    client.delete(collection_name=name, points_selector=selector)
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_qdrant_store.py -v

Expected: 5 PASS.
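
`upsert_chunks` gives every point a fresh `uuid4`, so an upsert alone never overwrites points from an earlier run; the plan relies on `delete_by_path` being called first. As a possible alternative (not part of this plan), point IDs could be derived deterministically from the file path and chunk index. The sketch below assumes a hypothetical helper `chunk_point_id` and an arbitrary fixed namespace; neither exists in the codebase.

```python
import uuid

# Hypothetical namespace: any fixed UUID works as long as it never changes.
RAG_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "rag-ingestor")


def chunk_point_id(file_path: str, chunk_index: int) -> str:
    """Return a stable UUID string for one chunk of one file."""
    return str(uuid.uuid5(RAG_NAMESPACE, f"{file_path}#{chunk_index}"))


first = chunk_point_id("Documents/x.pdf", 0)
again = chunk_point_id("Documents/x.pdf", 0)
other = chunk_point_id("Documents/x.pdf", 1)
print(first == again, first == other)  # prints: True False
```

Even with stable IDs, the delete step would still be needed: when a file shrinks, chunks with higher indices from the previous version would otherwise remain in the collection.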

  • Step 5: Commit
git add app/qdrant_store.py tests/test_qdrant_store.py
git commit -m "feat: qdrant store mit ensure/upsert/delete-by-path"

Task 10: Webhook Models + Auth

Files:

  • Create: app/webhook/models.py

  • Create: app/webhook/auth.py

  • Test: tests/test_webhook.py (auth + model parts only here; full handler test in Task 12)

  • Step 1: Write the failing tests

# tests/test_webhook.py
import pytest
from fastapi import HTTPException
from pydantic import ValidationError

from app.webhook.models import NextcloudEvent, EventType
from app.webhook.auth import verify_secret


def test_event_parses_created():
    evt = NextcloudEvent(event_type="created", file_path="a/b.pdf", file_name="b.pdf")
    assert evt.event_type == EventType.CREATED


def test_event_invalid_type_raises():
    # pydantic rejects values outside the EventType enum
    with pytest.raises(ValidationError):
        NextcloudEvent(event_type="exploded", file_path="a", file_name="a")


def test_verify_secret_pass():
    verify_secret(provided="abc", expected="abc")  # no exception


def test_verify_secret_fail():
    with pytest.raises(HTTPException) as exc_info:
        verify_secret(provided="wrong", expected="abc")
    assert exc_info.value.status_code == 401


def test_verify_secret_missing_fail():
    with pytest.raises(HTTPException) as exc_info:
        verify_secret(provided=None, expected="abc")
    assert exc_info.value.status_code == 401
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_webhook.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/webhook/models.py
from enum import Enum
from pydantic import BaseModel


class EventType(str, Enum):
    CREATED = "created"
    UPDATED = "updated"
    DELETED = "deleted"


class NextcloudEvent(BaseModel):
    event_type: EventType
    file_path: str
    file_name: str
  • Step 4: Implement app/webhook/auth.py
import hmac

from fastapi import HTTPException


def verify_secret(provided: str | None, expected: str) -> None:
    """Constant-time comparison of the shared secret.

    Raises HTTPException(401) on mismatch or missing header.
    """
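    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input,
    # which would turn a bad header into a 500 instead of a 401.
    if provided is None or not hmac.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="invalid or missing secret")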
  • Step 5: Run tests to verify they pass
uv run pytest tests/test_webhook.py -v

Expected: 5 PASS.

  • Step 6: Commit
git add app/webhook/models.py app/webhook/auth.py tests/test_webhook.py
git commit -m "feat: webhook event-model und shared-secret auth"

Task 11: Pipeline Orchestration

Files:

  • Create: app/ingest/pipeline.py

  • Test: tests/test_pipeline.py

  • Step 1: Write the failing tests

# tests/test_pipeline.py
from unittest.mock import AsyncMock, MagicMock
import pytest

from app.webhook.models import EventType
from app.ingest.pipeline import process_file


@pytest.mark.asyncio
async def test_process_deleted_event_calls_delete_only(monkeypatch):
    qdrant = MagicMock()
    monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)

    download_mock = AsyncMock()
    monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock)

    await process_file(
        file_path="Documents/THB/Studium/2.Semester/Databases/x.pdf",
        event_type=EventType.DELETED,
    )

    download_mock.assert_not_called()
    qdrant.delete.assert_called_once()


@pytest.mark.asyncio
async def test_process_outside_root_skips(monkeypatch):
    qdrant = MagicMock()
    monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)

    download_mock = AsyncMock()
    monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock)

    await process_file(
        file_path="Documents/Other/x.pdf",
        event_type=EventType.CREATED,
    )

    download_mock.assert_not_called()
    qdrant.delete.assert_not_called()
    qdrant.upsert.assert_not_called()


@pytest.mark.asyncio
async def test_process_unsupported_extension_skips(monkeypatch):
    qdrant = MagicMock()
    monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)
    monkeypatch.setattr("app.ingest.pipeline.download_file", AsyncMock())

    await process_file(
        file_path="Documents/THB/Studium/2.Semester/Databases/notes.txt",
        event_type=EventType.CREATED,
    )

    qdrant.upsert.assert_not_called()


@pytest.mark.asyncio
async def test_process_created_full_flow(monkeypatch, sample_pdf_bytes):
    qdrant = MagicMock()
    monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)

    monkeypatch.setattr(
        "app.ingest.pipeline.download_file",
        AsyncMock(return_value=sample_pdf_bytes),
    )
    monkeypatch.setattr(
        "app.ingest.pipeline.embed_texts",
        AsyncMock(return_value=[[0.1] * 4, [0.2] * 4]),
    )

    await process_file(
        file_path="Documents/THB/Studium/2.Semester/Databases/Vorlesungen/x.pdf",
        event_type=EventType.CREATED,
    )

    # delete called first (idempotency), upsert called after
    qdrant.delete.assert_called_once()
    qdrant.upsert.assert_called_once()

    upserted_points = qdrant.upsert.call_args.kwargs["points"]
    assert len(upserted_points) >= 1
    payload = upserted_points[0].payload
    assert payload["semester"] == "2.Semester"
    assert payload["fach"] == "Databases"
    assert payload["typ"] == "Vorlesungen"
    assert payload["file_type"] == "pdf"
    assert payload["chunk_index"] == 0
    assert "ingested_at" in payload
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_pipeline.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/ingest/pipeline.py
import logging
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import PurePosixPath

from qdrant_client import QdrantClient

from app.config import get_settings
from app.ingest.chunker import chunk_text
from app.ingest.embedder import embed_texts
from app.ingest.extractors import extract, UnsupportedFileType, SUPPORTED_TYPES
from app.ingest.metadata import parse_path
from app.ingest.webdav import download_file
from app.qdrant_store import upsert_chunks, delete_by_path, ChunkPoint
from app.webhook.models import EventType


logger = logging.getLogger(__name__)


@lru_cache(maxsize=1)
def _qdrant_client() -> QdrantClient:
    return QdrantClient(url=get_settings().qdrant_url)


async def process_file(file_path: str, event_type: EventType) -> None:
    """End-to-end pipeline for one file event."""
    settings = get_settings()
    file_path = file_path.lstrip("/")

    metadata = parse_path(file_path, settings.ingest_root)
    if metadata is None:
        logger.info(
            "skip outside ingest root",
            extra={"event": "skip", "reason": "outside_root", "file": file_path},
        )
        return

    if event_type == EventType.DELETED:
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        logger.info("deleted", extra={"event": "delete", "file": file_path})
        return

    extension = PurePosixPath(file_path).suffix.lstrip(".").lower()
    if extension not in SUPPORTED_TYPES:
        logger.info(
            "skip unsupported type",
            extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension},
        )
        return

    try:
        data = await download_file(
            settings.nextcloud_webdav_url,
            settings.nextcloud_user,
            settings.nextcloud_app_password,
            file_path,
        )
    except Exception as exc:
        logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
        return

    try:
        pages = extract(data, extension, filename=PurePosixPath(file_path).name)
    except UnsupportedFileType:
        logger.info("unsupported", extra={"event": "skip", "file": file_path})
        return
    except Exception as exc:
        logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
        return

    chunks: list[tuple[str, int, int]] = []  # (text, page, chunk_index)
    chunk_index = 0
    for page in pages:
        for chunk in chunk_text(
            page.text,
            size_words=settings.chunk_size_words,
            overlap_words=settings.chunk_overlap_words,
            page=page.page,
        ):
            chunks.append((chunk.text, chunk.page, chunk_index))
            chunk_index += 1

    if not chunks:
        logger.info("no chunks", extra={"event": "skip", "reason": "empty_text", "file": file_path})
        # Still delete any prior data for this path
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        return

    try:
        vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model)
    except Exception as exc:
        logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
        return

    now_iso = datetime.now(timezone.utc).isoformat()
    file_name = PurePosixPath(file_path).name
    points = [
        ChunkPoint(
            vector=vec,
            payload={
                "file_path": file_path,
                "file_name": file_name,
                "file_type": extension,
                "semester": metadata.semester,
                "fach": metadata.fach,
                "typ": metadata.typ,
                "page": page,
                "chunk_index": idx,
                "text": text,
                "ingested_at": now_iso,
            },
        )
        for vec, (text, page, idx) in zip(vectors, chunks)
    ]

    qdrant = _qdrant_client()
    delete_by_path(qdrant, settings.qdrant_collection, file_path)
    upsert_chunks(qdrant, settings.qdrant_collection, points)

    logger.info(
        "ingested",
        extra={"event": "ingest_done", "file": file_path, "chunks": len(points)},
    )
  • Step 4: Run tests to verify they pass
uv run pytest tests/test_pipeline.py -v

Expected: 4 PASS.
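
The delete-then-upsert step at the end of `process_file` is what makes re-ingesting the same file idempotent. The sketch below reproduces that idea with an in-memory stand-in instead of Qdrant. `InMemoryStore` and `ingest` are hypothetical names used only for illustration, and the 1-based page numbering is an assumption.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for a Qdrant collection, keyed by point id."""

    points: dict[str, dict] = field(default_factory=dict)

    def delete_by_path(self, file_path: str) -> None:
        # Keep only the points that belong to other files.
        self.points = {
            pid: payload
            for pid, payload in self.points.items()
            if payload["file_path"] != file_path
        }

    def upsert(self, payloads: list[dict]) -> None:
        for payload in payloads:
            self.points[str(uuid.uuid4())] = payload  # fresh id per point


def ingest(store: InMemoryStore, file_path: str, pages: list[list[str]]) -> None:
    """Flatten per-page chunks with a file-wide chunk_index, then replace."""
    payloads = []
    chunk_index = 0
    for page_no, page_chunks in enumerate(pages, start=1):  # assumed 1-based pages
        for text in page_chunks:
            payloads.append(
                {"file_path": file_path, "page": page_no,
                 "chunk_index": chunk_index, "text": text}
            )
            chunk_index += 1
    store.delete_by_path(file_path)  # drop the previous version first
    store.upsert(payloads)


store = InMemoryStore()
ingest(store, "a.pdf", [["intro", "body"], ["appendix"]])
ingest(store, "a.pdf", [["intro", "body"], ["appendix"]])  # same file again
print(len(store.points))  # prints: 3
```

Without the `delete_by_path` call the second run would leave six points behind, because every upsert uses new random IDs.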

  • Step 5: Commit
git add app/ingest/pipeline.py tests/test_pipeline.py
git commit -m "feat: pipeline-orchestrator fuer single-file ingest"

Task 12: FastAPI App + Webhook Handler

Files:

  • Create: app/main.py

  • Create: app/webhook/handler.py

  • Test: extend tests/test_webhook.py

  • Step 1: Add failing tests for the endpoint

Append to tests/test_webhook.py:

from unittest.mock import AsyncMock
from fastapi.testclient import TestClient


def _make_app(monkeypatch):
    """Build the FastAPI app with all external clients stubbed."""
    monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
    monkeypatch.setenv("NEXTCLOUD_USER", "u")
    monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
    monkeypatch.setenv("OLLAMA_URL", "http://ollama")
    monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
    monkeypatch.setenv("QDRANT_URL", "http://qdrant")
    monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
    monkeypatch.setenv("WEBHOOK_SECRET", "abc")

    # Reset cached settings/clients
    import app.config
    app.config._settings = None
    import app.ingest.pipeline as pipe
    pipe._qdrant_client.cache_clear()

    # Stub the lifespan startup so it doesn't try to talk to real services
    monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock())

    from app.main import app
    return app


def test_health_endpoint_no_auth(monkeypatch):
    app = _make_app(monkeypatch)
    with TestClient(app) as client:
        r = client.get("/health")
    assert r.status_code == 200
    assert r.json() == {"status": "ok"}


def test_webhook_rejects_missing_secret(monkeypatch):
    app = _make_app(monkeypatch)
    with TestClient(app) as client:
        r = client.post("/webhook", json={
            "event_type": "created",
            "file_path": "a/b.pdf",
            "file_name": "b.pdf",
        })
    assert r.status_code == 401


def test_webhook_rejects_wrong_secret(monkeypatch):
    app = _make_app(monkeypatch)
    with TestClient(app) as client:
        r = client.post(
            "/webhook",
            json={"event_type": "created", "file_path": "a/b.pdf", "file_name": "b.pdf"},
            headers={"X-Webhook-Secret": "wrong"},
        )
    assert r.status_code == 401


def test_webhook_dispatches_background_task(monkeypatch):
    app = _make_app(monkeypatch)

    process_mock = AsyncMock()
    monkeypatch.setattr("app.webhook.handler.process_file", process_mock)

    with TestClient(app) as client:
        r = client.post(
            "/webhook",
            json={
                "event_type": "created",
                "file_path": "Documents/THB/Studium/2.Semester/Databases/x.pdf",
                "file_name": "x.pdf",
            },
            headers={"X-Webhook-Secret": "abc"},
        )

    assert r.status_code == 202
    process_mock.assert_awaited_once()
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_webhook.py -v

Expected: FAIL on the new endpoint tests with import errors.

  • Step 3: Implement app/webhook/handler.py
from fastapi import APIRouter, BackgroundTasks, Header, status

from app.config import get_settings
from app.ingest.pipeline import process_file
from app.webhook.auth import verify_secret
from app.webhook.models import NextcloudEvent


router = APIRouter()


@router.post("/webhook", status_code=status.HTTP_202_ACCEPTED)
async def webhook(
    event: NextcloudEvent,
    background: BackgroundTasks,
    x_webhook_secret: str | None = Header(default=None),
):
    verify_secret(x_webhook_secret, get_settings().webhook_secret)
    background.add_task(process_file, event.file_path, event.event_type)
    return {"status": "accepted"}
  • Step 4: Implement app/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from qdrant_client import QdrantClient

from app.config import get_settings
from app.ingest.embedder import embedding_dimension
from app.logging_setup import setup_logging
from app.qdrant_store import ensure_collection
from app.webhook.handler import router as webhook_router


logger = logging.getLogger(__name__)


async def _startup_ensure_collection() -> None:
    settings = get_settings()
    dim = await embedding_dimension(settings.ollama_embed_model)
    client = QdrantClient(url=settings.qdrant_url)
    ensure_collection(client, settings.qdrant_collection, vector_size=dim)
    logger.info(
        "qdrant collection ready",
        extra={"event": "startup", "collection": settings.qdrant_collection, "dim": dim},
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    setup_logging(get_settings().log_level)
    await _startup_ensure_collection()
    yield


app = FastAPI(title="rag-ingestor", lifespan=lifespan)
app.include_router(webhook_router)


@app.get("/health")
async def health():
    return {"status": "ok"}
  • Step 5: Run tests to verify they pass
uv run pytest tests/test_webhook.py -v

Expected: 9 PASS (5 from earlier + 4 new).
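
For manual testing it can help to see what a caller has to send. This sketch builds a webhook request with the standard library only. The helper name `build_webhook_request` and the base URL `http://localhost:8000` are assumptions; the header name and JSON fields come from the handler and model above.

```python
import json
import urllib.request


def build_webhook_request(
    base_url: str, secret: str, event_type: str, file_path: str
) -> urllib.request.Request:
    """Build (but do not send) a POST /webhook request."""
    body = {
        "event_type": event_type,
        "file_path": file_path,
        "file_name": file_path.rsplit("/", 1)[-1],
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/webhook",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Webhook-Secret": secret},
        method="POST",
    )


req = build_webhook_request(
    "http://localhost:8000",
    "abc",
    "created",
    "Documents/THB/Studium/2.Semester/Databases/x.pdf",
)
print(req.get_method(), req.full_url)  # prints: POST http://localhost:8000/webhook
# urllib.request.urlopen(req) would send it to a running service.
```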

  • Step 6: Commit
git add app/main.py app/webhook/handler.py tests/test_webhook.py
git commit -m "feat: fastapi app mit lifespan, webhook handler und /health"

Task 13: Bulk Import Endpoint

Files:

  • Create: app/bulk.py
  • Modify: app/main.py (register router)
  • Test: tests/test_bulk.py

The bulk endpoint walks a folder via WebDAV PROPFIND, then dispatches one background task per supported file.
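
To make the PROPFIND walk concrete, here is a minimal, offline sketch of how a WebDAV multistatus response can be reduced to a list of file paths. The XML is a hand-written sample, not real server output, and `file_paths` is a hypothetical helper that only approximates the parsing done in this task.

```python
import xml.etree.ElementTree as ET
from urllib.parse import unquote

DAV_NS = {"d": "DAV:"}

# Hand-written sample response; a real server returns more properties.
SAMPLE = """<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:">
  <d:response>
    <d:href>/remote.php/dav/files/u/Documents/</d:href>
    <d:propstat><d:prop><d:resourcetype><d:collection/></d:resourcetype></d:prop></d:propstat>
  </d:response>
  <d:response>
    <d:href>/remote.php/dav/files/u/Documents/My%20Notes.pdf</d:href>
    <d:propstat><d:prop><d:resourcetype/></d:prop></d:propstat>
  </d:response>
</d:multistatus>
"""


def file_paths(xml_text: str, base_prefix: str) -> list[str]:
    """Return URL-decoded paths of non-collection entries, relative to base_prefix."""
    root = ET.fromstring(xml_text)
    out: list[str] = []
    for resp in root.findall("d:response", DAV_NS):
        # Folders carry a <d:collection/> element inside <d:resourcetype>.
        collection = resp.find(
            "d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS
        )
        if collection is not None:
            continue
        href = unquote(resp.findtext("d:href", default="", namespaces=DAV_NS))
        if href.startswith(base_prefix):
            href = href[len(base_prefix):]
        out.append(href.lstrip("/"))
    return out


print(file_paths(SAMPLE, "/remote.php/dav/files/u"))
# prints: ['Documents/My Notes.pdf']
```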

  • Step 1: Write the failing tests
# tests/test_bulk.py
from unittest.mock import AsyncMock
from fastapi.testclient import TestClient


def _make_app(monkeypatch):
    monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
    monkeypatch.setenv("NEXTCLOUD_USER", "u")
    monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
    monkeypatch.setenv("OLLAMA_URL", "http://ollama")
    monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
    monkeypatch.setenv("QDRANT_URL", "http://qdrant")
    monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
    monkeypatch.setenv("WEBHOOK_SECRET", "abc")
    import app.config
    app.config._settings = None
    monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock())
    from app.main import app
    return app


def test_bulk_import_lists_and_dispatches(monkeypatch):
    app = _make_app(monkeypatch)

    listed = [
        "Documents/THB/Studium/2.Semester/Databases/a.pdf",
        "Documents/THB/Studium/2.Semester/Databases/b.docx",
        "Documents/THB/Studium/2.Semester/Databases/.rag-meta.json",  # ignored
    ]
    monkeypatch.setattr("app.bulk.list_files_recursive", AsyncMock(return_value=listed))

    process_mock = AsyncMock()
    monkeypatch.setattr("app.bulk.process_file", process_mock)

    with TestClient(app) as client:
        r = client.post(
            "/bulk-import",
            json={"path": "Documents/THB/Studium/2.Semester/Databases"},
            headers={"X-Webhook-Secret": "abc"},
        )

    assert r.status_code == 202
    body = r.json()
    assert body["dispatched"] == 2  # only .pdf and .docx, not the json sidecar
    # TestClient runs background tasks before the request returns
    assert process_mock.await_count == 2


def test_bulk_import_rejects_wrong_secret(monkeypatch):
    app = _make_app(monkeypatch)
    with TestClient(app) as client:
        r = client.post("/bulk-import", json={"path": "x"}, headers={"X-Webhook-Secret": "nope"})
    assert r.status_code == 401
  • Step 2: Run tests to verify they fail
uv run pytest tests/test_bulk.py -v

Expected: FAIL with ModuleNotFoundError.

  • Step 3: Implement app/bulk.py
import logging
import xml.etree.ElementTree as ET
from pathlib import PurePosixPath
from urllib.parse import unquote

import httpx
from fastapi import APIRouter, BackgroundTasks, Header, status
from pydantic import BaseModel

from app.config import get_settings
from app.ingest.extractors import SUPPORTED_TYPES
from app.ingest.pipeline import process_file
from app.webhook.auth import verify_secret
from app.webhook.models import EventType


logger = logging.getLogger(__name__)
router = APIRouter()


class BulkRequest(BaseModel):
    path: str


PROPFIND_BODY = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>
"""

DAV_NS = {"d": "DAV:"}


async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]:
    """PROPFIND with Depth: infinity. Returns relative file paths (no folders)."""
    base = base_url.rstrip("/")
    rel = path.strip("/")
    url = f"{base}/{rel}"

    async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client:
        response = await client.request(
            "PROPFIND",
            url,
            headers={"Depth": "infinity", "Content-Type": "application/xml"},
            content=PROPFIND_BODY,
        )
    if response.status_code not in (200, 207):
        raise RuntimeError(f"PROPFIND failed: status={response.status_code}")

    root = ET.fromstring(response.text)
    base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix()  # "/remote.php/dav/files/u"
    out: list[str] = []
    for resp in root.findall("d:response", DAV_NS):
        href = resp.findtext("d:href", default="", namespaces=DAV_NS)
        decoded = unquote(href)
        # Strip the WebDAV base prefix → leaves "Documents/.../file.pdf"
        if decoded.startswith(base_path_segment):
            decoded = decoded[len(base_path_segment):]
        decoded = decoded.lstrip("/")
        if not decoded or decoded.endswith("/"):
            continue
        # Skip directory entries (those have <d:collection/> resourcetype)
        rt = resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS)
        if rt is not None:
            continue
        out.append(decoded)
    return out


@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED)
async def bulk_import(
    body: BulkRequest,
    background: BackgroundTasks,
    x_webhook_secret: str | None = Header(default=None),
):
    settings = get_settings()
    verify_secret(x_webhook_secret, settings.webhook_secret)

    files = await list_files_recursive(
        settings.nextcloud_webdav_url,
        settings.nextcloud_user,
        settings.nextcloud_app_password,
        body.path,
    )

    dispatched = 0
    for f in files:
        ext = PurePosixPath(f).suffix.lstrip(".").lower()
        if ext not in SUPPORTED_TYPES:
            continue
        background.add_task(process_file, f, EventType.CREATED)
        dispatched += 1

    logger.info(
        "bulk dispatch",
        extra={"event": "bulk_dispatch", "path": body.path, "dispatched": dispatched, "total_listed": len(files)},
    )
    return {"status": "accepted", "dispatched": dispatched}
  • Step 4: Modify app/main.py to include the bulk router

Edit app/main.py. After the from app.webhook.handler import ... line, add:

from app.bulk import router as bulk_router

After app.include_router(webhook_router), add:

app.include_router(bulk_router)
  • Step 5: Run tests to verify they pass
uv run pytest tests/test_bulk.py -v

Expected: 2 PASS.
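
The extension filter used by the bulk endpoint is easy to misread for dotfiles and multi-suffix names. The sketch below shows how `PurePosixPath.suffix` behaves in those cases. The `SUPPORTED` set is assumed to mirror `SUPPORTED_TYPES` from `app/ingest/extractors.py`, and `is_supported` is a hypothetical helper.

```python
from pathlib import PurePosixPath

# Assumed to mirror SUPPORTED_TYPES in app/ingest/extractors.py.
SUPPORTED = {"pdf", "md", "docx", "xlsx"}


def is_supported(path: str) -> bool:
    """Same normalization as the bulk endpoint: last suffix, no dot, lowercase."""
    ext = PurePosixPath(path).suffix.lstrip(".").lower()
    return ext in SUPPORTED


for candidate in (
    "Databases/Slides.PDF",      # uppercase extension is normalized
    "Databases/.rag-meta.json",  # sidecar: suffix is ".json"
    "Databases/README",          # no suffix at all
    "Databases/archive.tar.gz",  # only the last suffix counts
    "Databases/.md",             # a dotfile named ".md" has no suffix
):
    print(candidate, is_supported(candidate))
```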

  • Step 6: Run full test suite
uv run pytest -v

Expected: All tests pass (≈37 tests across 9 test files).

  • Step 7: Commit
git add app/bulk.py app/main.py tests/test_bulk.py
git commit -m "feat: bulk-import endpoint mit propfind walk"

Task 14: Dockerfile + Compose

Files:

  • Create: docker/Dockerfile

  • Create: docker-compose.yml

  • Step 1: Create docker/Dockerfile

FROM python:3.12-slim AS builder

RUN pip install --no-cache-dir uv

WORKDIR /app
COPY pyproject.toml uv.lock* ./
RUN uv sync --frozen --no-dev --no-install-project || uv sync --no-dev --no-install-project

COPY app ./app
RUN uv sync --frozen --no-dev || uv sync --no-dev


FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /app/.venv /app/.venv
COPY --from=builder /app/app /app/app

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONUNBUFFERED=1

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
  • Step 2: Create docker-compose.yml (local example)
services:
  ingestor:
    build:
      context: .
      dockerfile: docker/Dockerfile
    env_file: .env
    ports:
      - "8000:8000"
    depends_on:
      - qdrant
      - ollama

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama

volumes:
  qdrant_data:
  ollama_data:
  • Step 3: Build the image
docker build -f docker/Dockerfile -t rag-ingestor:dev .

Expected: Build succeeds.

  • Step 4: Commit
git add docker/Dockerfile docker-compose.yml
git commit -m "chore: dockerfile und compose-beispiel"

Task 15: README

Files:

  • Create: README.md

  • Step 1: Create README.md

# rag-ingestor

Microservice that indexes files from Nextcloud (`Documents/THB/Studium/`) into Qdrant. Embeddings are generated via Ollama.

## Endpoints

- `POST /webhook` (header `X-Webhook-Secret`): receives Nextcloud events (`created` / `updated` / `deleted`).
- `POST /bulk-import` (header `X-Webhook-Secret`): body `{"path": "..."}` → recursive re-index.
- `GET /health`: liveness probe.

## Expected folder structure

```
Documents/THB/Studium/<n>.Semester/<fach>/[<typ>]/<file>
```

Supported file types: `.pdf`, `.md`, `.docx`, `.xlsx` (for XLSX only the filename is indexed, not the content).

## Configuration

See `.env.example`. All values come from environment variables; there is no config file.

## Local development

```bash
uv sync
uv run pytest
uv run uvicorn app.main:app --reload
```

## Deployment

Build the image and deploy it in Coolify alongside Qdrant + Ollama:

```bash
docker build -f docker/Dockerfile -t rag-ingestor .
```

## Tests

```bash
uv run pytest -v
```

The tests cover the pure logic (metadata parser, chunker, extractors, auth, pipeline orchestration with mocked external services). There are no integration tests against real Ollama/Qdrant/WebDAV instances.


  • Step 2: Commit
git add README.md
git commit -m "docs: readme mit endpoints, struktur und entwicklung"

Final Verification

  • Run the full test suite
uv run pytest -v

Expected: All tests green.

  • Smoke test the running service (requires running Qdrant + Ollama with the configured model)
cp .env.example .env
# Edit .env with real values
uv run uvicorn app.main:app

In another shell:

curl http://localhost:8000/health
# {"status":"ok"}
  • Verify the develop branch is in good shape before pushing
git log --oneline
git status

Expected: clean working tree, sequence of feature commits ending with the README commit.