diff --git a/app/bulk.py b/app/bulk.py new file mode 100644 index 0000000..1c65060 --- /dev/null +++ b/app/bulk.py @@ -0,0 +1,96 @@ +import logging +import xml.etree.ElementTree as ET +from pathlib import PurePosixPath +from urllib.parse import unquote + +import httpx +from fastapi import APIRouter, BackgroundTasks, Header, status +from pydantic import BaseModel + +from app.config import get_settings +from app.ingest.extractors import SUPPORTED_TYPES +from app.ingest.pipeline import process_file +from app.webhook.auth import verify_secret +from app.webhook.models import EventType + + +logger = logging.getLogger(__name__) +router = APIRouter() + + +class BulkRequest(BaseModel): + path: str + + +PROPFIND_BODY = """ + +""" + +DAV_NS = {"d": "DAV:"} + + +async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]: + """PROPFIND with Depth: infinity. Returns relative file paths (no folders).""" + base = base_url.rstrip("/") + rel = path.strip("/") + url = f"{base}/{rel}" + + async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client: + response = await client.request( + "PROPFIND", + url, + headers={"Depth": "infinity", "Content-Type": "application/xml"}, + content=PROPFIND_BODY, + ) + if response.status_code not in (200, 207): + raise RuntimeError(f"PROPFIND failed: status={response.status_code}") + + root = ET.fromstring(response.text) + base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix() # "/remote.php/dav/files/u" + out: list[str] = [] + for resp in root.findall("d:response", DAV_NS): + href = resp.findtext("d:href", default="", namespaces=DAV_NS) + decoded = unquote(href) + # Strip the WebDAV base prefix → leaves "Documents/.../file.pdf" + if decoded.startswith(base_path_segment): + decoded = decoded[len(base_path_segment):] + decoded = decoded.lstrip("/") + if not decoded or decoded.endswith("/"): + continue + # Skip directory entries (those have resourcetype) + rt = resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS) + if rt is not None: + continue + out.append(decoded) + return out + + +@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED) +async def bulk_import( + body: BulkRequest, + background: BackgroundTasks, + x_webhook_secret: str | None = Header(default=None), +): + settings = get_settings() + verify_secret(x_webhook_secret, settings.webhook_secret) + + files = await list_files_recursive( + settings.nextcloud_webdav_url, + settings.nextcloud_user, + settings.nextcloud_app_password, + body.path, + ) + + dispatched = 0 + for f in files: + ext = PurePosixPath(f).suffix.lstrip(".").lower() + if ext not in SUPPORTED_TYPES: + continue + background.add_task(process_file, f, EventType.CREATED) + dispatched += 1 + + logger.info( + "bulk dispatch", + extra={"event": "bulk_dispatch", "path": body.path, "dispatched": dispatched, "total_listed": len(files)}, + ) + return {"status": "accepted", "dispatched": dispatched} diff --git a/app/main.py b/app/main.py index 963db63..a8b420a 100644 --- a/app/main.py +++ b/app/main.py @@ -9,6 +9,7 @@ from app.ingest.embedder import embedding_dimension from app.logging_setup import setup_logging from app.qdrant_store import ensure_collection from app.webhook.handler import router as webhook_router +from app.bulk import router as bulk_router logger = logging.getLogger(__name__) @@ -34,6 +35,7 @@ async def lifespan(app: FastAPI): app = FastAPI(title="rag-ingestor", lifespan=lifespan) app.include_router(webhook_router) +app.include_router(bulk_router) @app.get("/health") diff --git a/tests/test_bulk.py b/tests/test_bulk.py new file mode 100644 index 0000000..7972630 --- /dev/null +++ b/tests/test_bulk.py @@ -0,0 +1,54 @@ +from unittest.mock import AsyncMock +from fastapi.testclient import TestClient + + +def _make_app(monkeypatch): + monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc") + monkeypatch.setenv("NEXTCLOUD_USER", "u") + monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p") + monkeypatch.setenv("OLLAMA_URL", "http://ollama") + monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m") + monkeypatch.setenv("QDRANT_URL", "http://qdrant") + monkeypatch.setenv("QDRANT_COLLECTION", "rag_test") + monkeypatch.setenv("WEBHOOK_SECRET", "abc") + + from app.config import get_settings + get_settings.cache_clear() + import app.ingest.pipeline as pipe + pipe._qdrant_client.cache_clear() + + monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock()) + from app.main import app + return app + + +def test_bulk_import_lists_and_dispatches(monkeypatch): + app = _make_app(monkeypatch) + + listed = [ + "Documents/THB/Studium/2.Semester/Databases/a.pdf", + "Documents/THB/Studium/2.Semester/Databases/b.docx", + "Documents/THB/Studium/2.Semester/Databases/.rag-meta.json", # ignored + ] + monkeypatch.setattr("app.bulk.list_files_recursive", AsyncMock(return_value=listed)) + + process_mock = AsyncMock() + monkeypatch.setattr("app.bulk.process_file", process_mock) + + with TestClient(app) as client: + r = client.post( + "/bulk-import", + json={"path": "Documents/THB/Studium/2.Semester/Databases"}, + headers={"X-Webhook-Secret": "abc"}, + ) + + assert r.status_code == 202 + body = r.json() + assert body["dispatched"] == 2 # only .pdf and .docx, not the json sidecar + + +def test_bulk_import_rejects_wrong_secret(monkeypatch): + app = _make_app(monkeypatch) + with TestClient(app) as client: + r = client.post("/bulk-import", json={"path": "x"}, headers={"X-Webhook-Secret": "nope"}) + assert r.status_code == 401