Files
rag-ingestor/app/mcp_server.py
Jean-Luc Makiola 9643011e64
All checks were successful
CI / ci (push) Successful in 49s
Release / release (push) Successful in 1m2s
feat: MCP-Server für RAG-Retrieval + Webhook-Härtung
app/mcp_server.py: FastMCP (mcp SDK), streamable-http auf /mcp, statischer
Bearer-Token (constant-time ASGI-Middleware), Fail-Fast ohne RAG_MCP_TOKEN.
Tools rag_search (mit semester/fach/typ-Filter) + get_file_chunks. Läuft aus
demselben Image wie der Ingestor und nutzt denselben Embed-Pfad → Vektoren sind
garantiert kompatibel zum Ingest (der offizielle qdrant-MCP-Server kann nur
fastembed → Dimension-/Schema-Mismatch).

app/qdrant_store.py: search_chunks (query_points + optionaler Payload-Filter)
und get_chunks_by_path (scroll, nach chunk_index sortiert).

app/bulk.py: Amplification-Guard — /bulk-import lehnt mit 409 ab, solange ein
vorheriger Bulk noch BackgroundTasks abarbeitet.

docker-compose.coolify.yml: rag-mcp-Service (nicht public, externes
metamcp-net statt Stack-Coupling) + Traefik-Rate-Limit-Middleware am ingestor.

tests/conftest.py: Settings-env_file in Tests neutralisieren (Dev-.env darf
die Suite nicht kontaminieren). 68 passed, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 22:08:37 +02:00

148 lines
4.5 KiB
Python

"""MCP server exposing the THB-Studium RAG corpus.
Runs from the *same* image as the ingestor and reuses its embedding path
(`app.ingest.embedder`), so query vectors are produced by the exact model
used at ingest time — the only way Qdrant search returns meaningful hits.
Transport: streamable-http on ``/mcp``. A static bearer token gates every
request; the token is the second control layer behind network isolation
(the service is only reachable by MetaMCP over a dedicated bridge).
"""
import asyncio
import hmac
import logging
import sys
from functools import lru_cache

import uvicorn
from mcp.server.fastmcp import FastMCP
from qdrant_client import QdrantClient
from starlette.types import Receive, Scope, Send

from app.config import get_settings
from app.ingest.embedder import embed_texts
from app.logging_setup import setup_logging
from app.qdrant_store import get_chunks_by_path, search_chunks
# Module-level logger; build_app() and main() log through it with an
# ``event`` key passed via ``extra``.
logger = logging.getLogger(__name__)
# FastMCP server instance named "rag-thb". The @mcp.tool() functions below
# register on it, and build_app() wraps its streamable-HTTP ASGI app.
mcp = FastMCP("rag-thb")
@lru_cache(maxsize=1)
def _qdrant() -> QdrantClient:
    """Return the shared Qdrant client, building it lazily on first call.

    The single-entry cache means every tool invocation reuses one client
    instance pointed at ``settings.qdrant_url``.
    """
    qdrant_url = get_settings().qdrant_url
    return QdrantClient(url=qdrant_url)
# Upper bound for ``rag_search(limit=...)``. The value is chosen by the MCP
# client (usually an LLM), so it is capped to keep Qdrant load and the size
# of the tool response bounded.
_MAX_SEARCH_LIMIT = 50


@mcp.tool()
async def rag_search(
    query: str,
    limit: int = 5,
    semester: str | None = None,
    fach: str | None = None,
    typ: str | None = None,
) -> list[dict]:
    """Semantische Suche im THB-Studium-Wissen (Vorlesungen, Übungen, Notizen).

    Args:
        query: Natürlichsprachige Suchanfrage.
        limit: Maximale Trefferzahl (Default 5, höchstens 50).
        semester: Optionaler Filter, z.B. "2.Semester".
        fach: Optionaler Filter, z.B. "Databases".
        typ: Optionaler Filter, z.B. "Vorlesungen" oder "Uebungen".

    Returns:
        Treffer mit text und Quell-Metadaten (file_path, semester, fach,
        typ, page, chunk_index) plus Similarity-score, absteigend sortiert.
    """
    # FastMCP publishes the docstring above as the tool description, so it
    # stays in German. Implementation notes live in these comments instead.
    #
    # Raises ValueError for a blank query or a non-positive limit. FastMCP
    # reports this to the client as a tool error.
    if not query.strip():
        # A blank query carries no meaning; its nearest neighbours are noise.
        raise ValueError("query must not be empty")
    if limit < 1:
        raise ValueError("limit must be >= 1")
    limit = min(limit, _MAX_SEARCH_LIMIT)

    settings = get_settings()
    vectors = await embed_texts([query], model=settings.ollama_embed_model)
    # search_chunks runs on the synchronous QdrantClient. Offload it to a
    # worker thread so the Qdrant round-trip does not block the event loop
    # that serves every other MCP request.
    return await asyncio.to_thread(
        search_chunks,
        _qdrant(),
        settings.qdrant_collection,
        vectors[0],
        limit=limit,
        # LLM clients often send "" for unused optional arguments. Treat
        # that as "no filter" instead of passing an empty-string filter on.
        semester=semester or None,
        fach=fach or None,
        typ=typ or None,
    )
@mcp.tool()
async def get_file_chunks(file_path: str) -> list[dict]:
    """Alle Chunks eines Dokuments in Reihenfolge laden.

    Nützlich, um nach einem rag_search-Treffer das vollständige Dokument
    zu rekonstruieren.

    Args:
        file_path: Exakter Nextcloud-Pfad wie in rag_search-Treffern, z.B.
            "Documents/THB/2.Semester/Databases/Uebungen/01/Loesung.pdf".

    Returns:
        Chunks mit chunk_index, page und text, nach chunk_index sortiert.
    """
    # FastMCP publishes the docstring above as the tool description, so it
    # stays in German.
    #
    # get_chunks_by_path works on the synchronous QdrantClient. Run it in a
    # worker thread so the lookup does not stall the event loop shared by
    # all MCP sessions.
    settings = get_settings()
    return await asyncio.to_thread(
        get_chunks_by_path, _qdrant(), settings.qdrant_collection, file_path
    )
class BearerAuthMiddleware:
    """Pure-ASGI gate requiring ``Authorization: Bearer <token>``.

    Only ``lifespan`` scopes bypass the check, so the StreamableHTTP session
    manager's startup and shutdown still run.

    - ``http`` requests without the right token get a ``401`` with a
      ``WWW-Authenticate: Bearer`` challenge.
    - ``websocket`` handshakes are closed with code 1008 (policy violation)
      before the wrapped app sees them.
    - Any other scope type is dropped (fail closed).

    The credential is compared as raw header bytes with
    :func:`hmac.compare_digest`. The comparison is therefore constant-time,
    and non-ASCII or non-UTF-8 header values cannot crash the gate.
    """

    def __init__(self, app, token: str) -> None:
        self._app = app
        # Keep the secret as bytes. ASGI delivers header values as bytes, and
        # compare_digest refuses non-ASCII input when given str.
        self._token = token.encode()

    def _authorized(self, scope: Scope) -> bool:
        """Return True if *scope* carries ``Authorization: Bearer <token>``."""
        provided = next(
            (
                value
                for name, value in scope.get("headers") or []
                if name == b"authorization"
            ),
            b"",
        )
        # RFC 7235: the auth scheme is case-insensitive; the credential is not.
        scheme, _, credentials = provided.partition(b" ")
        scheme_ok = scheme.lower() == b"bearer"
        # Always run compare_digest, so a wrong scheme is not a faster
        # rejection path than a wrong token.
        token_ok = hmac.compare_digest(credentials, self._token)
        return scheme_ok and token_ok

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope_type = scope["type"]
        if scope_type == "lifespan" or self._authorized(scope):
            await self._app(scope, receive, send)
            return
        if scope_type == "http":
            body = b"unauthorized"
            await send(
                {
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [
                        (b"content-type", b"text/plain"),
                        (b"content-length", str(len(body)).encode()),
                        (b"www-authenticate", b"Bearer"),
                    ],
                }
            )
            await send({"type": "http.response.body", "body": body})
        elif scope_type == "websocket":
            # Closing before accept makes the server reject the handshake.
            await send({"type": "websocket.close", "code": 1008})
        # Any other (non-standard) scope type is dropped: fail closed.
def build_app():
    """Return the MCP ASGI app wrapped in BearerAuthMiddleware.

    Fails fast: if ``RAG_MCP_TOKEN`` is empty or unset, an error is logged
    and the process exits with status 1 instead of serving unauthenticated.
    """
    cfg = get_settings()
    token = cfg.rag_mcp_token
    if token:
        # NOTE(review): these FastMCP settings are presumably only read by
        # FastMCP.run(); main() serves through uvicorn with its own host/port.
        # Some mcp SDK versions also derive transport-security defaults from
        # the host given to the FastMCP constructor. Confirm that non-localhost
        # Host headers (e.g. from MetaMCP) are accepted.
        mcp.settings.host = "0.0.0.0"
        mcp.settings.port = cfg.rag_mcp_port
        return BearerAuthMiddleware(mcp.streamable_http_app(), token)
    logger.error(
        "refusing to start: RAG_MCP_TOKEN is empty",
        extra={"event": "mcp_startup_abort"},
    )
    sys.exit(1)
def main() -> None:
    """Set up logging, build the token-gated app and serve it with uvicorn."""
    cfg = get_settings()
    listen_port = cfg.rag_mcp_port
    setup_logging(cfg.log_level)
    asgi_app = build_app()
    logger.info(
        "mcp server starting",
        extra={"event": "mcp_startup", "port": listen_port},
    )
    uvicorn.run(asgi_app, host="0.0.0.0", port=listen_port)
# Script entry point (e.g. ``python -m app.mcp_server``): start the server.
if __name__ == "__main__":
    main()