feat: pipeline-orchestrator fuer single-file ingest

This commit is contained in:
2026-05-04 22:32:29 +02:00
parent e68e77a821
commit 02c8f5d338
2 changed files with 236 additions and 0 deletions

124
app/ingest/pipeline.py Normal file
View File

@@ -0,0 +1,124 @@
import logging
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import PurePosixPath
from qdrant_client import QdrantClient
from app.config import get_settings
from app.ingest.chunker import chunk_text
from app.ingest.embedder import embed_texts
from app.ingest.extractors import extract, UnsupportedFileType, SUPPORTED_TYPES
from app.ingest.metadata import parse_path
from app.ingest.webdav import download_file
from app.qdrant_store import upsert_chunks, delete_by_path, ChunkPoint
from app.webhook.models import EventType
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def _qdrant_client() -> QdrantClient:
return QdrantClient(url=get_settings().qdrant_url)
async def process_file(file_path: str, event_type: EventType) -> None:
"""End-to-end pipeline for one file event."""
settings = get_settings()
file_path = file_path.lstrip("/")
metadata = parse_path(file_path, settings.ingest_root)
if metadata is None:
logger.info(
"skip outside ingest root",
extra={"event": "skip", "reason": "outside_root", "file": file_path},
)
return
if event_type == EventType.DELETED:
delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
logger.info("deleted", extra={"event": "delete", "file": file_path})
return
extension = PurePosixPath(file_path).suffix.lstrip(".").lower()
if extension not in SUPPORTED_TYPES:
logger.info(
"skip unsupported type",
extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension},
)
return
try:
data = await download_file(
settings.nextcloud_webdav_url,
settings.nextcloud_user,
settings.nextcloud_app_password,
file_path,
)
except Exception as exc:
logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
return
try:
pages = extract(data, extension, filename=PurePosixPath(file_path).name)
except UnsupportedFileType:
logger.info("unsupported", extra={"event": "skip", "file": file_path})
return
except Exception as exc:
logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
return
chunks: list[tuple[str, int, int]] = [] # (text, page, chunk_index)
chunk_index = 0
for page in pages:
for chunk in chunk_text(
page.text,
size_words=settings.chunk_size_words,
overlap_words=settings.chunk_overlap_words,
page=page.page,
):
chunks.append((chunk.text, chunk.page, chunk_index))
chunk_index += 1
if not chunks:
logger.info("no chunks", extra={"event": "skip", "reason": "empty_text", "file": file_path})
# Still delete any prior data for this path
delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
return
try:
vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model)
except Exception as exc:
logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
return
now_iso = datetime.now(timezone.utc).isoformat()
file_name = PurePosixPath(file_path).name
points = [
ChunkPoint(
vector=vec,
payload={
"file_path": file_path,
"file_name": file_name,
"file_type": extension,
"semester": metadata.semester,
"fach": metadata.fach,
"typ": metadata.typ,
"page": page,
"chunk_index": idx,
"text": text,
"ingested_at": now_iso,
},
)
for vec, (text, page, idx) in zip(vectors, chunks)
]
qdrant = _qdrant_client()
delete_by_path(qdrant, settings.qdrant_collection, file_path)
upsert_chunks(qdrant, settings.qdrant_collection, points)
logger.info(
"ingested",
extra={"event": "ingest_done", "file": file_path, "chunks": len(points)},
)

112
tests/test_pipeline.py Normal file
View File

@@ -0,0 +1,112 @@
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.webhook.models import EventType
@pytest.fixture(autouse=True)
def _populate_env(monkeypatch):
monkeypatch.setenv("NEXTCLOUD_WEBDAV_URL", "http://nc")
monkeypatch.setenv("NEXTCLOUD_USER", "u")
monkeypatch.setenv("NEXTCLOUD_APP_PASSWORD", "p")
monkeypatch.setenv("OLLAMA_URL", "http://ollama")
monkeypatch.setenv("OLLAMA_EMBED_MODEL", "m")
monkeypatch.setenv("QDRANT_URL", "http://qdrant")
monkeypatch.setenv("QDRANT_COLLECTION", "rag_test")
monkeypatch.setenv("WEBHOOK_SECRET", "abc")
from app.config import get_settings
get_settings.cache_clear()
yield
get_settings.cache_clear()
@pytest.mark.asyncio
async def test_process_deleted_event_calls_delete_only(monkeypatch):
from app.ingest.pipeline import process_file
qdrant = MagicMock()
monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)
download_mock = AsyncMock()
monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock)
await process_file(
file_path="Documents/THB/Studium/2.Semester/Databases/x.pdf",
event_type=EventType.DELETED,
)
download_mock.assert_not_called()
qdrant.delete.assert_called_once()
@pytest.mark.asyncio
async def test_process_outside_root_skips(monkeypatch):
from app.ingest.pipeline import process_file
qdrant = MagicMock()
monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)
download_mock = AsyncMock()
monkeypatch.setattr("app.ingest.pipeline.download_file", download_mock)
await process_file(
file_path="Documents/Other/x.pdf",
event_type=EventType.CREATED,
)
download_mock.assert_not_called()
qdrant.delete.assert_not_called()
qdrant.upsert.assert_not_called()
@pytest.mark.asyncio
async def test_process_unsupported_extension_skips(monkeypatch):
from app.ingest.pipeline import process_file
qdrant = MagicMock()
monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)
monkeypatch.setattr("app.ingest.pipeline.download_file", AsyncMock())
await process_file(
file_path="Documents/THB/Studium/2.Semester/Databases/notes.txt",
event_type=EventType.CREATED,
)
qdrant.upsert.assert_not_called()
@pytest.mark.asyncio
async def test_process_created_full_flow(monkeypatch, sample_pdf_bytes):
from app.ingest.pipeline import process_file
qdrant = MagicMock()
monkeypatch.setattr("app.ingest.pipeline._qdrant_client", lambda: qdrant)
monkeypatch.setattr(
"app.ingest.pipeline.download_file",
AsyncMock(return_value=sample_pdf_bytes),
)
monkeypatch.setattr(
"app.ingest.pipeline.embed_texts",
AsyncMock(return_value=[[0.1] * 4, [0.2] * 4]),
)
await process_file(
file_path="Documents/THB/Studium/2.Semester/Databases/Vorlesungen/x.pdf",
event_type=EventType.CREATED,
)
# delete called first (idempotency), upsert called after
qdrant.delete.assert_called_once()
qdrant.upsert.assert_called_once()
upserted_points = qdrant.upsert.call_args.kwargs["points"]
assert len(upserted_points) >= 1
payload = upserted_points[0].payload
assert payload["semester"] == "2.Semester"
assert payload["fach"] == "Databases"
assert payload["typ"] == "Vorlesungen"
assert payload["file_type"] == "pdf"
assert payload["chunk_index"] == 0
assert "ingested_at" in payload