Files
rag-ingestor/app/ingest/webdav.py

22 lines
636 B
Python

import httpx
class WebDAVError(Exception):
pass
async def download_file(base_url: str, user: str, password: str, file_path: str, *, timeout: float = 60.0) -> bytes:
"""Fetch a file from Nextcloud WebDAV. Returns the raw bytes."""
base = base_url.rstrip("/")
rel = file_path.lstrip("/")
url = f"{base}/{rel}"
async with httpx.AsyncClient(auth=(user, password), timeout=timeout) as client:
response = await client.get(url)
if response.status_code != 200:
raise WebDAVError(
f"WebDAV GET {file_path} failed: status={response.status_code}"
)
return response.content