Files
rag-ingestor/app/qdrant_store.py
Jean-Luc Makiola 9643011e64
All checks were successful
CI / ci (push) Successful in 49s
Release / release (push) Successful in 1m2s
feat: MCP-Server für RAG-Retrieval + Webhook-Härtung
app/mcp_server.py: FastMCP (mcp SDK), streamable-http auf /mcp, statischer
Bearer-Token (constant-time ASGI-Middleware), Fail-Fast ohne RAG_MCP_TOKEN.
Tools rag_search (mit semester/fach/typ-Filter) + get_file_chunks. Läuft aus
demselben Image wie der Ingestor und reused den Embed-Pfad → Vektoren sind
garantiert kompatibel zum Ingest (der offizielle qdrant-MCP-Server kann nur
fastembed → Dimension-/Schema-Mismatch).

app/qdrant_store.py: search_chunks (query_points + optionaler Payload-Filter)
und get_chunks_by_path (scroll, nach chunk_index sortiert).

app/bulk.py: Amplification-Guard — /bulk-import lehnt mit 409 ab solange ein
vorheriger Bulk noch BackgroundTasks abarbeitet.

docker-compose.coolify.yml: rag-mcp-Service (nicht public, externes
metamcp-net statt Stack-Coupling) + Traefik-Rate-Limit-Middleware am ingestor.

tests/conftest.py: Settings-env_file in Tests neutralisieren (Dev-.env darf
die Suite nicht kontaminieren). 68 passed, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 22:08:37 +02:00

145 lines
4.4 KiB
Python

import uuid
from dataclasses import dataclass
from typing import Any
from qdrant_client import QdrantClient
from qdrant_client.http import models as qm
@dataclass(frozen=True)
class ChunkPoint:
vector: list[float]
payload: dict[str, Any]
def ensure_collection(client: QdrantClient, name: str, vector_size: int) -> None:
"""Create the collection if missing. Crash if it exists with wrong dim.
Note: payload indexes are created only on initial collection creation;
they are not reconciled on subsequent runs.
"""
if not client.collection_exists(name):
client.create_collection(
collection_name=name,
vectors_config=qm.VectorParams(size=vector_size, distance=qm.Distance.COSINE),
)
for field in ("file_path", "semester", "fach"):
client.create_payload_index(
collection_name=name,
field_name=field,
field_schema=qm.PayloadSchemaType.KEYWORD,
)
return
info = client.get_collection(name)
existing = info.config.params.vectors.size
if existing != vector_size:
raise RuntimeError(
f"qdrant collection '{name}' dimension mismatch: "
f"existing={existing}, model={vector_size}. "
"Drop the collection manually and run a bulk import."
)
def upsert_chunks(client: QdrantClient, name: str, chunks: list[ChunkPoint]) -> None:
"""Insert chunks with fresh UUID ids.
Caller is responsible for deduplication: call ``delete_by_path`` for the
file before re-ingesting, otherwise duplicates accumulate.
"""
points = [
qm.PointStruct(id=str(uuid.uuid4()), vector=c.vector, payload=c.payload)
for c in chunks
]
client.upsert(collection_name=name, points=points)
def delete_by_path(client: QdrantClient, name: str, file_path: str) -> None:
selector = qm.FilterSelector(
filter=qm.Filter(
must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))]
)
)
client.delete(collection_name=name, points_selector=selector)
_RESULT_FIELDS = (
"text",
"file_path",
"file_name",
"semester",
"fach",
"typ",
"page",
"chunk_index",
)
def _payload_filter(
semester: str | None, fach: str | None, typ: str | None
) -> qm.Filter | None:
"""Build a Qdrant filter from optional metadata constraints, or None."""
conditions = [
qm.FieldCondition(key=key, match=qm.MatchValue(value=value))
for key, value in (("semester", semester), ("fach", fach), ("typ", typ))
if value
]
return qm.Filter(must=conditions) if conditions else None
def search_chunks(
client: QdrantClient,
name: str,
vector: list[float],
limit: int,
semester: str | None = None,
fach: str | None = None,
typ: str | None = None,
) -> list[dict[str, Any]]:
"""Vector search with optional metadata filtering.
Returns one dict per hit: the indexed payload fields plus the similarity
``score``. Caller must pass a vector embedded with the *same* model used
at ingest time, otherwise results are meaningless.
"""
response = client.query_points(
collection_name=name,
query=vector,
limit=limit,
query_filter=_payload_filter(semester, fach, typ),
with_payload=True,
)
out: list[dict[str, Any]] = []
for point in response.points:
payload = point.payload or {}
row: dict[str, Any] = {field: payload.get(field) for field in _RESULT_FIELDS}
row["score"] = point.score
out.append(row)
return out
def get_chunks_by_path(
client: QdrantClient, name: str, file_path: str
) -> list[dict[str, Any]]:
"""Return every chunk of one document, ordered by ``chunk_index``."""
points, _ = client.scroll(
collection_name=name,
scroll_filter=qm.Filter(
must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))]
),
limit=10_000,
with_payload=True,
with_vectors=False,
)
rows = [
{
"chunk_index": p.payload.get("chunk_index"),
"page": p.payload.get("page"),
"text": p.payload.get("text"),
}
for p in points
if p.payload is not None
]
rows.sort(key=lambda r: r["chunk_index"] if r["chunk_index"] is not None else 0)
return rows