app/mcp_server.py: FastMCP (mcp SDK), streamable-http auf /mcp, statischer Bearer-Token (constant-time ASGI-Middleware), Fail-Fast ohne RAG_MCP_TOKEN. Tools rag_search (mit semester/fach/typ-Filter) + get_file_chunks. Läuft aus demselben Image wie der Ingestor und reused den Embed-Pfad → Vektoren sind garantiert kompatibel zum Ingest (der offizielle qdrant-MCP-Server kann nur fastembed → Dimension-/Schema-Mismatch). app/qdrant_store.py: search_chunks (query_points + optionaler Payload-Filter) und get_chunks_by_path (scroll, nach chunk_index sortiert). app/bulk.py: Amplification-Guard — /bulk-import lehnt mit 409 ab solange ein vorheriger Bulk noch BackgroundTasks abarbeitet. docker-compose.coolify.yml: rag-mcp-Service (nicht public, externes metamcp-net statt Stack-Coupling) + Traefik-Rate-Limit-Middleware am ingestor. tests/conftest.py: Settings-env_file in Tests neutralisieren (Dev-.env darf die Suite nicht kontaminieren). 68 passed, ruff clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
145 lines
4.4 KiB
Python
145 lines
4.4 KiB
Python
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http import models as qm
|
|
|
|
|
|
@dataclass(frozen=True)
class ChunkPoint:
    """Immutable pairing of one chunk's embedding and its stored metadata.

    ``upsert_chunks`` turns each instance into a Qdrant point, passing
    ``vector`` and ``payload`` through unchanged.
    """

    # Embedding vector for the chunk.
    vector: list[float]
    # Metadata stored on the Qdrant point alongside the vector.
    payload: dict[str, Any]
|
|
|
|
|
|
def ensure_collection(client: QdrantClient, name: str, vector_size: int) -> None:
    """Create the collection if missing. Crash if it exists with wrong dim.

    A new collection is created with a single unnamed vector of
    ``vector_size`` dimensions using cosine distance. It also gets keyword
    payload indexes on every field this module filters on:

    - ``file_path``, used by ``delete_by_path`` and ``get_chunks_by_path``;
    - ``semester``, ``fach`` and ``typ``, used by ``search_chunks``.

    Note: payload indexes are created only on initial collection creation;
    they are not reconciled on subsequent runs. A collection created before
    a field was added to the index list lacks that index until the
    collection is recreated or the index is added manually.

    Raises:
        RuntimeError: if the existing collection uses named vectors, or its
            vector dimension differs from ``vector_size``.
    """
    if not client.collection_exists(name):
        client.create_collection(
            collection_name=name,
            vectors_config=qm.VectorParams(size=vector_size, distance=qm.Distance.COSINE),
        )
        # "typ" is filtered on by search just like semester/fach, so it gets an
        # index too.
        for field in ("file_path", "semester", "fach", "typ"):
            client.create_payload_index(
                collection_name=name,
                field_name=field,
                field_schema=qm.PayloadSchemaType.KEYWORD,
            )
        return

    vectors = client.get_collection(name).config.params.vectors
    # A named-vector collection reports a {vector_name: VectorParams} dict here.
    # A dict has no ``.size``, so fail with an actionable message instead of an
    # AttributeError.
    if isinstance(vectors, dict):
        raise RuntimeError(
            f"qdrant collection '{name}' uses named vectors "
            f"({', '.join(sorted(vectors))}), expected a single unnamed vector. "
            "Drop the collection manually and run a bulk import."
        )
    existing = vectors.size
    if existing != vector_size:
        raise RuntimeError(
            f"qdrant collection '{name}' dimension mismatch: "
            f"existing={existing}, model={vector_size}. "
            "Drop the collection manually and run a bulk import."
        )
|
|
|
|
|
|
def upsert_chunks(client: QdrantClient, name: str, chunks: list[ChunkPoint]) -> None:
    """Insert chunks with fresh UUID ids.

    Each chunk becomes a new point with a random ``uuid4`` id. Ids are never
    derived from the content, so writing the same chunk twice creates two
    points.

    Caller is responsible for deduplication: call ``delete_by_path`` for the
    file before re-ingesting, otherwise duplicates accumulate.

    An empty ``chunks`` list is a no-op and sends no request to Qdrant.
    """
    if not chunks:
        # Nothing to write; skip the network round trip entirely.
        return
    points = [
        qm.PointStruct(id=str(uuid.uuid4()), vector=c.vector, payload=c.payload)
        for c in chunks
    ]
    client.upsert(collection_name=name, points=points)
|
|
|
|
|
|
def delete_by_path(client: QdrantClient, name: str, file_path: str) -> None:
    """Remove every point whose ``file_path`` payload equals *file_path*.

    Meant to run before re-ingesting a file, since ``upsert_chunks`` always
    inserts new points and would otherwise leave duplicates behind.
    """
    same_file = qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))
    client.delete(
        collection_name=name,
        points_selector=qm.FilterSelector(filter=qm.Filter(must=[same_file])),
    )
|
|
|
|
|
|
_RESULT_FIELDS = (
|
|
"text",
|
|
"file_path",
|
|
"file_name",
|
|
"semester",
|
|
"fach",
|
|
"typ",
|
|
"page",
|
|
"chunk_index",
|
|
)
|
|
|
|
|
|
def _payload_filter(
    semester: str | None, fach: str | None, typ: str | None
) -> qm.Filter | None:
    """Translate optional metadata constraints into a Qdrant ``must`` filter.

    Any falsy value (``None`` or an empty string) means "no constraint" for
    that field. Conditions are emitted in the order semester, fach, typ.
    Returns ``None`` when no constraint remains, so the query runs unfiltered.
    """
    wanted = {"semester": semester, "fach": fach, "typ": typ}
    must = []
    for key, value in wanted.items():
        if not value:
            continue
        must.append(qm.FieldCondition(key=key, match=qm.MatchValue(value=value)))
    if not must:
        return None
    return qm.Filter(must=must)
|
|
|
|
|
|
def search_chunks(
    client: QdrantClient,
    name: str,
    vector: list[float],
    limit: int,
    semester: str | None = None,
    fach: str | None = None,
    typ: str | None = None,
) -> list[dict[str, Any]]:
    """Run a vector similarity search, optionally restricted by metadata.

    Each hit becomes a dict holding every key of ``_RESULT_FIELDS`` (None
    when the payload lacks it), followed by the similarity ``score``. The
    query vector must come from the *same* embedding model used at ingest
    time; otherwise the scores are meaningless.
    """
    query_filter = _payload_filter(semester, fach, typ)
    hits = client.query_points(
        collection_name=name,
        query=vector,
        limit=limit,
        query_filter=query_filter,
        with_payload=True,
    ).points

    def _as_row(hit) -> dict[str, Any]:
        # A point may come back with payload=None; treat it as empty.
        data = hit.payload or {}
        return {**{key: data.get(key) for key in _RESULT_FIELDS}, "score": hit.score}

    return [_as_row(hit) for hit in hits]
|
|
|
|
|
|
def get_chunks_by_path(
    client: QdrantClient, name: str, file_path: str
) -> list[dict[str, Any]]:
    """Return every chunk of one document, ordered by ``chunk_index``.

    Each row holds the ``chunk_index``, ``page`` and ``text`` payload values
    (None when a key is absent). Points returned without a payload are
    skipped. Chunks without a ``chunk_index`` sort as if it were 0.

    ``client.scroll`` returns one page at a time plus the offset of the next
    page. This function keeps scrolling until that offset is exhausted, so a
    document larger than one page is no longer silently truncated.
    """
    path_filter = qm.Filter(
        must=[qm.FieldCondition(key="file_path", match=qm.MatchValue(value=file_path))]
    )
    rows: list[dict[str, Any]] = []
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=name,
            scroll_filter=path_filter,
            limit=10_000,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        rows.extend(
            {
                "chunk_index": p.payload.get("chunk_index"),
                "page": p.payload.get("page"),
                "text": p.payload.get("text"),
            }
            for p in points
            if p.payload is not None
        )
        # A None next-offset marks the last page. The empty-page check guards
        # against looping forever on a misbehaving server or client.
        if offset is None or not points:
            break
    rows.sort(key=lambda r: r["chunk_index"] if r["chunk_index"] is not None else 0)
    return rows
|