Files
rag-ingestor/app/ingest/pipeline.py
Jean-Luc Makiola ca9ff55587 feat: duration_ms-logging, bulk-semaphore und erweitertes README
- Pipeline-Stages (download/extract/embed/qdrant) loggen jetzt duration_ms
- bulk-import dispatcht mit Semaphore(4) statt unbounded → Backpressure
- README dokumentiert Webhook-Payload-Schema mit curl-Beispiel
- README enthaelt Recovery-Runbook (dim-mismatch, crash-recovery, single-file reindex)
2026-05-04 22:54:58 +02:00

159 lines
5.4 KiB
Python

import logging
import time
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import PurePosixPath
from qdrant_client import QdrantClient
from app.config import get_settings
from app.ingest.chunker import chunk_text
from app.ingest.embedder import embed_texts
from app.ingest.extractors import extract, UnsupportedFileType, SUPPORTED_TYPES
from app.ingest.metadata import parse_path
from app.ingest.webdav import download_file
from app.qdrant_store import upsert_chunks, delete_by_path, ChunkPoint
from app.webhook.models import EventType
# Module-level logger named after this module; the pipeline below attaches
# structured fields (event, file, duration_ms, ...) to records via ``extra``.
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def _qdrant_client() -> QdrantClient:
    """Return the shared Qdrant client, creating it on first use.

    ``lru_cache`` memoizes this zero-argument call, so the client is built
    once from the configured ``qdrant_url`` and the same instance is handed
    out on every later call.
    """
    qdrant_url = get_settings().qdrant_url
    return QdrantClient(url=qdrant_url)
def _chunk_pages(pages, *, size_words: int, overlap_words: int) -> list[tuple[str, int, int]]:
    """Split extracted pages into ``(text, page, chunk_index)`` tuples.

    Each item of *pages* must expose ``.text`` and ``.page``. ``chunk_index``
    is numbered across the whole file, not restarted per page.
    """
    chunks: list[tuple[str, int, int]] = []
    for page in pages:
        for chunk in chunk_text(
            page.text,
            size_words=size_words,
            overlap_words=overlap_words,
            page=page.page,
        ):
            # len(chunks) is evaluated before the append, so it is the
            # zero-based running index of the chunk being added.
            chunks.append((chunk.text, chunk.page, len(chunks)))
    return chunks


def _build_points(vectors, chunks, *, file_path: str, extension: str, metadata) -> list[ChunkPoint]:
    """Pair each embedding vector with its chunk and build the Qdrant points.

    *vectors* and *chunks* must have the same length; the caller checks this.
    All points of one call share the same ``ingested_at`` UTC timestamp.
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    file_name = PurePosixPath(file_path).name
    return [
        ChunkPoint(
            vector=vec,
            payload={
                "file_path": file_path,
                "file_name": file_name,
                "file_type": extension,
                "semester": metadata.semester,
                "fach": metadata.fach,
                "typ": metadata.typ,
                "page": page_num,
                "chunk_index": idx,
                "text": text,
                "ingested_at": now_iso,
            },
        )
        for vec, (text, page_num, idx) in zip(vectors, chunks)
    ]


async def process_file(file_path: str, event_type: EventType) -> None:
    """End-to-end pipeline for one file event.

    Stages: resolve path metadata, handle deletions, download, extract,
    chunk, embed, then replace the file's points in Qdrant. Each timed stage
    logs its ``duration_ms``.

    Args:
        file_path: Path of the file; leading slashes are stripped before use.
        event_type: Webhook event type. ``EventType.DELETED`` only removes the
            file's points; any other value runs the ingest stages.

    Raises:
        Exception: Errors from the final Qdrant delete/upsert are logged and
            re-raised. The two early ``delete_by_path`` calls (DELETED event,
            empty text) are not guarded, so their errors propagate as well.
            Download, extract and embed failures are logged and swallowed
            (the function returns ``None``).
    """
    settings = get_settings()
    file_path = file_path.lstrip("/")

    metadata = parse_path(file_path, settings.ingest_root)
    if metadata is None:
        logger.info(
            "skip outside ingest root",
            extra={"event": "skip", "reason": "outside_root", "file": file_path},
        )
        return

    if event_type == EventType.DELETED:
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        logger.info("deleted", extra={"event": "delete", "file": file_path})
        return

    extension = PurePosixPath(file_path).suffix.lstrip(".").lower()
    if extension not in SUPPORTED_TYPES:
        logger.info(
            "skip unsupported type",
            extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension},
        )
        return

    # --- download -----------------------------------------------------------
    t0 = time.perf_counter()
    try:
        data = await download_file(
            settings.nextcloud_webdav_url,
            settings.nextcloud_user,
            settings.nextcloud_app_password,
            file_path,
        )
    except Exception as exc:
        logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
        return
    download_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "download ok",
        extra={"event": "download", "status": "ok", "file": file_path, "duration_ms": download_ms, "bytes": len(data)},
    )

    # --- extract ------------------------------------------------------------
    t0 = time.perf_counter()
    try:
        pages = extract(data, extension, filename=PurePosixPath(file_path).name)
    except UnsupportedFileType:
        # Carry the same reason/ext fields as the extension check above so
        # both unsupported-type skips can be filtered the same way.
        logger.info(
            "unsupported",
            extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension},
        )
        return
    except Exception as exc:
        logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
        return
    extract_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "extract ok",
        extra={"event": "extract", "status": "ok", "file": file_path, "duration_ms": extract_ms, "pages": len(pages)},
    )

    # --- chunk --------------------------------------------------------------
    chunks = _chunk_pages(
        pages,
        size_words=settings.chunk_size_words,
        overlap_words=settings.chunk_overlap_words,
    )
    if not chunks:
        logger.info("no chunks", extra={"event": "skip", "reason": "empty_text", "file": file_path})
        # Still delete any prior data for this path
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        return

    # --- embed --------------------------------------------------------------
    t0 = time.perf_counter()
    try:
        vectors = await embed_texts([text for text, _, _ in chunks], model=settings.ollama_embed_model)
    except Exception as exc:
        logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
        return
    embed_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "embed ok",
        extra={"event": "embed", "status": "ok", "file": file_path, "duration_ms": embed_ms, "chunks": len(vectors)},
    )
    if len(vectors) != len(chunks):
        # zip() would silently truncate; refuse to store a partial file.
        logger.error(
            "vector/chunk count mismatch",
            extra={"event": "embed_failed", "file": file_path, "vectors": len(vectors), "chunks": len(chunks)},
        )
        return

    points = _build_points(
        vectors,
        chunks,
        file_path=file_path,
        extension=extension,
        metadata=metadata,
    )

    # --- qdrant -------------------------------------------------------------
    t0 = time.perf_counter()
    try:
        qdrant = _qdrant_client()
        # Replace semantics: drop the file's old points, then write the new ones.
        delete_by_path(qdrant, settings.qdrant_collection, file_path)
        upsert_chunks(qdrant, settings.qdrant_collection, points)
    except Exception as exc:
        # The delete may already have succeeded, leaving the file without
        # points. Log with file context like the other stages, then re-raise
        # so callers see the same exception as before.
        logger.exception(
            "qdrant write failed",
            extra={"event": "qdrant_failed", "file": file_path, "error": str(exc)},
        )
        raise
    qdrant_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "ingested",
        extra={
            "event": "ingest_done",
            "status": "ok",
            "file": file_path,
            "chunks": len(points),
            "duration_ms": qdrant_ms,
        },
    )