diff --git a/src/mcp/server/connection.py b/src/mcp/server/connection.py index 4d9496fef..73e775a91 100644 --- a/src/mcp/server/connection.py +++ b/src/mcp/server/connection.py @@ -100,6 +100,30 @@ async def notify(self, method: str, params: Mapping[str, Any] | None, opts: Call _NO_CHANNEL = _NoChannelOutbound() +class NotifyOnlyOutbound: + """Connection-scoped `Outbound` that forwards notifications and refuses requests. + + Installed by `serve_dual_era_loop` for modern (2026-07-28+) connections + over duplex stream transports: the pipe is real, so server notifications + ride it, but the modern protocol forbids server-initiated JSON-RPC + requests, so `send_raw_request` refuses by construction. + """ + + def __init__(self, outbound: Outbound) -> None: + self._outbound = outbound + + async def send_raw_request( + self, + method: str, + params: Mapping[str, Any] | None, + opts: CallOptions | None = None, + ) -> dict[str, Any]: + raise NoBackChannelError(method) + + async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None: + await self._outbound.notify(method, params, opts) + + class Connection: """Per-client connection state and standalone-stream `Outbound`. @@ -167,7 +191,8 @@ def from_envelope( both supplied) are recorded as `client_params` so capability checks work. `outbound` defaults to the no-channel sentinel for the single-exchange HTTP path; duplex modern transports (e.g. stdio) pass - the dispatcher so server-initiated messages have a back-channel. + a notify-only wrapper around the dispatcher so server notifications + ride the pipe while server-initiated requests stay refused. """ client_params = None if client_info is not None and client_capabilities is not None: diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 66c497199..81eaa2b86 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -62,7 +62,7 @@ async def main(): from mcp.server.caching import CacheableMethod, CacheHint, validate_cache_hints from mcp.server.context import HandlerResult, ServerMiddleware, ServerRequestContext from mcp.server.models import InitializationOptions -from mcp.server.runner import serve_loop +from mcp.server.runner import serve_dual_era_loop from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings @@ -689,12 +689,14 @@ async def run( ) -> None: """Serve a single connection over the given streams until the read side closes. - Thin wrapper over `serve_loop`: enters the server lifespan, - then drives the loop. Transports with their own lifespan owner - (the streamable-HTTP manager) call `serve_loop` directly instead. + Thin wrapper over `serve_dual_era_loop`: enters the server lifespan, + then drives the loop, serving the legacy handshake era and the modern + per-request-envelope era (the first era-distinctive message locks the + connection). Transports with their own lifespan owner (the + streamable-HTTP manager) call `serve_loop` directly instead. """ async with self.lifespan(self) as lifespan_context: - await serve_loop( + await serve_dual_era_loop( self, read_stream, write_stream, diff --git a/src/mcp/server/runner.py b/src/mcp/server/runner.py index 6aa9cd6d5..d5783a598 100644 --- a/src/mcp/server/runner.py +++ b/src/mcp/server/runner.py @@ -5,8 +5,8 @@ pure kernel: it holds a pre-populated `Connection` and reads `connection.protocol_version` / `connection.outbound` as facts. Driving a dispatcher loop and tearing down the connection live in the free-function -drivers (`serve_connection`, `serve_loop`, `serve_one`); the entry constructs -the `Connection`, the driver tears it down. +drivers (`serve_connection`, `serve_loop`, `serve_dual_era_loop`, `serve_one`); +the entry constructs the `Connection`, the driver tears it down. `ServerRunner` holds a `Server` directly - `Server` is the registry. """ @@ -17,7 +17,7 @@ from collections.abc import Awaitable, Mapping from dataclasses import KW_ONLY, dataclass from functools import cached_property, partial -from typing import TYPE_CHECKING, Any, Generic, cast +from typing import TYPE_CHECKING, Any, Generic, Literal, cast import anyio import anyio.abc @@ -26,31 +26,41 @@ CLIENT_INFO_META_KEY, INTERNAL_ERROR, INVALID_PARAMS, + INVALID_REQUEST, METHOD_NOT_FOUND, PROTOCOL_VERSION_META_KEY, + UNSUPPORTED_PROTOCOL_VERSION, CacheableResult, ErrorData, Implementation, InitializeRequestParams, InitializeResult, + RequestId, RequestParams, RequestParamsMeta, + UnsupportedProtocolVersionErrorData, ) from mcp_types import methods as _methods -from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION +from mcp_types.version import ( + HANDSHAKE_PROTOCOL_VERSIONS, + LATEST_HANDSHAKE_VERSION, + LATEST_MODERN_VERSION, + MODERN_PROTOCOL_VERSIONS, +) from pydantic import BaseModel, ValidationError from typing_extensions import TypeVar from mcp.server.caching import apply_cache_hint -from mcp.server.connection import Connection +from mcp.server.connection import Connection, NotifyOnlyOutbound from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext from mcp.server.models import InitializationOptions from mcp.server.session import ServerSession from mcp.shared._stream_protocols import ReadStream, WriteStream -from mcp.shared.dispatcher import DispatchContext, Dispatcher, OnNotify, OnRequest -from mcp.shared.exceptions import MCPError +from mcp.shared.dispatcher import CallOptions, DispatchContext, Dispatcher, OnNotify, OnRequest +from mcp.shared.exceptions import MCPError, NoBackChannelError +from mcp.shared.inbound import InboundLadderRejection, classify_inbound_request from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher -from mcp.shared.message import ServerMessageMetadata, SessionMessage +from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage from mcp.shared.transport_context import TransportContext if TYPE_CHECKING: @@ -63,6 +73,7 @@ "aclose_shielded", "modern_on_request", "serve_connection", + "serve_dual_era_loop", "serve_loop", "serve_one", ] @@ -427,6 +438,206 @@ async def serve_loop( ) +_MODERN_ENVELOPE_KEYS = (PROTOCOL_VERSION_META_KEY, CLIENT_INFO_META_KEY, CLIENT_CAPABILITIES_META_KEY) + + +def _has_modern_envelope(params: Mapping[str, Any] | None) -> bool: + """Whether `params._meta` carries every reserved modern-envelope key. + + Era evidence is the FULL key triple - bare `_meta` is not (legacy traffic + carries `progressToken` there). + """ + if not params: + return False + meta = params.get("_meta") + return isinstance(meta, Mapping) and all(key in meta for key in _MODERN_ENVELOPE_KEYS) + + +def _initialize_after_modern_data(params: Mapping[str, Any] | None) -> dict[str, Any]: + """Error data for an `initialize` arriving on a modern-locked connection. + + The typed -32022 payload when the client's proposed version is parseable; + otherwise just the supported list (the point is naming what we serve). + """ + requested = (params or {}).get("protocolVersion") + if isinstance(requested, str): + return UnsupportedProtocolVersionErrorData( + supported=list(MODERN_PROTOCOL_VERSIONS), requested=requested + ).model_dump(mode="json") + return {"supported": list(MODERN_PROTOCOL_VERSIONS)} + + +@dataclass +class _NoServerRequestsDispatchContext: + """Delegating `DispatchContext` that refuses server-initiated requests. + + Wraps the loop dispatcher's per-message context for modern-era dispatch: + the modern protocol forbids server-initiated JSON-RPC requests, so + `send_raw_request` refuses while notifications and progress still ride + the duplex pipe. + """ + + _inner: DispatchContext[TransportContext] + + @property + def transport(self) -> TransportContext: + return self._inner.transport + + @property + def can_send_request(self) -> bool: + return False + + @property + def request_id(self) -> RequestId | None: + return self._inner.request_id + + @property + def message_metadata(self) -> MessageMetadata: + return self._inner.message_metadata + + @property + def cancel_requested(self) -> anyio.Event: + return self._inner.cancel_requested + + async def send_raw_request( + self, + method: str, + params: Mapping[str, Any] | None, + opts: CallOptions | None = None, + ) -> dict[str, Any]: + raise NoBackChannelError(method) + + async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None: + await self._inner.notify(method, params, opts) + + async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None: + await self._inner.progress(progress, total, message) + + +async def serve_dual_era_loop( + server: Server[LifespanT], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], + *, + lifespan_state: LifespanT, + session_id: str | None = None, + init_options: InitializationOptions | None = None, + raise_exceptions: bool = False, +) -> None: + """Drive `server` over a duplex stream pair, serving both protocol eras. + + The stream-pair counterpart of the modern HTTP entry's era router. Era is + a property of the connection, decided by how the client opens it, and + mid-stream switching is undefined - so the first era-distinctive message + locks the connection (matching the typescript-sdk): + + - `initialize` locks legacy: the connection behaves exactly like + `serve_loop` for its lifetime, and modern envelope traffic is rejected + with INVALID_REQUEST. + - A request carrying the modern `_meta` envelope triple - or + `server/discover`, a modern-only method - locks modern: every request is + classified (`classify_inbound_request`) and served single-exchange via + `serve_one` with a born-ready per-request `Connection`, the same + dispatch model as the modern HTTP entry. A later `initialize` is + rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions. + + Modern connections push notifications over the duplex pipe but refuse + server-initiated requests on both channels (the modern protocol forbids + them). A rejected classification (malformed envelope, unsupported version) + never locks the era, so a failed probe leaves the legacy handshake + available - released auto-negotiating clients fall back on any error code + except -32022. + """ + dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher( + read_stream, + write_stream, + raise_handler_exceptions=raise_exceptions, + # `initialize` inline for the same pipelining reason as `serve_loop`; + # `server/discover` inline so the modern era lock commits before the + # next pipelined message is read. + inline_methods=frozenset({"initialize", "server/discover"}), + ) + loop_connection = Connection.for_loop(dispatcher, session_id=session_id) + loop_runner = ServerRunner(server, loop_connection, lifespan_state, init_options=init_options) + standalone_outbound = NotifyOnlyOutbound(dispatcher) + era: Literal["unlocked", "legacy", "modern"] = "unlocked" + modern_version = LATEST_MODERN_VERSION + + async def serve_modern( + dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None + ) -> dict[str, Any]: + nonlocal era, modern_version + route = classify_inbound_request({"method": method, "params": params}) + if isinstance(route, InboundLadderRejection): + raise MCPError(code=route.code, message=route.message, data=route.data) + if era != "modern": + era, modern_version = "modern", route.protocol_version + if method == "subscriptions/listen": + # The registered listen handler assumes the HTTP entry's stream + # semantics; served over a stream pair it would wedge. Reject until + # this transport grows its own listen design. + raise MCPError( + code=METHOD_NOT_FOUND, message="subscriptions/listen is not served over this transport", data=method + ) + connection = Connection.from_envelope( + route.protocol_version, + route.client_info, + route.client_capabilities, + outbound=standalone_outbound, + ) + return await serve_one( + server, + _NoServerRequestsDispatchContext(dctx), + method, + params, + connection=connection, + lifespan_state=lifespan_state, + ) + + async def on_request( + dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None + ) -> dict[str, Any]: + nonlocal era + if era == "legacy": + if method == "server/discover" or _has_modern_envelope(params): + raise MCPError( + code=INVALID_REQUEST, + message="connection is locked to the legacy handshake era; " + "modern envelope requests are not accepted", + ) + return await loop_runner.on_request(dctx, method, params) + if era == "modern" and method == "initialize": + raise MCPError( + code=UNSUPPORTED_PROTOCOL_VERSION, + message="connection already negotiated a modern protocol version", + data=_initialize_after_modern_data(params), + ) + if era == "modern" or method == "server/discover" or _has_modern_envelope(params): + return await serve_modern(dctx, method, params) + result = await loop_runner.on_request(dctx, method, params) + if method == "initialize": + # Lock only on success: a failed handshake leaves both eras open. + era = "legacy" + return result + + async def on_notify(dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None: + if era != "modern": + return await loop_runner.on_notify(dctx, method, params) + # The envelope is request-only, so notifications inherit the + # connection's locked version. + connection = Connection.from_envelope(modern_version, None, None, outbound=standalone_outbound) + notify_runner = ServerRunner(server, connection, lifespan_state) + try: + await notify_runner.on_notify(_NoServerRequestsDispatchContext(dctx), method, params) + finally: + await aclose_shielded(connection) + + try: + await dispatcher.run(on_request, on_notify) + finally: + await aclose_shielded(loop_connection) + + async def serve_one( server: Server[LifespanT], dctx: DispatchContext[TransportContext], diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 820478f3f..5b4cc5478 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -408,8 +408,7 @@ async def check_context() -> str: async def test_client_auto_mode_probes_discover_then_adopts(simple_server: Server) -> None: """`mode='auto'` over an in-process HTTP transport: the `server/discover` probe reaches the modern entry and the negotiated protocol version is adopted without - an `initialize` handshake. Runs over HTTP because the in-memory runner gates - `server/discover` behind the init handshake.""" + an `initialize` handshake.""" with anyio.fail_after(5): async with ( mounted_app(simple_server) as (http, _), @@ -419,6 +418,46 @@ async def test_client_auto_mode_probes_discover_then_adopts(simple_server: Serve assert (await client.list_resources()).resources[0].name == "Test Resource" +@asynccontextmanager +async def _stream_loop_transport(server: Server) -> AsyncIterator[TransportStreams]: + """A Transport whose far end is `Server.run` over crossed memory streams - the stdio shape, in process.""" + async with ( + create_client_server_memory_streams() as ((client_read, client_write), (server_read, server_write)), + anyio.create_task_group() as tg, + ): + tg.start_soon(server.run, server_read, server_write, server.create_initialization_options()) + yield client_read, client_write + tg.cancel_scope.cancel() + + +async def test_client_auto_mode_negotiates_modern_over_a_stream_loop(simple_server: Server) -> None: + """`mode='auto'` against a real `Server.run` stream loop: the probe reaches the + dual-era driver, the connection locks modern, and feature requests are served + at 2026-07-28 with no `initialize` handshake.""" + with anyio.fail_after(5): + async with Client(_stream_loop_transport(simple_server), mode="auto") as client: + assert client.protocol_version == "2026-07-28" + assert (await client.list_resources()).resources[0].name == "Test Resource" + + +async def test_client_pinned_modern_mode_works_over_a_stream_loop(simple_server: Server) -> None: + """A pinned-modern client sends no probe: its first envelope-bearing request + locks the stream-loop connection modern and is served.""" + with anyio.fail_after(5): + async with Client(_stream_loop_transport(simple_server), mode="2026-07-28") as client: + assert client.protocol_version == "2026-07-28" + assert (await client.list_resources()).resources[0].name == "Test Resource" + + +async def test_client_legacy_mode_still_handshakes_over_a_stream_loop(simple_server: Server) -> None: + """`mode='legacy'` against the dual-era stream loop is byte-identical legacy: + the handshake runs and the session lands at a handshake-era version.""" + with anyio.fail_after(5): + async with Client(_stream_loop_transport(simple_server), mode="legacy") as client: + assert client.protocol_version == LATEST_HANDSHAKE_VERSION + assert (await client.list_resources()).resources[0].name == "Test Resource" + + @pytest.mark.parametrize("code", [types.METHOD_NOT_FOUND, types.REQUEST_TIMEOUT, types.INTERNAL_ERROR]) async def test_client_auto_mode_falls_back_to_initialize_on_legacy_signal(code: int) -> None: """`mode='auto'`: any JSON-RPC error from `server/discover` makes diff --git a/tests/server/test_runner.py b/tests/server/test_runner.py index 920015845..8281e8897 100644 --- a/tests/server/test_runner.py +++ b/tests/server/test_runner.py @@ -17,10 +17,15 @@ import anyio.abc import pytest from mcp_types import ( + CLIENT_CAPABILITIES_META_KEY, + CLIENT_INFO_META_KEY, INTERNAL_ERROR, INVALID_PARAMS, + INVALID_REQUEST, LATEST_PROTOCOL_VERSION, METHOD_NOT_FOUND, + PROTOCOL_VERSION_META_KEY, + UNSUPPORTED_PROTOCOL_VERSION, ClientCapabilities, ErrorData, Implementation, @@ -33,26 +38,35 @@ SetLevelRequestParams, Tool, ) -from mcp_types.version import LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION, OLDEST_SUPPORTED_VERSION +from mcp_types.version import ( + LATEST_HANDSHAKE_VERSION, + LATEST_MODERN_VERSION, + MODERN_PROTOCOL_VERSIONS, + OLDEST_SUPPORTED_VERSION, +) import mcp.server.runner from mcp.server.caching import CacheHint -from mcp.server.connection import Connection +from mcp.server.connection import Connection, NotifyOnlyOutbound from mcp.server.context import ServerRequestContext from mcp.server.lowlevel.server import NotificationOptions, Server from mcp.server.models import InitializationOptions from mcp.server.runner import ( ServerRunner, _extract_meta, + _has_modern_envelope, + _initialize_after_modern_data, + _NoServerRequestsDispatchContext, aclose_shielded, serve_connection, + serve_dual_era_loop, serve_one, ) from mcp.server.session import ServerSession from mcp.shared.dispatcher import CallOptions -from mcp.shared.exceptions import MCPError +from mcp.shared.exceptions import MCPError, NoBackChannelError from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher -from mcp.shared.message import MessageMetadata +from mcp.shared.message import MessageMetadata, SessionMessage from mcp.shared.peer import dump_params from mcp.shared.transport_context import TransportContext @@ -1221,3 +1235,322 @@ async def test_serve_connection_drives_dispatcher_loop_and_tears_down(server: Sr assert cleaned == [1] assert conn.protocol_version == LATEST_HANDSHAKE_VERSION assert conn.client_params is not None + + +# --- serve_dual_era_loop ---------------------------------------------------- + + +def _modern_envelope(version: str = LATEST_MODERN_VERSION) -> dict[str, Any]: + return { + PROTOCOL_VERSION_META_KEY: version, + CLIENT_INFO_META_KEY: {"name": "test-client", "version": "1.0"}, + CLIENT_CAPABILITIES_META_KEY: {}, + } + + +def _modern_params(version: str = LATEST_MODERN_VERSION, **params: Any) -> dict[str, Any]: + return {**params, "_meta": _modern_envelope(version)} + + +@asynccontextmanager +async def dual_era_client(server: SrvT) -> AsyncIterator[tuple[JSONRPCDispatcher[TransportContext], Recorder]]: + """Yield `(client, recorder)` speaking raw frames to a `serve_dual_era_loop` server. + + The driver owns its dispatcher and connection, so unlike `connected_runner` + the harness hands it bare streams and performs no handshake: each test + drives the era lock itself. + """ + c2s_send, c2s_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + s2c_send, s2c_recv = anyio.create_memory_object_stream[SessionMessage | Exception](32) + + def builder(_meta: object) -> TransportContext: + return TransportContext(kind="jsonrpc", can_send_request=True) + + client: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(s2c_recv, c2s_send, transport_builder=builder) + recorder = Recorder() + c_req, c_notify = echo_handlers(recorder) + body_exc: BaseException | None = None + async with anyio.create_task_group() as tg: + await tg.start(client.run, c_req, c_notify) + tg.start_soon(partial(serve_dual_era_loop, server, c2s_recv, s2c_send, lifespan_state=_LIFESPAN)) + try: + with anyio.fail_after(5): + yield client, recorder + except BaseException as e: + body_exc = e + tg.cancel_scope.cancel() + if body_exc is not None: + raise body_exc + + +@pytest.mark.anyio +async def test_dual_era_loop_discover_locks_modern_and_serves_envelope_requests(server: SrvT): + """`server/discover` over the loop returns a DiscoverResult and locks the + connection modern: envelope-bearing feature requests are then served + single-exchange, and a later legacy `initialize` is rejected with -32022 + naming the modern versions.""" + async with dual_era_client(server) as (client, _): + discover = await client.send_raw_request("server/discover", _modern_params()) + assert LATEST_MODERN_VERSION in discover["supportedVersions"] + assert "tools" in discover["capabilities"] + result = await client.send_raw_request("tools/list", _modern_params()) + assert result["tools"][0]["name"] == "t" + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("initialize", _initialize_params()) + assert exc_info.value.error.code == UNSUPPORTED_PROTOCOL_VERSION + assert exc_info.value.error.data == { + "supported": list(MODERN_PROTOCOL_VERSIONS), + "requested": LATEST_HANDSHAKE_VERSION, + } + + +@pytest.mark.anyio +async def test_dual_era_loop_pinned_modern_request_locks_without_a_probe(server: SrvT): + """A pinned-modern client sends no probe: its first envelope-bearing + feature request locks the connection modern directly.""" + async with dual_era_client(server) as (client, _): + result = await client.send_raw_request("tools/list", _modern_params()) + assert result["tools"][0]["name"] == "t" + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("initialize", _initialize_params()) + assert exc_info.value.error.code == UNSUPPORTED_PROTOCOL_VERSION + + +@pytest.mark.anyio +async def test_dual_era_loop_initialize_after_modern_lock_without_a_parseable_version(server: SrvT): + """An `initialize` with no string protocolVersion still gets the supported + list in the -32022 data (the typed payload needs a `requested` string).""" + async with dual_era_client(server) as (client, _): + await client.send_raw_request("server/discover", _modern_params()) + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("initialize", {"capabilities": {}}) + assert exc_info.value.error.code == UNSUPPORTED_PROTOCOL_VERSION + assert exc_info.value.error.data == {"supported": list(MODERN_PROTOCOL_VERSIONS)} + + +@pytest.mark.anyio +async def test_dual_era_loop_initialize_locks_legacy_and_rejects_modern_traffic(server: SrvT): + """After a successful handshake the connection is legacy for its lifetime: + `server/discover` and envelope-bearing requests are rejected with + INVALID_REQUEST while plain legacy requests keep working.""" + async with dual_era_client(server) as (client, _): + init = await client.send_raw_request("initialize", _initialize_params()) + assert init["protocolVersion"] == LATEST_HANDSHAKE_VERSION + with pytest.raises(MCPError) as discover_exc: + await client.send_raw_request("server/discover", _modern_params()) + with pytest.raises(MCPError) as envelope_exc: + await client.send_raw_request("tools/list", _modern_params()) + result = await client.send_raw_request("tools/list", None) + assert result["tools"][0]["name"] == "t" + assert discover_exc.value.error.code == INVALID_REQUEST + assert envelope_exc.value.error.code == INVALID_REQUEST + assert "locked to the legacy handshake era" in discover_exc.value.error.message + + +@pytest.mark.anyio +async def test_dual_era_loop_unsupported_modern_version_rejects_without_locking(server: SrvT): + """A probe at an unknown modern version gets -32022 with the supported + list, and the rejection does not lock the era: the legacy handshake still + succeeds afterwards (the released auto clients' retry/fallback contract).""" + async with dual_era_client(server) as (client, _): + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("server/discover", _modern_params(version="2099-01-01")) + init = await client.send_raw_request("initialize", _initialize_params()) + assert init["protocolVersion"] == LATEST_HANDSHAKE_VERSION + assert exc_info.value.error.code == UNSUPPORTED_PROTOCOL_VERSION + assert exc_info.value.error.data == { + "supported": list(MODERN_PROTOCOL_VERSIONS), + "requested": "2099-01-01", + } + + +@pytest.mark.anyio +async def test_dual_era_loop_bare_discover_rejects_without_locking(server: SrvT): + """A `server/discover` with no envelope triple is INVALID_PARAMS - never + -32022, so a released auto client's code-keyed fallback predicate takes the + legacy branch - and the connection can still complete the handshake.""" + async with dual_era_client(server) as (client, _): + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("server/discover", None) + init = await client.send_raw_request("initialize", _initialize_params()) + assert init["protocolVersion"] == LATEST_HANDSHAKE_VERSION + assert exc_info.value.error.code == INVALID_PARAMS + assert exc_info.value.error.code != UNSUPPORTED_PROTOCOL_VERSION + + +@pytest.mark.anyio +async def test_dual_era_loop_ping_before_any_lock_stays_exempt_and_neutral(server: SrvT): + """A pre-handshake `ping` is answered (the init-gate exemption) and does + not lock an era: the connection can still go modern.""" + async with dual_era_client(server) as (client, _): + assert await client.send_raw_request("ping", None) == {} + result = await client.send_raw_request("tools/list", _modern_params()) + assert result["tools"][0]["name"] == "t" + + +@pytest.mark.anyio +async def test_dual_era_loop_modern_request_without_envelope_rejects(server: SrvT): + """On a modern-locked connection every request is classified: one without + the envelope triple is INVALID_PARAMS.""" + async with dual_era_client(server) as (client, _): + await client.send_raw_request("server/discover", _modern_params()) + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("tools/list", None) + assert exc_info.value.error.code == INVALID_PARAMS + + +@pytest.mark.anyio +async def test_dual_era_loop_rejects_subscriptions_listen_on_modern(server: SrvT): + """`subscriptions/listen` is rejected before dispatch on the stream-pair + modern path: the registered handler assumes the HTTP entry's stream + semantics.""" + async with dual_era_client(server) as (client, _): + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("subscriptions/listen", _modern_params()) + assert exc_info.value.error.code == METHOD_NOT_FOUND + assert "not served over this transport" in exc_info.value.error.message + + +@pytest.mark.anyio +async def test_dual_era_loop_modern_notification_dispatches_at_locked_version(server: SrvT): + """Notifications carry no envelope, so on a modern-locked connection they + dispatch with the locked protocol version.""" + seen_versions: list[str] = [] + handled = anyio.Event() + + async def on_custom(ctx: Ctx, params: NotificationParams | None) -> None: + seen_versions.append(ctx.protocol_version) + handled.set() + + server.add_notification_handler("notifications/custom", NotificationParams, on_custom) + async with dual_era_client(server) as (client, _): + await client.send_raw_request("server/discover", _modern_params()) + await client.notify("notifications/custom", None) + await handled.wait() + assert seen_versions == [LATEST_MODERN_VERSION] + + +@pytest.mark.anyio +async def test_dual_era_loop_legacy_notifications_reach_the_loop_runner(server: SrvT): + """Before/after a legacy lock, notifications flow through the loop runner + exactly as under `serve_loop`.""" + handled = anyio.Event() + + async def on_custom(ctx: Ctx, params: NotificationParams | None) -> None: + handled.set() + + server.add_notification_handler("notifications/custom", NotificationParams, on_custom) + async with dual_era_client(server) as (client, _): + await client.send_raw_request("initialize", _initialize_params()) + await client.notify("notifications/custom", None) + await handled.wait() + + +@pytest.mark.anyio +async def test_dual_era_loop_modern_server_notifications_ride_the_pipe(server: SrvT): + """A modern handler's standalone notification reaches the client over the + duplex stream - the notify-only outbound forwards it.""" + + async def emit(ctx: Ctx, params: RequestParams | None) -> dict[str, Any]: + await ctx.session.send_tool_list_changed() + return {} + + server.add_request_handler("x/emit", RequestParams, emit) + async with dual_era_client(server) as (client, recorder): + await client.send_raw_request("x/emit", _modern_params()) + await recorder.notified.wait() + assert recorder.notifications[0][0] == "notifications/tools/list_changed" + + +@pytest.mark.anyio +async def test_dual_era_loop_modern_refuses_server_initiated_requests(server: SrvT): + """A modern handler attempting a server-initiated request gets + `NoBackChannelError` from the standalone channel: the modern protocol + forbids the frame, duplex pipe or not.""" + + async def wants_roots(ctx: Ctx, params: RequestParams | None) -> dict[str, Any]: + await ctx.session.list_roots() # pyright: ignore[reportDeprecated] + return {} # pragma: no cover - list_roots raises + + server.add_request_handler("x/roots", RequestParams, wants_roots) + async with dual_era_client(server) as (client, _): + with pytest.raises(MCPError) as exc_info: + await client.send_raw_request("x/roots", _modern_params()) + assert exc_info.value.error.code == INVALID_REQUEST + assert "no back-channel" in exc_info.value.error.message + + +def test_has_modern_envelope_requires_the_full_key_triple(): + assert not _has_modern_envelope(None) + assert not _has_modern_envelope({}) + assert not _has_modern_envelope({"_meta": None}) + assert not _has_modern_envelope({"_meta": {"progressToken": 1}}) + partial_meta = {k: v for k, v in _modern_envelope().items() if k != CLIENT_CAPABILITIES_META_KEY} + assert not _has_modern_envelope({"_meta": partial_meta}) + assert _has_modern_envelope(_modern_params()) + + +def test_initialize_after_modern_data_arms(): + typed = _initialize_after_modern_data({"protocolVersion": LATEST_HANDSHAKE_VERSION}) + assert typed == {"supported": list(MODERN_PROTOCOL_VERSIONS), "requested": LATEST_HANDSHAKE_VERSION} + assert _initialize_after_modern_data(None) == {"supported": list(MODERN_PROTOCOL_VERSIONS)} + assert _initialize_after_modern_data({"protocolVersion": 7}) == {"supported": list(MODERN_PROTOCOL_VERSIONS)} + + +class _RecordingInnerDctx: + """Minimal `DispatchContext` double recording delegated calls.""" + + def __init__(self) -> None: + self.transport = TransportContext(kind="jsonrpc", can_send_request=True) + self.can_send_request = True + self.request_id = 7 + self.message_metadata = None + self.cancel_requested = anyio.Event() + self.notifies: list[str] = [] + self.progresses: list[float] = [] + + async def send_raw_request( + self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None + ) -> dict[str, Any]: + raise AssertionError("must never be reached through the denying wrapper") # pragma: no cover + + async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None: + self.notifies.append(method) + + async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None: + self.progresses.append(progress) + + +@pytest.mark.anyio +async def test_no_server_requests_dispatch_context_denies_requests_and_delegates_the_rest(): + inner = _RecordingInnerDctx() + wrapper = _NoServerRequestsDispatchContext(inner) + assert wrapper.can_send_request is False + assert wrapper.transport is inner.transport + assert wrapper.request_id == 7 + assert wrapper.message_metadata is None + assert wrapper.cancel_requested is inner.cancel_requested + with pytest.raises(NoBackChannelError): + await wrapper.send_raw_request("roots/list", None) + await wrapper.notify("notifications/progress", None) + await wrapper.progress(0.5) + assert inner.notifies == ["notifications/progress"] + assert inner.progresses == [0.5] + + +@pytest.mark.anyio +async def test_notify_only_outbound_forwards_notifications_and_refuses_requests(): + inner = _RecordingInnerDctx() + outbound = NotifyOnlyOutbound(inner) + await outbound.notify("notifications/tools/list_changed", None) + assert inner.notifies == ["notifications/tools/list_changed"] + with pytest.raises(NoBackChannelError): + await outbound.send_raw_request("ping", None) + + +@pytest.mark.anyio +async def test_dual_era_client_propagates_body_exception_unwrapped(server: SrvT): + """The harness re-raises body exceptions as-is, not as `ExceptionGroup`.""" + with pytest.raises(RuntimeError, match="boom"): + async with dual_era_client(server): + raise RuntimeError("boom") diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 886bc51b5..f0c8b1c29 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -7,7 +7,15 @@ import anyio import pytest -from mcp_types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter +from mcp_types import ( + CLIENT_CAPABILITIES_META_KEY, + CLIENT_INFO_META_KEY, + PROTOCOL_VERSION_META_KEY, + JSONRPCMessage, + JSONRPCRequest, + JSONRPCResponse, + jsonrpc_message_adapter, +) from mcp.server.mcpserver import MCPServer from mcp.server.stdio import stdio_server @@ -96,32 +104,108 @@ async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch) -> Non assert second.message == valid -class _KeepOpenBytesIO(io.BytesIO): - """A BytesIO that survives its TextIOWrapper being closed. +class _GatedStdin(io.RawIOBase): + """Raw stdin double: serves its frames, then blocks until released before EOF. - Lets the test read what was written after `run()` has torn the wrapper down. + A real stdio client keeps stdin open until it has read the responses it is + awaiting; an immediate EOF after the last frame races the dispatcher's + EOF-time cancellation of in-flight handlers (only inline-handled methods + would deterministically answer first). The blocked read sits in + `stdio_server`'s reader worker thread and unblocks on `release()`. """ + name = "" + + def __init__(self, payload: bytes) -> None: + self._pending = payload + self._released = threading.Event() + + def readable(self) -> bool: + return True + + def readinto(self, b: bytearray | memoryview) -> int: # pyright: ignore[reportIncompatibleMethodOverride] + if self._pending: + n = min(len(b), len(self._pending)) + b[:n] = self._pending[:n] + self._pending = self._pending[n:] + return n + # A missed release falls through to EOF after the bound; the caller's + # own response assertions then report what actually arrived. + self._released.wait(5) + return 0 + + def release(self) -> None: + self._released.set() + + +class _NotifyingStdout(io.RawIOBase): + """Raw stdout double that counts newline-terminated lines and can be awaited on. + + Survives wrapper close (`close()` is a no-op) so the test can read what was + written after `run()` has torn its TextIOWrapper down. + """ + + name = "" + + def __init__(self) -> None: + self._chunks: list[bytes] = [] + self._lines = 0 + self._cond = threading.Condition() + + def writable(self) -> bool: + return True + + def write(self, b: bytes | bytearray | memoryview) -> int: # pyright: ignore[reportIncompatibleMethodOverride] + data = bytes(b) + with self._cond: + self._chunks.append(data) + self._lines += data.count(b"\n") + self._cond.notify_all() + return len(data) + + def wait_for_lines(self, n: int, timeout: float = 5) -> bool: + with self._cond: + return self._cond.wait_for(lambda: self._lines >= n, timeout) + + def getvalue(self) -> bytes: + with self._cond: + return b"".join(self._chunks) + def close(self) -> None: pass -def _run_stdio_bounded(server: MCPServer) -> None: - """Run the blocking `server.run("stdio")` in a daemon thread joined with a 5s bound. +def _serve_stdio_and_collect( + monkeypatch: pytest.MonkeyPatch, server: MCPServer, frames: list[JSONRPCRequest], responses: int +) -> list[JSONRPCMessage]: + """Serve `frames` over process stdio and return the parsed response lines. - `run()` creates its own event loop, so a sync test cannot arm `anyio.fail_after`; - the join timeout turns a run loop that never returns on stdin EOF into a red test - instead of a silent CI hang. An exception escaping `run()` still fails the test: - pytest's unhandled-thread warning is escalated by `filterwarnings = ["error"]`. + Runs the blocking `server.run("stdio")` in a daemon thread (it creates its + own event loop, so a sync test cannot arm `anyio.fail_after`) and signals + stdin EOF only after `responses` lines arrive on stdout - the way a real + client closes the pipe - so spawned in-flight handlers never race the + dispatcher's EOF cancellation. The join bound turns a run loop that never + returns on stdin EOF into a red test instead of a silent CI hang; an + exception escaping `run()` still fails the test via pytest's + unhandled-thread warning, escalated by `filterwarnings = ["error"]`. """ + payload = "".join(f.model_dump_json(by_alias=True, exclude_none=True) + "\n" for f in frames).encode() + stdin = _GatedStdin(payload) + stdout = _NotifyingStdout() + monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin, encoding="utf-8")) + monkeypatch.setattr(sys, "stdout", TextIOWrapper(stdout, encoding="utf-8")) def target() -> None: server.run("stdio") thread = threading.Thread(target=target, daemon=True) thread.start() + arrived = stdout.wait_for_lines(responses) + stdin.release() thread.join(5) assert not thread.is_alive(), 'run("stdio") did not return after stdin EOF' + assert arrived, f"expected {responses} response line(s); stdout carried: {stdout.getvalue()!r}" + return [jsonrpc_message_adapter.validate_json(line) for line in stdout.getvalue().decode().splitlines()] def test_mcpserver_run_stdio_serves_until_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: @@ -131,15 +215,10 @@ def test_mcpserver_run_stdio_serves_until_stdin_closes(monkeypatch: pytest.Monke rather than serving forever. """ ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") - captured = _KeepOpenBytesIO() - monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) - monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) - _run_stdio_bounded(MCPServer(name="RunStdioServer")) + responses = _serve_stdio_and_collect(monkeypatch, MCPServer(name="RunStdioServer"), [ping], 1) - response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) - assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + assert responses == [JSONRPCResponse(jsonrpc="2.0", id=1, result={})] def test_mcpserver_run_stdio_runs_lifespan_cleanup_after_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: @@ -159,13 +238,37 @@ async def lifespan(server: MCPServer) -> AsyncIterator[None]: events.append("cleanup") ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") - captured = _KeepOpenBytesIO() - monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) - monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) - _run_stdio_bounded(MCPServer(name="LifespanStdioServer", lifespan=lifespan)) + server = MCPServer(name="LifespanStdioServer", lifespan=lifespan) + responses = _serve_stdio_and_collect(monkeypatch, server, [ping], 1) assert events == ["setup", "cleanup"] - response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) - assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + assert responses == [JSONRPCResponse(jsonrpc="2.0", id=1, result={})] + + +def test_mcpserver_run_stdio_serves_a_modern_connection(monkeypatch: pytest.MonkeyPatch) -> None: + """`MCPServer.run("stdio")` serves the modern era over process stdio. + + A `server/discover` probe gets a DiscoverResult (no initialize handshake) + and a subsequent envelope-bearing request is served at the discovered + version - the wire exchange `Client(mode='auto')` drives against a stdio + server. + """ + envelope = { + PROTOCOL_VERSION_META_KEY: "2026-07-28", + CLIENT_INFO_META_KEY: {"name": "probe", "version": "1.0"}, + CLIENT_CAPABILITIES_META_KEY: {}, + } + discover = JSONRPCRequest(jsonrpc="2.0", id=1, method="server/discover", params={"_meta": envelope}) + tools = JSONRPCRequest(jsonrpc="2.0", id=2, method="tools/list", params={"_meta": envelope}) + + responses = _serve_stdio_and_collect(monkeypatch, MCPServer(name="ModernStdioServer"), [discover, tools], 2) + + assert isinstance(responses[0], JSONRPCResponse) and responses[0].id == 1 + assert "2026-07-28" in responses[0].result["supportedVersions"] + assert responses[0].result["serverInfo"]["name"] == "ModernStdioServer" + assert isinstance(responses[1], JSONRPCResponse) and responses[1].id == 2 + # `resultType` is the modern-only wire field: its presence proves the + # request was served at the discovered version, not the handshake era. + assert responses[1].result["tools"] == [] + assert responses[1].result["resultType"] == "complete"