feat: bulk-import endpoint mit propfind walk
This commit is contained in:
96
app/bulk.py
Normal file
96
app/bulk.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
import logging
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
from pathlib import PurePosixPath
|
||||||
|
from urllib.parse import unquote
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from fastapi import APIRouter, BackgroundTasks, Header, status
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
from app.ingest.extractors import SUPPORTED_TYPES
|
||||||
|
from app.ingest.pipeline import process_file
|
||||||
|
from app.webhook.auth import verify_secret
|
||||||
|
from app.webhook.models import EventType
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
class BulkRequest(BaseModel):
|
||||||
|
path: str
|
||||||
|
|
||||||
|
|
||||||
|
PROPFIND_BODY = """<?xml version="1.0"?>
|
||||||
|
<d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>
|
||||||
|
"""
|
||||||
|
|
||||||
|
DAV_NS = {"d": "DAV:"}
|
||||||
|
|
||||||
|
|
||||||
|
async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]:
|
||||||
|
"""PROPFIND with Depth: infinity. Returns relative file paths (no folders)."""
|
||||||
|
base = base_url.rstrip("/")
|
||||||
|
rel = path.strip("/")
|
||||||
|
url = f"{base}/{rel}"
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client:
|
||||||
|
response = await client.request(
|
||||||
|
"PROPFIND",
|
||||||
|
url,
|
||||||
|
headers={"Depth": "infinity", "Content-Type": "application/xml"},
|
||||||
|
content=PROPFIND_BODY,
|
||||||
|
)
|
||||||
|
if response.status_code not in (200, 207):
|
||||||
|
raise RuntimeError(f"PROPFIND failed: status={response.status_code}")
|
||||||
|
|
||||||
|
root = ET.fromstring(response.text)
|
||||||
|
base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix() # "/remote.php/dav/files/u"
|
||||||
|
out: list[str] = []
|
||||||
|
for resp in root.findall("d:response", DAV_NS):
|
||||||
|
href = resp.findtext("d:href", default="", namespaces=DAV_NS)
|
||||||
|
decoded = unquote(href)
|
||||||
|
# Strip the WebDAV base prefix → leaves "Documents/.../file.pdf"
|
||||||
|
if decoded.startswith(base_path_segment):
|
||||||
|
decoded = decoded[len(base_path_segment):]
|
||||||
|
decoded = decoded.lstrip("/")
|
||||||
|
if not decoded or decoded.endswith("/"):
|
||||||
|
continue
|
||||||
|
# Skip directory entries (those have <d:collection/> resourcetype)
|
||||||
|
rt = resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS)
|
||||||
|
if rt is not None:
|
||||||
|
continue
|
||||||
|
out.append(decoded)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED)
|
||||||
|
async def bulk_import(
|
||||||
|
body: BulkRequest,
|
||||||
|
background: BackgroundTasks,
|
||||||
|
x_webhook_secret: str | None = Header(default=None),
|
||||||
|
):
|
||||||
|
settings = get_settings()
|
||||||
|
verify_secret(x_webhook_secret, settings.webhook_secret)
|
||||||
|
|
||||||
|
files = await list_files_recursive(
|
||||||
|
settings.nextcloud_webdav_url,
|
||||||
|
settings.nextcloud_user,
|
||||||
|
settings.nextcloud_app_password,
|
||||||
|
body.path,
|
||||||
|
)
|
||||||
|
|
||||||
|
dispatched = 0
|
||||||
|
for f in files:
|
||||||
|
ext = PurePosixPath(f).suffix.lstrip(".").lower()
|
||||||
|
if ext not in SUPPORTED_TYPES:
|
||||||
|
continue
|
||||||
|
background.add_task(process_file, f, EventType.CREATED)
|
||||||
|
dispatched += 1
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"bulk dispatch",
|
||||||
|
extra={"event": "bulk_dispatch", "path": body.path, "dispatched": dispatched, "total_listed": len(files)},
|
||||||
|
)
|
||||||
|
return {"status": "accepted", "dispatched": dispatched}
|
||||||
@@ -9,6 +9,7 @@ from app.ingest.embedder import embedding_dimension
|
|||||||
from app.logging_setup import setup_logging
|
from app.logging_setup import setup_logging
|
||||||
from app.qdrant_store import ensure_collection
|
from app.qdrant_store import ensure_collection
|
||||||
from app.webhook.handler import router as webhook_router
|
from app.webhook.handler import router as webhook_router
|
||||||
|
from app.bulk import router as bulk_router
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -34,6 +35,7 @@ async def lifespan(app: FastAPI):
|
|||||||
|
|
||||||
app = FastAPI(title="rag-ingestor", lifespan=lifespan)
|
app = FastAPI(title="rag-ingestor", lifespan=lifespan)
|
||||||
app.include_router(webhook_router)
|
app.include_router(webhook_router)
|
||||||
|
app.include_router(bulk_router)
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
|
|||||||
54
tests/test_bulk.py
Normal file
54
tests/test_bulk.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
from unittest.mock import AsyncMock
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
|
||||||
|
def _make_app(monkeypatch):
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_USER", "u")
|
||||||
|
monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
|
||||||
|
monkeypatch.setenv("OLLAMA_URL", "http://ollama")
|
||||||
|
monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
|
||||||
|
monkeypatch.setenv("QDRANT_URL", "http://qdrant")
|
||||||
|
monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
|
||||||
|
monkeypatch.setenv("WEBHOOK_SECRET", "abc")
|
||||||
|
|
||||||
|
from app.config import get_settings
|
||||||
|
get_settings.cache_clear()
|
||||||
|
import app.ingest.pipeline as pipe
|
||||||
|
pipe._qdrant_client.cache_clear()
|
||||||
|
|
||||||
|
monkeypatch.setattr("app.main._startup_ensure_collection", AsyncMock())
|
||||||
|
from app.main import app
|
||||||
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
def test_bulk_import_lists_and_dispatches(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
|
||||||
|
listed = [
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/a.pdf",
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/b.docx",
|
||||||
|
"Documents/THB/Studium/2.Semester/Databases/.rag-meta.json", # ignored
|
||||||
|
]
|
||||||
|
monkeypatch.setattr("app.bulk.list_files_recursive", AsyncMock(return_value=listed))
|
||||||
|
|
||||||
|
process_mock = AsyncMock()
|
||||||
|
monkeypatch.setattr("app.bulk.process_file", process_mock)
|
||||||
|
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post(
|
||||||
|
"/bulk-import",
|
||||||
|
json={"path": "Documents/THB/Studium/2.Semester/Databases"},
|
||||||
|
headers={"X-Webhook-Secret": "abc"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert r.status_code == 202
|
||||||
|
body = r.json()
|
||||||
|
assert body["dispatched"] == 2 # only .pdf and .docx, not the json sidecar
|
||||||
|
|
||||||
|
|
||||||
|
def test_bulk_import_rejects_wrong_secret(monkeypatch):
|
||||||
|
app = _make_app(monkeypatch)
|
||||||
|
with TestClient(app) as client:
|
||||||
|
r = client.post("/bulk-import", json={"path": "x"}, headers={"X-Webhook-Secret": "nope"})
|
||||||
|
assert r.status_code == 401
|
||||||
Reference in New Issue
Block a user