Compare commits
10 Commits
02c8f5d338
...
11acf0eb92
| Author | SHA1 | Date | |
|---|---|---|---|
| 11acf0eb92 | |||
| ca9ff55587 | |||
| 7fe2d853ec | |||
| 964b10dfe8 | |||
| ec94fe899b | |||
| a91150c41f | |||
| 8c50ab008c | |||
| fab5569955 | |||
| 4792f0277f | |||
| 61e00028e8 |
17
.dockerignore
Normal file
17
.dockerignore
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
.git
|
||||||
|
.venv
|
||||||
|
__pycache__
|
||||||
|
*.pyc
|
||||||
|
.pytest_cache
|
||||||
|
.ruff_cache
|
||||||
|
.mypy_cache
|
||||||
|
tests
|
||||||
|
docs
|
||||||
|
.env
|
||||||
|
.env.*
|
||||||
|
.idea
|
||||||
|
.vscode
|
||||||
|
*.egg-info
|
||||||
|
dist
|
||||||
|
build
|
||||||
|
.coverage
|
||||||
33
.gitea/workflows/ci.yml
Normal file
33
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
name: CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- '**'
|
||||||
|
tags-ignore:
|
||||||
|
- '**'
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
ci:
|
||||||
|
runs-on: docker
|
||||||
|
container:
|
||||||
|
image: python:3.12-slim
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Install uv
|
||||||
|
run: pip install --no-cache-dir uv
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: uv sync --frozen
|
||||||
|
|
||||||
|
- name: Lint
|
||||||
|
run: uvx ruff check app tests
|
||||||
|
|
||||||
|
- name: Test
|
||||||
|
run: uv run pytest
|
||||||
|
|
||||||
|
- name: Build distribution
|
||||||
|
run: uv build
|
||||||
67
.gitea/workflows/release.yml
Normal file
67
.gitea/workflows/release.yml
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
name: Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
tags:
|
||||||
|
- 'v*'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: dind
|
||||||
|
steps:
|
||||||
|
- name: Install tooling
|
||||||
|
run: apk add --no-cache git curl jq docker-cli docker-cli-buildx
|
||||||
|
|
||||||
|
- name: Clone repository at tag
|
||||||
|
run: |
|
||||||
|
git clone "https://${{ secrets.GITEA_TOKEN }}@gitea.jeanlucmakiola.de/${{ gitea.repository }}.git" repo
|
||||||
|
cd repo
|
||||||
|
git checkout "${{ gitea.ref_name }}"
|
||||||
|
|
||||||
|
- name: Compute previous tag and changelog
|
||||||
|
working-directory: repo
|
||||||
|
run: |
|
||||||
|
VERSION="${{ gitea.ref_name }}"
|
||||||
|
PREV_TAG=$(git tag -l 'v*' --sort=-v:refname | grep -v "^${VERSION}$" | head -n1 || true)
|
||||||
|
if [ -z "$PREV_TAG" ]; then
|
||||||
|
CHANGELOG=$(git log --pretty=format:"- %s" "${VERSION}")
|
||||||
|
else
|
||||||
|
CHANGELOG=$(git log --pretty=format:"- %s" "${PREV_TAG}..${VERSION}")
|
||||||
|
fi
|
||||||
|
{
|
||||||
|
echo "VERSION=$VERSION"
|
||||||
|
echo "PREV_TAG=$PREV_TAG"
|
||||||
|
echo "CHANGELOG<<CHANGELOG_EOF"
|
||||||
|
echo "$CHANGELOG"
|
||||||
|
echo "CHANGELOG_EOF"
|
||||||
|
} >> "$GITHUB_ENV"
|
||||||
|
|
||||||
|
- name: Log in to Gitea registry
|
||||||
|
run: |
|
||||||
|
echo "${{ secrets.REGISTRY_TOKEN }}" \
|
||||||
|
| docker login gitea.jeanlucmakiola.de -u "${{ gitea.repository_owner }}" --password-stdin
|
||||||
|
|
||||||
|
- name: Build and push Docker image
|
||||||
|
working-directory: repo
|
||||||
|
run: |
|
||||||
|
IMAGE="gitea.jeanlucmakiola.de/${{ gitea.repository }}"
|
||||||
|
docker buildx create --use --name builder >/dev/null 2>&1 || docker buildx use builder
|
||||||
|
docker buildx build \
|
||||||
|
--cache-from "type=registry,ref=${IMAGE}:buildcache" \
|
||||||
|
--cache-to "type=registry,ref=${IMAGE}:buildcache,mode=max" \
|
||||||
|
-f docker/Dockerfile \
|
||||||
|
-t "${IMAGE}:${VERSION}" \
|
||||||
|
-t "${IMAGE}:latest" \
|
||||||
|
--push .
|
||||||
|
|
||||||
|
- name: Create Gitea release
|
||||||
|
run: |
|
||||||
|
API_URL="${GITHUB_SERVER_URL}/api/v1/repos/${{ gitea.repository }}/releases"
|
||||||
|
curl -fsS -X POST "$API_URL" \
|
||||||
|
-H "Authorization: token ${{ secrets.GITEA_TOKEN }}" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d "$(jq -n \
|
||||||
|
--arg tag "$VERSION" \
|
||||||
|
--arg name "$VERSION" \
|
||||||
|
--arg body "$CHANGELOG" \
|
||||||
|
'{tag_name: $tag, name: $name, body: $body, draft: false, prerelease: false}')"
|
||||||
95
README.md
Normal file
95
README.md
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
# rag-ingestor
|
||||||
|
|
||||||
|
Microservice der Dateien aus Nextcloud (`Documents/THB/Studium/`) in Qdrant indexiert. Embeddings via Ollama.
|
||||||
|
|
||||||
|
## Endpoints
|
||||||
|
|
||||||
|
- `POST /webhook` (Header `X-Webhook-Secret`): Nextcloud-Event-Empfang (`created` / `updated` / `deleted`).
|
||||||
|
- `POST /bulk-import` (Header `X-Webhook-Secret`): Body `{"path": "..."}` → rekursiver Re-Index. Bulk-Pipeline-Stages laufen mit Concurrency 4 (siehe `BULK_CONCURRENCY` in `app/bulk.py`).
|
||||||
|
- `GET /health`: Liveness-Probe.
|
||||||
|
|
||||||
|
### Webhook-Payload-Format
|
||||||
|
|
||||||
|
Der Service erwartet ein vorgeformtes JSON. Nextcloud-Roh-Events werden **nicht** direkt akzeptiert — sie müssen via Flow-Webhook in dieses Schema übersetzt werden:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"event_type": "created",
|
||||||
|
"file_path": "Documents/THB/Studium/2.Semester/Databases/DBS1.pdf",
|
||||||
|
"file_name": "DBS1.pdf"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`event_type` ∈ `{"created", "updated", "deleted"}`. Auth via Header `X-Webhook-Secret`, der mit `WEBHOOK_SECRET` aus der Konfiguration übereinstimmen muss.
|
||||||
|
|
||||||
|
Beispielaufruf:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST http://localhost:8000/webhook \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Webhook-Secret: $WEBHOOK_SECRET" \
|
||||||
|
-d '{"event_type": "created", "file_path": "Documents/THB/Studium/2.Semester/Databases/DBS1.pdf", "file_name": "DBS1.pdf"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
## Erwartete Ordnerstruktur
|
||||||
|
|
||||||
|
```
|
||||||
|
Documents/THB/Studium/<N>.Semester/<Fach>/[<Unterordner>]/<datei>
|
||||||
|
```
|
||||||
|
|
||||||
|
Unterstützte Dateitypen: `.pdf`, `.md`, `.docx`, `.xlsx` (XLSX wird nur als Filename indexiert, kein Inhalt).
|
||||||
|
|
||||||
|
## Konfiguration
|
||||||
|
|
||||||
|
Siehe `.env.example`. Alle Werte über Env-Vars, kein Config-File.
|
||||||
|
|
||||||
|
## Lokale Entwicklung
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv sync
|
||||||
|
uv run pytest
|
||||||
|
uv run uvicorn app.main:app --reload
|
||||||
|
```
|
||||||
|
|
||||||
|
## Deployment
|
||||||
|
|
||||||
|
Image bauen und in Coolify neben Qdrant + Ollama deployen:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker build -f docker/Dockerfile -t rag-ingestor .
|
||||||
|
```
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv run pytest -v
|
||||||
|
```
|
||||||
|
|
||||||
|
Tests deckt Pure-Logic ab (Metadata-Parser, Chunker, Extractors, Auth, Pipeline-Orchestrierung mit gemockten externen Services). Keine Integration-Tests gegen echte Ollama/Qdrant/WebDAV-Instanzen.
|
||||||
|
|
||||||
|
## Recovery-Runbook
|
||||||
|
|
||||||
|
### Einbettungs-Modell oder -Dimension geändert
|
||||||
|
|
||||||
|
Beim Boot crasht der Service mit `qdrant collection ... dimension mismatch`, falls die existierende Collection eine andere Vektor-Dimension hat als das aktuelle Embedding-Modell. Dies ist Absicht (Fail-Fast). Vorgehen:
|
||||||
|
|
||||||
|
1. Collection in Qdrant manuell droppen:
|
||||||
|
```bash
|
||||||
|
curl -X DELETE "$QDRANT_URL/collections/$QDRANT_COLLECTION"
|
||||||
|
```
|
||||||
|
2. Service neu starten — Lifespan legt die Collection mit der neuen Dimension an.
|
||||||
|
3. Bulk-Import auf den Studium-Root anstoßen, um alle Inhalte neu zu indexieren:
|
||||||
|
```bash
|
||||||
|
curl -X POST http://localhost:8000/bulk-import \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Webhook-Secret: $WEBHOOK_SECRET" \
|
||||||
|
-d '{"path": "Documents/THB/Studium"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Webhook-Ausfall / fehlende In-Flight-Jobs nach Crash
|
||||||
|
|
||||||
|
Der Service hat keinen persistenten Job-Store; In-Flight-`BackgroundTask`s gehen bei Crash verloren. Recovery erfolgt über den Bulk-Import-Endpoint auf den betroffenen Pfad (siehe oben).
|
||||||
|
|
||||||
|
### Ein einzelnes File neu indexieren
|
||||||
|
|
||||||
|
Webhook mit `event_type: "updated"` an `/webhook` POSTen — alte Chunks werden via `delete_by_filter(file_path)` entfernt, dann frisch indexiert.
|
||||||
115
app/bulk.py
Normal file
115
app/bulk.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from pathlib import PurePosixPath
|
||||||
|
from urllib.parse import unquote
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, status
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
from app.ingest.extractors import SUPPORTED_TYPES
|
||||||
|
from app.ingest.pipeline import process_file
|
||||||
|
from app.webhook.auth import verify_secret
|
||||||
|
from app.webhook.models import EventType
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
# Limits parallel pipeline execution during a bulk import so we don't
|
||||||
|
# saturate Ollama/WebDAV with thousands of concurrent requests.
|
||||||
|
BULK_CONCURRENCY = 4
|
||||||
|
_bulk_semaphore = asyncio.Semaphore(BULK_CONCURRENCY)
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_with_semaphore(file_path: str, event_type: EventType) -> None:
|
||||||
|
async with _bulk_semaphore:
|
||||||
|
await process_file(file_path, event_type)
|
||||||
|
|
||||||
|
|
||||||
|
class BulkRequest(BaseModel):
|
||||||
|
path: str = Field(min_length=1)
|
||||||
|
|
||||||
|
|
||||||
|
PROPFIND_BODY = """<?xml version="1.0"?>
|
||||||
|
<d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>
|
||||||
|
"""
|
||||||
|
|
||||||
|
DAV_NS = {"d": "DAV:"}
|
||||||
|
|
||||||
|
|
||||||
|
async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]:
|
||||||
|
"""PROPFIND with Depth: infinity. Returns relative file paths (no folders)."""
|
||||||
|
base = base_url.rstrip("/")
|
||||||
|
rel = path.strip("/")
|
||||||
|
url = f"{base}/{rel}"
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client:
|
||||||
|
response = await client.request(
|
||||||
|
"PROPFIND",
|
||||||
|
url,
|
||||||
|
headers={"Depth": "infinity", "Content-Type": "application/xml"},
|
||||||
|
content=PROPFIND_BODY,
|
||||||
|
)
|
||||||
|
if response.status_code not in (200, 207):
|
||||||
|
raise RuntimeError(f"PROPFIND failed: status={response.status_code}")
|
||||||
|
|
||||||
|
root = ET.fromstring(response.text)
|
||||||
|
base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix() # "/remote.php/dav/files/u"
|
||||||
|
out: list[str] = []
|
||||||
|
for resp in root.findall("d:response", DAV_NS):
|
||||||
|
href = resp.findtext("d:href", default="", namespaces=DAV_NS)
|
||||||
|
decoded = unquote(href)
|
||||||
|
# Strip the WebDAV base prefix → leaves "Documents/.../file.pdf"
|
||||||
|
if decoded.startswith(base_path_segment):
|
||||||
|
decoded = decoded[len(base_path_segment):]
|
||||||
|
decoded = decoded.lstrip("/")
|
||||||
|
if not decoded or decoded.endswith("/"):
|
||||||
|
continue
|
||||||
|
# Skip directory entries (those have <d:collection/> resourcetype)
|
||||||
|
rt = resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS)
|
||||||
|
if rt is not None:
|
||||||
|
continue
|
||||||
|
out.append(decoded)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED)
|
||||||
|
async def bulk_import(
|
||||||
|
body: BulkRequest,
|
||||||
|
background: BackgroundTasks,
|
||||||
|
x_webhook_secret: str | None = Header(default=None),
|
||||||
|
):
|
||||||
|
settings = get_settings()
|
||||||
|
verify_secret(x_webhook_secret, settings.webhook_secret)
|
||||||
|
|
||||||
|
try:
|
||||||
|
files = await list_files_recursive(
|
||||||
|
settings.nextcloud_webdav_url,
|
||||||
|
settings.nextcloud_user,
|
||||||
|
settings.nextcloud_app_password,
|
||||||
|
body.path,
|
||||||
|
)
|
||||||
|
except (RuntimeError, httpx.HTTPError, ET.ParseError) as exc:
|
||||||
|
logger.exception(
|
||||||
|
"bulk listing failed",
|
||||||
|
extra={"event": "bulk_listing_failed", "path": body.path, "error": str(exc)},
|
||||||
|
)
|
||||||
|
raise HTTPException(status_code=502, detail="webdav listing failed") from exc
|
||||||
|
|
||||||
|
dispatched = 0
|
||||||
|
for f in files:
|
||||||
|
ext = PurePosixPath(f).suffix.lstrip(".").lower()
|
||||||
|
if ext not in SUPPORTED_TYPES:
|
||||||
|
continue
|
||||||
|
background.add_task(_process_with_semaphore, f, EventType.CREATED)
|
||||||
|
dispatched += 1
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"bulk dispatch",
|
||||||
|
extra={"event": "bulk_dispatch", "path": body.path, "dispatched": dispatched, "total_listed": len(files)},
|
||||||
|
)
|
||||||
|
return {"status": "accepted", "dispatched": dispatched}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from pathlib import PurePosixPath
|
from pathlib import PurePosixPath
|
||||||
@@ -49,6 +50,7 @@ async def process_file(file_path: str, event_type: EventType) -> None:
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
data = await download_file(
|
data = await download_file(
|
||||||
settings.nextcloud_webdav_url,
|
settings.nextcloud_webdav_url,
|
||||||
@@ -59,7 +61,13 @@ async def process_file(file_path: str, event_type: EventType) -> None:
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
|
logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
|
||||||
return
|
return
|
||||||
|
download_ms = int((time.perf_counter() - t0) * 1000)
|
||||||
|
logger.info(
|
||||||
|
"download ok",
|
||||||
|
extra={"event": "download", "status": "ok", "file": file_path, "duration_ms": download_ms, "bytes": len(data)},
|
||||||
|
)
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
pages = extract(data, extension, filename=PurePosixPath(file_path).name)
|
pages = extract(data, extension, filename=PurePosixPath(file_path).name)
|
||||||
except UnsupportedFileType:
|
except UnsupportedFileType:
|
||||||
@@ -68,6 +76,11 @@ async def process_file(file_path: str, event_type: EventType) -> None:
|
|||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
|
logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
|
||||||
return
|
return
|
||||||
|
extract_ms = int((time.perf_counter() - t0) * 1000)
|
||||||
|
logger.info(
|
||||||
|
"extract ok",
|
||||||
|
extra={"event": "extract", "status": "ok", "file": file_path, "duration_ms": extract_ms, "pages": len(pages)},
|
||||||
|
)
|
||||||
|
|
||||||
chunks: list[tuple[str, int, int]] = [] # (text, page, chunk_index)
|
chunks: list[tuple[str, int, int]] = [] # (text, page, chunk_index)
|
||||||
chunk_index = 0
|
chunk_index = 0
|
||||||
@@ -87,11 +100,24 @@ async def process_file(file_path: str, event_type: EventType) -> None:
|
|||||||
delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
|
delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model)
|
vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
|
logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
|
||||||
return
|
return
|
||||||
|
embed_ms = int((time.perf_counter() - t0) * 1000)
|
||||||
|
logger.info(
|
||||||
|
"embed ok",
|
||||||
|
extra={"event": "embed", "status": "ok", "file": file_path, "duration_ms": embed_ms, "chunks": len(vectors)},
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(vectors) != len(chunks):
|
||||||
|
logger.error(
|
||||||
|
"vector/chunk count mismatch",
|
||||||
|
extra={"event": "embed_failed", "file": file_path, "vectors": len(vectors), "chunks": len(chunks)},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
now_iso = datetime.now(timezone.utc).isoformat()
|
now_iso = datetime.now(timezone.utc).isoformat()
|
||||||
file_name = PurePosixPath(file_path).name
|
file_name = PurePosixPath(file_path).name
|
||||||
@@ -105,20 +131,28 @@ async def process_file(file_path: str, event_type: EventType) -> None:
|
|||||||
"semester": metadata.semester,
|
"semester": metadata.semester,
|
||||||
"fach": metadata.fach,
|
"fach": metadata.fach,
|
||||||
"typ": metadata.typ,
|
"typ": metadata.typ,
|
||||||
"page": page,
|
"page": page_num,
|
||||||
"chunk_index": idx,
|
"chunk_index": idx,
|
||||||
"text": text,
|
"text": text,
|
||||||
"ingested_at": now_iso,
|
"ingested_at": now_iso,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
for vec, (text, page, idx) in zip(vectors, chunks)
|
for vec, (text, page_num, idx) in zip(vectors, chunks)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
qdrant = _qdrant_client()
|
qdrant = _qdrant_client()
|
||||||
delete_by_path(qdrant, settings.qdrant_collection, file_path)
|
delete_by_path(qdrant, settings.qdrant_collection, file_path)
|
||||||
upsert_chunks(qdrant, settings.qdrant_collection, points)
|
upsert_chunks(qdrant, settings.qdrant_collection, points)
|
||||||
|
qdrant_ms = int((time.perf_counter() - t0) * 1000)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"ingested",
|
"ingested",
|
||||||
extra={"event": "ingest_done", "file": file_path, "chunks": len(points)},
|
extra={
|
||||||
|
"event": "ingest_done",
|
||||||
|
"status": "ok",
|
||||||
|
"file": file_path,
|
||||||
|
"chunks": len(points),
|
||||||
|
"duration_ms": qdrant_ms,
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
43
app/main.py
Normal file
43
app/main.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from qdrant_client import QdrantClient
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
from app.ingest.embedder import embedding_dimension
|
||||||
|
from app.logging_setup import setup_logging
|
||||||
|
from app.qdrant_store import ensure_collection
|
||||||
|
from app.webhook.handler import router as webhook_router
|
||||||
|
from app.bulk import router as bulk_router
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def _startup_ensure_collection() -> None:
|
||||||
|
settings = get_settings()
|
||||||
|
dim = await embedding_dimension(settings.ollama_embed_model)
|
||||||
|
client = QdrantClient(url=settings.qdrant_url)
|
||||||
|
ensure_collection(client, settings.qdrant_collection, vector_size=dim)
|
||||||
|
logger.info(
|
||||||
|
"qdrant collection ready",
|
||||||
|
extra={"event": "startup", "collection": settings.qdrant_collection, "dim": dim},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
setup_logging(get_settings().log_level)
|
||||||
|
await _startup_ensure_collection()
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
app = FastAPI(title="rag-ingestor", lifespan=lifespan)
|
||||||
|
app.include_router(webhook_router)
|
||||||
|
app.include_router(bulk_router)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health():
|
||||||
|
return {"status": "ok"}
|
||||||
20
app/webhook/handler.py
Normal file
20
app/webhook/handler.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
from fastapi import APIRouter, BackgroundTasks, Header, status
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
from app.ingest.pipeline import process_file
|
||||||
|
from app.webhook.auth import verify_secret
|
||||||
|
from app.webhook.models import NextcloudEvent
|
||||||
|
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/webhook", status_code=status.HTTP_202_ACCEPTED)
|
||||||
|
async def webhook(
|
||||||
|
event: NextcloudEvent,
|
||||||
|
background: BackgroundTasks,
|
||||||
|
x_webhook_secret: str | None = Header(default=None),
|
||||||
|
):
|
||||||
|
verify_secret(x_webhook_secret, get_settings().webhook_secret)
|
||||||
|
background.add_task(process_file, event.file_path, event.event_type)
|
||||||
|
return {"status": "accepted"}
|
||||||
33
docker-compose.yml
Normal file
33
docker-compose.yml
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
# Local development only.
|
||||||
|
# Production deployment goes via Coolify using docker/Dockerfile alone;
|
||||||
|
# the compose file here is for booting up qdrant + ollama next to the
|
||||||
|
# ingestor on a developer machine.
|
||||||
|
services:
|
||||||
|
ingestor:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: docker/Dockerfile
|
||||||
|
env_file: .env
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
||||||
|
depends_on:
|
||||||
|
- qdrant
|
||||||
|
- ollama
|
||||||
|
|
||||||
|
qdrant:
|
||||||
|
image: qdrant/qdrant:latest
|
||||||
|
ports:
|
||||||
|
- "6333:6333"
|
||||||
|
volumes:
|
||||||
|
- qdrant_data:/qdrant/storage
|
||||||
|
|
||||||
|
ollama:
|
||||||
|
image: ollama/ollama:latest
|
||||||
|
ports:
|
||||||
|
- "11434:11434"
|
||||||
|
volumes:
|
||||||
|
- ollama_data:/root/.ollama
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
qdrant_data:
|
||||||
|
ollama_data:
|
||||||
24
docker/Dockerfile
Normal file
24
docker/Dockerfile
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
FROM python:3.12-slim AS builder
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir uv
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY pyproject.toml uv.lock* ./
|
||||||
|
RUN uv sync --frozen --no-dev --no-install-project || uv sync --no-dev --no-install-project
|
||||||
|
|
||||||
|
COPY app ./app
|
||||||
|
RUN uv sync --frozen --no-dev || uv sync --no-dev
|
||||||
|
|
||||||
|
|
||||||
|
FROM python:3.12-slim
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY --from=builder /app/.venv /app/.venv
|
||||||
|
COPY --from=builder /app/app /app/app
|
||||||
|
|
||||||
|
ENV PATH="/app/.venv/bin:$PATH"
|
||||||
|
ENV PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
|
EXPOSE 8000
|
||||||
|
|
||||||
|
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||||
64
tests/test_bulk.py
Normal file
64
tests/test_bulk.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
from unittest.mock import AsyncMock, call
|
||||||
|
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
from app.webhook.models import EventType
|
||||||
|
|
||||||
|
|
||||||
|
def _make_app(monkeypatch):
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_USER", "u")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
|
||||||
|
monkeypatch.setenv("OLLAMA_URL", "http://ollama")
|
||||||
|
monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
|
||||||
|
monkeypatch.setenv("QDRANT_URL", "http://qdrant")
|
||||||
|
monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
|
||||||
|
monkeypatch.setenv("WEBHOOK_SECRET", "abc")
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
get_settings.cache_clear()
|
||||||
|
import app.ingest.pipeline as pipe
|
||||||
|
pipe._qdrant_client.cache_clear()
|
||||||
|
|
||||||
|
monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock())
|
||||||
|
from app.main import app
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
def test_bulk_import_lists_and_dispatches(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
|
||||||
|
listed = [
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/a.pdf",
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/b.docx",
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/.rag-meta.json", # ignored
|
||||||
|
]
|
||||||
|
monkeypatch.setattr("app.bulk.list_files_recursive", AsyncMock(return_value=listed))
|
||||||
|
|
||||||
|
process_mock = AsyncMock()
|
||||||
|
monkeypatch.setattr("app.bulk.process_file", process_mock)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post(
|
||||||
|
"/bulk-import",
|
||||||
|
json={"path": "Documents/THB/Studium/2.Semester/Databases"},
|
||||||
|
headers={"X-Webhook-Secret": "abc"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert r.status_code == 202
|
||||||
|
body = r.json()
|
||||||
|
assert body["dispatched"] == 2 # only .pdf and .docx, not the json sidecar
|
||||||
|
process_mock.assert_has_calls(
|
||||||
|
[
|
||||||
|
call("Documents/THB/Studium/2.Semester/Databases/a.pdf", EventType.CREATED),
|
||||||
|
call("Documents/THB/Studium/2.Semester/Databases/b.docx", EventType.CREATED),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
assert process_mock.await_count == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_bulk_import_rejects_wrong_secret(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post("/bulk-import", json={"path": "x"}, headers={"X-Webhook-Secret": "nope"})
|
||||||
|
assert r.status_code == 401
|
||||||
@@ -1,5 +1,8 @@
|
|||||||
|
from unittest.mock import AsyncMock
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
from pydantic import ValidationError
|
from pydantic import ValidationError
|
||||||
|
|
||||||
from app.webhook.models import NextcloudEvent, EventType
|
from app.webhook.models import NextcloudEvent, EventType
|
||||||
@@ -38,3 +41,81 @@ def test_verify_secret_missing_fail():
|
|||||||
with pytest.raises(HTTPException) as exc_info:
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
verify_secret(provided=None, expected="abc")
|
verify_secret(provided=None, expected="abc")
|
||||||
assert exc_info.value.status_code == 401
|
assert exc_info.value.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def _make_app(monkeypatch):
|
||||||
|
"""Build the FastAPI app with all external clients stubbed."""
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_USER", "u")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
|
||||||
|
monkeypatch.setenv("OLLAMA_URL", "http://ollama")
|
||||||
|
monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
|
||||||
|
monkeypatch.setenv("QDRANT_URL", "http://qdrant")
|
||||||
|
monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
|
||||||
|
monkeypatch.setenv("WEBHOOK_SECRET", "abc")
|
||||||
|
|
||||||
|
# Reset cached settings/clients
|
||||||
|
from app.config import get_settings
|
||||||
|
get_settings.cache_clear()
|
||||||
|
import app.ingest.pipeline as pipe
|
||||||
|
pipe._qdrant_client.cache_clear()
|
||||||
|
|
||||||
|
# Stub the lifespan startup so it doesn't try to talk to real services
|
||||||
|
monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock())
|
||||||
|
|
||||||
|
from app.main import app
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_endpoint_no_auth(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.get("/health")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json() == {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_rejects_missing_secret(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post("/webhook", json={
|
||||||
|
"event_type": "created",
|
||||||
|
"file_path": "a/b.pdf",
|
||||||
|
"file_name": "b.pdf",
|
||||||
|
})
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_rejects_wrong_secret(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post(
|
||||||
|
"/webhook",
|
||||||
|
json={"event_type": "created", "file_path": "a/b.pdf", "file_name": "b.pdf"},
|
||||||
|
headers={"X-Webhook-Secret": "wrong"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_webhook_dispatches_background_task(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
|
||||||
|
process_mock = AsyncMock()
|
||||||
|
monkeypatch.setattr("app.webhook.handler.process_file", process_mock)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post(
|
||||||
|
"/webhook",
|
||||||
|
json={
|
||||||
|
"event_type": "created",
|
||||||
|
"file_path": "Documents/THB/Studium/2.Semester/Databases/x.pdf",
|
||||||
|
"file_name": "x.pdf",
|
||||||
|
},
|
||||||
|
headers={"X-Webhook-Secret": "abc"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert r.status_code == 202
|
||||||
|
process_mock.assert_awaited_once_with(
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/x.pdf",
|
||||||
|
EventType.CREATED,
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user