diff --git a/README.md b/README.md index e522d2e..7fe9681 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,32 @@ Microservice der Dateien aus Nextcloud (`Documents/THB/Studium/`) in Qdrant inde ## Endpoints - `POST /webhook` (Header `X-Webhook-Secret`): Nextcloud-Event-Empfang (`created` / `updated` / `deleted`). -- `POST /bulk-import` (Header `X-Webhook-Secret`): Body `{"path": "..."}` → rekursiver Re-Index. +- `POST /bulk-import` (Header `X-Webhook-Secret`): Body `{"path": "..."}` → rekursiver Re-Index. Bulk-Pipeline-Stages laufen mit Concurrency 4 (siehe `BULK_CONCURRENCY` in `app/bulk.py`). - `GET /health`: Liveness-Probe. +### Webhook-Payload-Format + +Der Service erwartet ein vorgeformtes JSON. Nextcloud-Roh-Events werden **nicht** direkt akzeptiert — sie müssen via Flow-Webhook in dieses Schema übersetzt werden: + +```json +{ + "event_type": "created", + "file_path": "Documents/THB/Studium/2.Semester/Databases/DBS1.pdf", + "file_name": "DBS1.pdf" +} +``` + +`event_type` ∈ `{"created", "updated", "deleted"}`. Auth via Header `X-Webhook-Secret`, der mit `WEBHOOK_SECRET` aus der Konfiguration übereinstimmen muss. + +Beispielaufruf: + +```bash +curl -X POST http://localhost:8000/webhook \ + -H "Content-Type: application/json" \ + -H "X-Webhook-Secret: $WEBHOOK_SECRET" \ + -d '{"event_type": "created", "file_path": "Documents/THB/Studium/2.Semester/Databases/DBS1.pdf", "file_name": "DBS1.pdf"}' +``` + ## Erwartete Ordnerstruktur ``` @@ -43,3 +66,30 @@ uv run pytest -v ``` Tests deckt Pure-Logic ab (Metadata-Parser, Chunker, Extractors, Auth, Pipeline-Orchestrierung mit gemockten externen Services). Keine Integration-Tests gegen echte Ollama/Qdrant/WebDAV-Instanzen. + +## Recovery-Runbook + +### Einbettungs-Modell oder -Dimension geändert + +Beim Boot crasht der Service mit `qdrant collection ... dimension mismatch`, falls die existierende Collection eine andere Vektor-Dimension hat als das aktuelle Embedding-Modell. Dies ist Absicht (Fail-Fast). Vorgehen: + +1. Collection in Qdrant manuell droppen: + ```bash + curl -X DELETE "$QDRANT_URL/collections/$QDRANT_COLLECTION" + ``` +2. Service neu starten — Lifespan legt die Collection mit der neuen Dimension an. +3. Bulk-Import auf den Studium-Root anstoßen, um alle Inhalte neu zu indexieren: + ```bash + curl -X POST http://localhost:8000/bulk-import \ + -H "Content-Type: application/json" \ + -H "X-Webhook-Secret: $WEBHOOK_SECRET" \ + -d '{"path": "Documents/THB/Studium"}' + ``` + +### Webhook-Ausfall / fehlende In-Flight-Jobs nach Crash + +Der Service hat keinen persistenten Job-Store; In-Flight-`BackgroundTask`s gehen bei Crash verloren. Recovery erfolgt über den Bulk-Import-Endpoint auf den betroffenen Pfad (siehe oben). + +### Ein einzelnes File neu indexieren + +Webhook mit `event_type: "updated"` an `/webhook` POSTen — alte Chunks werden via `delete_by_filter(file_path)` entfernt, dann frisch indexiert. diff --git a/app/bulk.py b/app/bulk.py index ac4c3ae..3c7d2da 100644 --- a/app/bulk.py +++ b/app/bulk.py @@ -1,3 +1,4 @@ +import asyncio import logging import xml.etree.ElementTree as ET from pathlib import PurePosixPath @@ -18,6 +19,17 @@ logger = logging.getLogger(__name__) router = APIRouter() +# Limits parallel pipeline execution during a bulk import so we don't +# saturate Ollama/WebDAV with thousands of concurrent requests. +BULK_CONCURRENCY = 4 +_bulk_semaphore = asyncio.Semaphore(BULK_CONCURRENCY) + + +async def _process_with_semaphore(file_path: str, event_type: EventType) -> None: + async with _bulk_semaphore: + await process_file(file_path, event_type) + + class BulkRequest(BaseModel): path: str = Field(min_length=1) @@ -93,7 +105,7 @@ async def bulk_import( ext = PurePosixPath(f).suffix.lstrip(".").lower() if ext not in SUPPORTED_TYPES: continue - background.add_task(process_file, f, EventType.CREATED) + background.add_task(_process_with_semaphore, f, EventType.CREATED) dispatched += 1 logger.info( diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py index b78643d..73bfa2e 100644 --- a/app/ingest/pipeline.py +++ b/app/ingest/pipeline.py @@ -1,4 +1,5 @@ import logging +import time from datetime import datetime, timezone from functools import lru_cache from pathlib import PurePosixPath @@ -49,6 +50,7 @@ async def process_file(file_path: str, event_type: EventType) -> None: ) return + t0 = time.perf_counter() try: data = await download_file( settings.nextcloud_webdav_url, @@ -59,7 +61,13 @@ async def process_file(file_path: str, event_type: EventType) -> None: except Exception as exc: logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)}) return + download_ms = int((time.perf_counter() - t0) * 1000) + logger.info( + "download ok", + extra={"event": "download", "status": "ok", "file": file_path, "duration_ms": download_ms, "bytes": len(data)}, + ) + t0 = time.perf_counter() try: pages = extract(data, extension, filename=PurePosixPath(file_path).name) except UnsupportedFileType: @@ -68,6 +76,11 @@ async def process_file(file_path: str, event_type: EventType) -> None: except Exception as exc: logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)}) return + extract_ms = int((time.perf_counter() - t0) * 1000) + logger.info( + "extract ok", + extra={"event": "extract", "status": "ok", "file": file_path, "duration_ms": extract_ms, "pages": len(pages)}, + ) chunks: list[tuple[str, int, int]] = [] # (text, page, chunk_index) chunk_index = 0 @@ -87,11 +100,17 @@ async def process_file(file_path: str, event_type: EventType) -> None: delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path) return + t0 = time.perf_counter() try: vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model) except Exception as exc: logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)}) return + embed_ms = int((time.perf_counter() - t0) * 1000) + logger.info( + "embed ok", + extra={"event": "embed", "status": "ok", "file": file_path, "duration_ms": embed_ms, "chunks": len(vectors)}, + ) if len(vectors) != len(chunks): logger.error( @@ -121,11 +140,19 @@ async def process_file(file_path: str, event_type: EventType) -> None: for vec, (text, page_num, idx) in zip(vectors, chunks) ] + t0 = time.perf_counter() qdrant = _qdrant_client() delete_by_path(qdrant, settings.qdrant_collection, file_path) upsert_chunks(qdrant, settings.qdrant_collection, points) + qdrant_ms = int((time.perf_counter() - t0) * 1000) logger.info( "ingested", - extra={"event": "ingest_done", "file": file_path, "chunks": len(points)}, + extra={ + "event": "ingest_done", + "status": "ok", + "file": file_path, + "chunks": len(points), + "duration_ms": qdrant_ms, + }, )