104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
import logging
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import PurePosixPath
|
|
from urllib.parse import unquote
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, status
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.config import get_settings
|
|
from app.ingest.extractors import SUPPORTED_TYPES
|
|
from app.ingest.pipeline import process_file
|
|
from app.webhook.auth import verify_secret
|
|
from app.webhook.models import EventType
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Router object that the endpoint(s) in this module are registered on.
router = APIRouter()
|
|
|
|
|
|
# Request body of the bulk-import endpoint.
# `path` is the folder passed to list_files_recursive(); it is required and
# must contain at least one character.
class BulkRequest(BaseModel):
    path: str = Field(..., min_length=1)
|
|
|
|
|
|
# PROPFIND request body: asks the server for the <d:resourcetype/> property
# only, which is what list_files_recursive() inspects to tell folders
# (collections) from files.
PROPFIND_BODY = (
    '<?xml version="1.0"?>\n'
    '<d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>\n'
)

# Prefix -> namespace map used for the ElementTree find*/findtext lookups.
DAV_NS = {"d": "DAV:"}
|
|
|
|
|
|
async def list_files_recursive(base_url: str, user: str, password: str, path: str) -> list[str]:
    """List every file below ``path`` with one recursive WebDAV PROPFIND.

    The request is sent with ``Depth: infinity`` and asks only for the
    ``resourcetype`` property, which is used to filter out folders.

    Args:
        base_url: WebDAV root URL; a trailing slash is ignored.
        user: Username for HTTP basic auth.
        password: Password for HTTP basic auth.
        path: Folder to list, relative to ``base_url``. Leading and trailing
            slashes are ignored.

    Returns:
        URL-decoded file paths relative to ``base_url`` (for example
        ``"Documents/sub/file.pdf"``). Folders are not included.

    Raises:
        RuntimeError: The server answered with a status other than 200 or 207.
        httpx.HTTPError: Raised by httpx when the request itself fails.
        xml.etree.ElementTree.ParseError: The response body is not
            well-formed XML.
    """
    # Local import: the module's import block only brings in `unquote`.
    from urllib.parse import quote

    base = base_url.rstrip("/")
    # Percent-encode the folder path so a "?" or "#" in a folder name stays
    # part of the path instead of being parsed as query string / fragment.
    # "%" is kept as-is so a caller that already passes a percent-encoded
    # path is not double-encoded.
    rel = quote(path.strip("/"), safe="/%")
    url = f"{base}/{rel}"

    async with httpx.AsyncClient(auth=(user, password), timeout=120.0) as client:
        response = await client.request(
            "PROPFIND",
            url,
            headers={"Depth": "infinity", "Content-Type": "application/xml"},
            content=PROPFIND_BODY,
        )

    if response.status_code not in (200, 207):
        raise RuntimeError(f"PROPFIND failed: status={response.status_code}")

    # Parse the raw bytes so the XML parser applies the document's own
    # encoding declaration rather than a charset guessed by the HTTP client.
    root = ET.fromstring(response.content)

    # Path component of the base URL, e.g. "/remote.php/dav/files/u".
    base_path_segment = PurePosixPath(httpx.URL(base).path).as_posix()
    prefix = base_path_segment.rstrip("/")

    files: list[str] = []
    for resp in root.findall("d:response", DAV_NS):
        # Folders carry <d:collection/> inside their resourcetype; skip them
        # before doing any string work on the href.
        if resp.find("d:propstat/d:prop/d:resourcetype/d:collection", DAV_NS) is not None:
            continue

        decoded = unquote(resp.findtext("d:href", default="", namespaces=DAV_NS))

        # Strip the WebDAV base prefix -> leaves "Documents/.../file.pdf".
        # Match on a whole path segment so that a base of ".../files/u" does
        # not also match an href under ".../files/u2/".
        if decoded == prefix or decoded.startswith(f"{prefix}/"):
            decoded = decoded[len(prefix):]
        decoded = decoded.lstrip("/")

        # Nothing left (the base itself) or a trailing slash (a folder href).
        if not decoded or decoded.endswith("/"):
            continue

        files.append(decoded)
    return files
|
|
|
|
|
|
# POST /bulk-import
#
# Checks the X-Webhook-Secret header via verify_secret(), lists every file
# below `body.path` over WebDAV, and queues process_file(<path>, CREATED) as a
# background task for each file whose extension is in SUPPORTED_TYPES.
# Responds 202 with the number of queued files, or 502 when the listing fails.
#
# Documented with comments rather than a docstring: FastAPI publishes a route
# function's docstring as the operation description in the OpenAPI schema.
@router.post("/bulk-import", status_code=status.HTTP_202_ACCEPTED)
async def bulk_import(
    body: BulkRequest,
    background: BackgroundTasks,
    x_webhook_secret: str | None = Header(default=None),
):
    cfg = get_settings()
    # Behaviour on a bad/missing secret is defined in app.webhook.auth.
    verify_secret(x_webhook_secret, cfg.webhook_secret)

    try:
        listing = await list_files_recursive(
            base_url=cfg.nextcloud_webdav_url,
            user=cfg.nextcloud_user,
            password=cfg.nextcloud_app_password,
            path=body.path,
        )
    except (RuntimeError, httpx.HTTPError, ET.ParseError) as exc:
        failure_fields = {
            "event": "bulk_listing_failed",
            "path": body.path,
            "error": str(exc),
        }
        logger.exception("bulk listing failed", extra=failure_fields)
        # Any listing failure is reported to the caller as a bad gateway.
        raise HTTPException(status_code=502, detail="webdav listing failed") from exc

    # Keep only files whose (lower-cased, dot-less) extension is supported.
    supported = [
        name
        for name in listing
        if PurePosixPath(name).suffix.lstrip(".").lower() in SUPPORTED_TYPES
    ]
    for name in supported:
        background.add_task(process_file, name, EventType.CREATED)
    dispatched = len(supported)

    dispatch_fields = {
        "event": "bulk_dispatch",
        "path": body.path,
        "dispatched": dispatched,
        "total_listed": len(listing),
    }
    logger.info("bulk dispatch", extra=dispatch_fields)
    return {"status": "accepted", "dispatched": dispatched}
|