132 lines
4.4 KiB
Python
132 lines
4.4 KiB
Python
import logging
|
|
from datetime import datetime, timezone
|
|
from functools import lru_cache
|
|
from pathlib import PurePosixPath
|
|
|
|
from qdrant_client import QdrantClient
|
|
|
|
from app.config import get_settings
|
|
from app.ingest.chunker import chunk_text
|
|
from app.ingest.embedder import embed_texts
|
|
from app.ingest.extractors import extract, UnsupportedFileType, SUPPORTED_TYPES
|
|
from app.ingest.metadata import parse_path
|
|
from app.ingest.webdav import download_file
|
|
from app.qdrant_store import upsert_chunks, delete_by_path, ChunkPoint
|
|
from app.webhook.models import EventType
|
|
|
|
|
|
# Module-level logger, named after this module so records can be filtered by origin.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@lru_cache(maxsize=1)
def _qdrant_client() -> QdrantClient:
    """Return the shared Qdrant client.

    The function takes no arguments, so ``lru_cache(maxsize=1)`` builds the
    client on the first call and hands back that same instance afterwards.
    The server URL is read from the application settings at that first call.
    """
    qdrant_url = get_settings().qdrant_url
    return QdrantClient(url=qdrant_url)
|
|
|
|
|
|
def _collect_chunks(pages, size_words: int, overlap_words: int) -> list:
    """Chunk every extracted page and number the chunks across the whole file.

    Returns a list of ``(text, page, chunk_index)`` tuples where
    ``chunk_index`` starts at 0 and keeps counting across page boundaries.
    """
    chunks: list[tuple[str, int, int]] = []
    for page in pages:
        for chunk in chunk_text(
            page.text,
            size_words=size_words,
            overlap_words=overlap_words,
            page=page.page,
        ):
            # len(chunks) is the next free file-wide index.
            chunks.append((chunk.text, chunk.page, len(chunks)))
    return chunks


def _build_points(vectors, chunks, *, file_path, file_name, extension, metadata, ingested_at) -> list:
    """Pair each embedding vector with its chunk and wrap both in a ChunkPoint."""
    return [
        ChunkPoint(
            vector=vec,
            payload={
                "file_path": file_path,
                "file_name": file_name,
                "file_type": extension,
                "semester": metadata.semester,
                "fach": metadata.fach,
                "typ": metadata.typ,
                "page": page_num,
                "chunk_index": idx,
                "text": text,
                "ingested_at": ingested_at,
            },
        )
        for vec, (text, page_num, idx) in zip(vectors, chunks)
    ]


async def process_file(file_path: str, event_type: EventType) -> None:
    """Run the end-to-end ingest pipeline for one file event.

    Steps, in order:

    1. Strip leading slashes from ``file_path`` and parse it against the
       configured ingest root; paths for which ``parse_path`` returns
       ``None`` are skipped.
    2. For ``EventType.DELETED`` remove the file's stored chunks and stop.
    3. Skip files whose extension is not in ``SUPPORTED_TYPES``.
    4. Download, extract, chunk and embed the file.
    5. Replace the file's stored chunks (delete by path, then upsert).

    Args:
        file_path: Path of the file the event refers to; leading ``/`` is ignored.
        event_type: Kind of event; only ``DELETED`` is treated specially.

    Returns:
        None. Download, extract and embed failures are logged and swallowed
        so one bad file does not abort the caller.

    Raises:
        Exception: Whatever the final delete/upsert step raises is logged
            and re-raised. Calls made outside the ``try`` blocks (path
            parsing, the two early deletes) are not guarded here.
    """
    settings = get_settings()
    file_path = file_path.lstrip("/")

    metadata = parse_path(file_path, settings.ingest_root)
    if metadata is None:
        logger.info(
            "skip outside ingest root",
            extra={"event": "skip", "reason": "outside_root", "file": file_path},
        )
        return

    if event_type == EventType.DELETED:
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        logger.info("deleted", extra={"event": "delete", "file": file_path})
        return

    # Parse the path once; name and extension are both reused below.
    posix_path = PurePosixPath(file_path)
    file_name = posix_path.name
    extension = posix_path.suffix.lstrip(".").lower()

    if extension not in SUPPORTED_TYPES:
        logger.info(
            "skip unsupported type",
            extra={"event": "skip", "reason": "unsupported_type", "file": file_path, "ext": extension},
        )
        return

    try:
        data = await download_file(
            settings.nextcloud_webdav_url,
            settings.nextcloud_user,
            settings.nextcloud_app_password,
            file_path,
        )
    except Exception as exc:
        logger.exception("download failed", extra={"event": "download_failed", "file": file_path, "error": str(exc)})
        return

    try:
        pages = extract(data, extension, filename=file_name)
    except UnsupportedFileType:
        # The extractor can still reject a file whose extension passed the check above.
        logger.info("unsupported", extra={"event": "skip", "file": file_path})
        return
    except Exception as exc:
        logger.exception("extract failed", extra={"event": "extract_failed", "file": file_path, "error": str(exc)})
        return

    chunks = _collect_chunks(pages, settings.chunk_size_words, settings.chunk_overlap_words)

    if not chunks:
        logger.info("no chunks", extra={"event": "skip", "reason": "empty_text", "file": file_path})
        # Still delete any prior data for this path
        delete_by_path(_qdrant_client(), settings.qdrant_collection, file_path)
        return

    try:
        vectors = await embed_texts([c[0] for c in chunks], model=settings.ollama_embed_model)
    except Exception as exc:
        logger.exception("embed failed", extra={"event": "embed_failed", "file": file_path, "error": str(exc)})
        return

    # zip() would silently truncate on a mismatch, so refuse to store partial data.
    if len(vectors) != len(chunks):
        logger.error(
            "vector/chunk count mismatch",
            extra={"event": "embed_failed", "file": file_path, "vectors": len(vectors), "chunks": len(chunks)},
        )
        return

    points = _build_points(
        vectors,
        chunks,
        file_path=file_path,
        file_name=file_name,
        extension=extension,
        metadata=metadata,
        ingested_at=datetime.now(timezone.utc).isoformat(),
    )

    try:
        qdrant = _qdrant_client()
        delete_by_path(qdrant, settings.qdrant_collection, file_path)
        upsert_chunks(qdrant, settings.qdrant_collection, points)
    except Exception as exc:
        # The delete runs before the upsert, so a failure here can leave the
        # file with no stored chunks. Emit a structured event like the other
        # stages do, then re-raise so callers see the same exception as before.
        logger.exception("store failed", extra={"event": "store_failed", "file": file_path, "error": str(exc)})
        raise

    logger.info(
        "ingested",
        extra={"event": "ingest_done", "file": file_path, "chunks": len(points)},
    )
|