Files
rag-ingestor/tests/test_mcp_server.py
Jean-Luc Makiola 9643011e64
All checks were successful
CI / ci (push) Successful in 49s
Release / release (push) Successful in 1m2s
feat: MCP-Server für RAG-Retrieval + Webhook-Härtung
app/mcp_server.py: FastMCP (mcp SDK), streamable-http auf /mcp, statischer
Bearer-Token (constant-time ASGI-Middleware), Fail-Fast ohne RAG_MCP_TOKEN.
Tools rag_search (mit semester/fach/typ-Filter) + get_file_chunks. Läuft aus
demselben Image wie der Ingestor und reused den Embed-Pfad → Vektoren sind
garantiert kompatibel zum Ingest (der offizielle qdrant-MCP-Server kann nur
fastembed → Dimension-/Schema-Mismatch).

app/qdrant_store.py: search_chunks (query_points + optionaler Payload-Filter)
und get_chunks_by_path (scroll, nach chunk_index sortiert).

app/bulk.py: Amplification-Guard — /bulk-import lehnt mit 409 ab, solange ein
vorheriger Bulk noch BackgroundTasks abarbeitet.

docker-compose.coolify.yml: rag-mcp-Service (nicht public, externes
metamcp-net statt Stack-Coupling) + Traefik-Rate-Limit-Middleware am ingestor.

tests/conftest.py: Settings-env_file in Tests neutralisieren (Dev-.env darf
die Suite nicht kontaminieren). 68 passed, ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 22:08:37 +02:00

132 lines
3.7 KiB
Python

from unittest.mock import AsyncMock, MagicMock
import pytest
def _env(monkeypatch, *, token="tok"):
    """Set the environment variables used by these tests and reset the settings cache.

    Args:
        monkeypatch: pytest's ``monkeypatch`` fixture; it undoes every change
            made here when the test finishes.
        token: Value exported as ``RAG_MCP_TOKEN``. Pass ``None`` to guarantee
            the variable is absent from the environment.
    """
    variables = {
        "NEXTCLOUD_WEBDAV_URL": "http://nc",
        "NEXTCLOUD_USER": "u",
        "NEXTCLOUD_APP_PASSWORD": "p",
        "OLLAMA_URL": "http://ollama",
        "OLLAMA_EMBED_MODEL": "qwen3-embedding:0.6b",
        "QDRANT_URL": "http://qdrant",
        "QDRANT_COLLECTION": "rag_test",
        "WEBHOOK_SECRET": "s",
    }
    for name, value in variables.items():
        monkeypatch.setenv(name, value)

    if token is not None:
        monkeypatch.setenv("RAG_MCP_TOKEN", token)
    else:
        # Without this, a RAG_MCP_TOKEN already exported in the developer
        # shell or CI environment would survive and silently defeat the
        # "no token configured" scenario.
        monkeypatch.delenv("RAG_MCP_TOKEN", raising=False)

    # Imported inside the function, as in the rest of this module, so the
    # import happens only after the environment has been prepared.
    from app.config import get_settings

    # Drop any cached settings so the next get_settings() call is not served
    # from an entry created under a different environment.
    get_settings.cache_clear()
async def _drain(app, scope):
sent = []
async def receive():
return {"type": "http.request", "body": b"", "more_body": False}
async def send(msg):
sent.append(msg)
await app(scope, receive, send)
return sent
def _http_scope(auth_header: bytes | None):
headers = [(b"authorization", auth_header)] if auth_header is not None else []
return {"type": "http", "headers": headers}
async def test_middleware_rejects_missing_token(monkeypatch):
    """A request without an authorization header gets a 401 and never reaches the wrapped app."""
    from app.mcp_server import BearerAuthMiddleware

    downstream = AsyncMock()
    guarded = BearerAuthMiddleware(downstream, "secret")
    scope = _http_scope(None)

    messages = await _drain(guarded, scope)

    first_message = messages[0]
    assert first_message["status"] == 401
    downstream.assert_not_awaited()
async def test_middleware_rejects_wrong_token(monkeypatch):
    """A bearer token that differs from the configured one gets a 401 and never reaches the wrapped app."""
    from app.mcp_server import BearerAuthMiddleware

    downstream = AsyncMock()
    guarded = BearerAuthMiddleware(downstream, "secret")
    scope = _http_scope(b"Bearer nope")

    messages = await _drain(guarded, scope)

    first_message = messages[0]
    assert first_message["status"] == 401
    downstream.assert_not_awaited()
async def test_middleware_passes_correct_token():
    """A request carrying the configured bearer token is forwarded to the wrapped app exactly once."""
    from app.mcp_server import BearerAuthMiddleware

    downstream = AsyncMock()
    guarded = BearerAuthMiddleware(downstream, "secret")
    scope = _http_scope(b"Bearer secret")

    await _drain(guarded, scope)

    downstream.assert_awaited_once()
async def test_middleware_passes_through_non_http_scope():
    """A scope whose type is ``lifespan`` is forwarded to the wrapped app exactly once."""
    from app.mcp_server import BearerAuthMiddleware

    downstream = AsyncMock()
    guarded = BearerAuthMiddleware(downstream, "secret")
    lifespan_scope = {"type": "lifespan"}

    await _drain(guarded, lifespan_scope)

    downstream.assert_awaited_once()
def test_build_app_exits_without_token(monkeypatch):
    """``build_app`` raises ``SystemExit`` when ``_env`` is told not to set a token."""
    _env(monkeypatch, token=None)

    import app.mcp_server as server

    with pytest.raises(SystemExit):
        server.build_app()
async def test_rag_search_embeds_query_and_forwards_filters(monkeypatch):
    """``rag_search`` embeds the query, forwards vector and filters to ``search_chunks``, and returns its hits.

    The embedding model and collection name asserted below are the values
    ``_env`` places in the environment.
    """
    _env(monkeypatch)
    import app.mcp_server as m

    embed = AsyncMock(return_value=[[0.5] * 4])
    search = MagicMock(return_value=[{"text": "hit", "score": 0.9}])
    replacements = (
        ("embed_texts", embed),
        ("search_chunks", search),
        ("_qdrant", lambda: "CLIENT"),
    )
    for attribute, stub in replacements:
        monkeypatch.setattr(m, attribute, stub)

    out = await m.rag_search("was ist BPMN?", limit=3, semester="2.Semester")

    embed.assert_awaited_once_with(["was ist BPMN?"], model="qwen3-embedding:0.6b")

    positional, keyword = search.call_args
    # Positional order: client, collection name, query vector.
    assert positional[0] == "CLIENT"
    assert positional[1] == "rag_test"
    assert positional[2] == [0.5] * 4
    # Filters that were not supplied must arrive as explicit None values.
    expected_keyword = dict(limit=3, semester="2.Semester", fach=None, typ=None)
    assert keyword == expected_keyword

    assert out == [{"text": "hit", "score": 0.9}]
async def test_get_file_chunks_delegates(monkeypatch):
    """``get_file_chunks`` calls ``get_chunks_by_path`` with client, collection and path, and returns its result."""
    _env(monkeypatch)
    import app.mcp_server as m

    lookup = MagicMock(return_value=[{"chunk_index": 0, "text": "x"}])
    for attribute, stub in (("get_chunks_by_path", lookup), ("_qdrant", lambda: "CLIENT")):
        monkeypatch.setattr(m, attribute, stub)

    path = "Documents/x.pdf"
    out = await m.get_file_chunks(path)

    lookup.assert_called_once_with("CLIENT", "rag_test", "Documents/x.pdf")
    assert out == [{"chunk_index": 0, "text": "x"}]