import uuid
from dataclasses import dataclass
from typing import Any

from qdrant_client import QdrantClient
from qdrant_client.http import models as qm


@dataclass(frozen=True)
class ChunkPoint:
    """One embedded chunk ready to be written to Qdrant."""

    # Embedding of the chunk; its length must equal the collection's dimension.
    vector: list[float]
    # Metadata stored next to the vector (see ``_RESULT_FIELDS`` for the keys
    # that the read helpers in this module return).
    payload: dict[str, Any]


def ensure_collection(client: QdrantClient, name: str, vector_size: int) -> None:
    """Create the collection if missing. Crash if it exists with wrong dim.

    A new collection uses cosine distance and gets keyword payload indexes on
    ``file_path``, ``semester`` and ``fach``.

    Note: payload indexes are created only on initial collection creation;
    they are not reconciled on subsequent runs.

    Raises:
        RuntimeError: if the collection already exists and its vector
            configuration is not a single unnamed vector of ``vector_size``
            dimensions.
    """
    if not client.collection_exists(name):
        client.create_collection(
            collection_name=name,
            vectors_config=qm.VectorParams(
                size=vector_size, distance=qm.Distance.COSINE
            ),
        )
        for field in ("file_path", "semester", "fach"):
            client.create_payload_index(
                collection_name=name,
                field_name=field,
                field_schema=qm.PayloadSchemaType.KEYWORD,
            )
        return

    info = client.get_collection(name)
    vectors = info.config.params.vectors
    # ``vectors`` is a dict for named-vector collections and may be None; in
    # both cases there is no ``.size`` to compare against, so fail with a
    # clear message instead of an AttributeError.
    existing = getattr(vectors, "size", None)
    if existing is None:
        raise RuntimeError(
            f"qdrant collection '{name}' does not use a single unnamed vector; "
            "cannot verify its dimension. "
            "Drop the collection manually and run a bulk import."
        )
    if existing != vector_size:
        raise RuntimeError(
            f"qdrant collection '{name}' dimension mismatch: "
            f"existing={existing}, model={vector_size}. "
            "Drop the collection manually and run a bulk import."
        )


def upsert_chunks(client: QdrantClient, name: str, chunks: list[ChunkPoint]) -> None:
    """Insert chunks with fresh UUID ids.

    Caller is responsible for deduplication: call ``delete_by_path`` for the
    file before re-ingesting, otherwise duplicates accumulate.

    An empty ``chunks`` list is a no-op; no request is sent.
    """
    if not chunks:
        # Nothing to write; avoid sending an empty update request.
        return
    points = [
        qm.PointStruct(id=str(uuid.uuid4()), vector=c.vector, payload=c.payload)
        for c in chunks
    ]
    client.upsert(collection_name=name, points=points)


def _file_path_filter(file_path: str) -> qm.Filter:
    """Build a filter matching points whose ``file_path`` equals *file_path*."""
    return qm.Filter(
        must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))]
    )


def delete_by_path(client: QdrantClient, name: str, file_path: str) -> None:
    """Delete every point whose ``file_path`` payload equals *file_path*."""
    selector = qm.FilterSelector(filter=_file_path_filter(file_path))
    client.delete(collection_name=name, points_selector=selector)


# Payload keys copied into each ``search_chunks`` result row.
_RESULT_FIELDS = (
    "text",
    "file_path",
    "file_name",
    "semester",
    "fach",
    "typ",
    "page",
    "chunk_index",
)

# Page size for scrolling through the chunks of a single document.
_SCROLL_PAGE_SIZE = 10_000


def _payload_filter(
    semester: str | None, fach: str | None, typ: str | None
) -> qm.Filter | None:
    """Build a Qdrant filter from optional metadata constraints, or None.

    Falsy values (``None`` and the empty string) are treated as "no
    constraint" for that field.
    """
    conditions = [
        qm.FieldCondition(key=key, match=qm.MatchValue(value=value))
        for key, value in (("semester", semester), ("fach", fach), ("typ", typ))
        if value
    ]
    return qm.Filter(must=conditions) if conditions else None


def search_chunks(
    client: QdrantClient,
    name: str,
    vector: list[float],
    limit: int,
    semester: str | None = None,
    fach: str | None = None,
    typ: str | None = None,
) -> list[dict[str, Any]]:
    """Vector search with optional metadata filtering.

    Returns one dict per hit: the indexed payload fields plus the similarity
    ``score``. Payload fields missing on a point are returned as ``None``.

    Caller must pass a vector embedded with the *same* model used at ingest
    time, otherwise results are meaningless.
    """
    response = client.query_points(
        collection_name=name,
        query=vector,
        limit=limit,
        query_filter=_payload_filter(semester, fach, typ),
        with_payload=True,
    )
    out: list[dict[str, Any]] = []
    for point in response.points:
        payload = point.payload or {}
        row: dict[str, Any] = {field: payload.get(field) for field in _RESULT_FIELDS}
        row["score"] = point.score
        out.append(row)
    return out


def get_chunks_by_path(
    client: QdrantClient, name: str, file_path: str
) -> list[dict[str, Any]]:
    """Return every chunk of one document, ordered by ``chunk_index``.

    Each row holds ``chunk_index``, ``page`` and ``text``. Points without a
    payload are skipped; rows whose ``chunk_index`` is missing sort as 0.
    """
    path_filter = _file_path_filter(file_path)
    rows: list[dict[str, Any]] = []
    offset = None
    while True:
        # ``scroll`` returns one page plus the offset of the next page; keep
        # following it so documents larger than one page are not truncated.
        points, offset = client.scroll(
            collection_name=name,
            scroll_filter=path_filter,
            limit=_SCROLL_PAGE_SIZE,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        rows.extend(
            {
                "chunk_index": p.payload.get("chunk_index"),
                "page": p.payload.get("page"),
                "text": p.payload.get("text"),
            }
            for p in points
            if p.payload is not None
        )
        if offset is None:
            break
    rows.sort(key=lambda r: r["chunk_index"] if r["chunk_index"] is not None else 0)
    return rows