Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/mcp/server/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,30 @@ async def notify(self, method: str, params: Mapping[str, Any] | None, opts: Call
_NO_CHANNEL = _NoChannelOutbound()


class NotifyOnlyOutbound:
"""Connection-scoped `Outbound` that forwards notifications and refuses requests.

Installed by `serve_dual_era_loop` for modern (2026-07-28+) connections
over duplex stream transports: the pipe is real, so server notifications
ride it, but the modern protocol forbids server-initiated JSON-RPC
requests, so `send_raw_request` refuses by construction.
"""

def __init__(self, outbound: Outbound) -> None:
self._outbound = outbound

async def send_raw_request(
self,
method: str,
params: Mapping[str, Any] | None,
opts: CallOptions | None = None,
) -> dict[str, Any]:
raise NoBackChannelError(method)

async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
await self._outbound.notify(method, params, opts)


class Connection:
"""Per-client connection state and standalone-stream `Outbound`.

Expand Down Expand Up @@ -167,7 +191,8 @@ def from_envelope(
both supplied) are recorded as `client_params` so capability checks
work. `outbound` defaults to the no-channel sentinel for the
single-exchange HTTP path; duplex modern transports (e.g. stdio) pass
the dispatcher so server-initiated messages have a back-channel.
a notify-only wrapper around the dispatcher so server notifications
ride the pipe while server-initiated requests stay refused.
"""
client_params = None
if client_info is not None and client_capabilities is not None:
Expand Down
12 changes: 7 additions & 5 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def main():
from mcp.server.caching import CacheableMethod, CacheHint, validate_cache_hints
from mcp.server.context import HandlerResult, ServerMiddleware, ServerRequestContext
from mcp.server.models import InitializationOptions
from mcp.server.runner import serve_loop
from mcp.server.runner import serve_dual_era_loop
from mcp.server.streamable_http import EventStore
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
from mcp.server.transport_security import TransportSecuritySettings
Expand Down Expand Up @@ -689,12 +689,14 @@ async def run(
) -> None:
"""Serve a single connection over the given streams until the read side closes.

Thin wrapper over `serve_loop`: enters the server lifespan,
then drives the loop. Transports with their own lifespan owner
(the streamable-HTTP manager) call `serve_loop` directly instead.
Thin wrapper over `serve_dual_era_loop`: enters the server lifespan,
then drives the loop, serving the legacy handshake era and the modern
per-request-envelope era (the first era-distinctive message locks the
connection). Transports with their own lifespan owner (the
streamable-HTTP manager) call `serve_loop` directly instead.
"""
async with self.lifespan(self) as lifespan_context:
await serve_loop(
await serve_dual_era_loop(
self,
read_stream,
write_stream,
Expand Down
227 changes: 219 additions & 8 deletions src/mcp/server/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
pure kernel: it holds a pre-populated `Connection` and reads
`connection.protocol_version` / `connection.outbound` as facts. Driving a
dispatcher loop and tearing down the connection live in the free-function
drivers (`serve_connection`, `serve_loop`, `serve_one`); the entry constructs
the `Connection`, the driver tears it down.
drivers (`serve_connection`, `serve_loop`, `serve_dual_era_loop`, `serve_one`);
the entry constructs the `Connection`, the driver tears it down.

`ServerRunner` holds a `Server` directly - `Server` is the registry.
"""
Expand All @@ -17,7 +17,7 @@
from collections.abc import Awaitable, Mapping
from dataclasses import KW_ONLY, dataclass
from functools import cached_property, partial
from typing import TYPE_CHECKING, Any, Generic, cast
from typing import TYPE_CHECKING, Any, Generic, Literal, cast

import anyio
import anyio.abc
Expand All @@ -26,31 +26,41 @@
CLIENT_INFO_META_KEY,
INTERNAL_ERROR,
INVALID_PARAMS,
INVALID_REQUEST,
METHOD_NOT_FOUND,
PROTOCOL_VERSION_META_KEY,
UNSUPPORTED_PROTOCOL_VERSION,
CacheableResult,
ErrorData,
Implementation,
InitializeRequestParams,
InitializeResult,
RequestId,
RequestParams,
RequestParamsMeta,
UnsupportedProtocolVersionErrorData,
)
from mcp_types import methods as _methods
from mcp_types.version import HANDSHAKE_PROTOCOL_VERSIONS, LATEST_HANDSHAKE_VERSION, LATEST_MODERN_VERSION
from mcp_types.version import (
HANDSHAKE_PROTOCOL_VERSIONS,
LATEST_HANDSHAKE_VERSION,
LATEST_MODERN_VERSION,
MODERN_PROTOCOL_VERSIONS,
)
from pydantic import BaseModel, ValidationError
from typing_extensions import TypeVar

from mcp.server.caching import apply_cache_hint
from mcp.server.connection import Connection
from mcp.server.connection import Connection, NotifyOnlyOutbound
from mcp.server.context import CallNext, HandlerResult, ServerMiddleware, ServerRequestContext
from mcp.server.models import InitializationOptions
from mcp.server.session import ServerSession
from mcp.shared._stream_protocols import ReadStream, WriteStream
from mcp.shared.dispatcher import DispatchContext, Dispatcher, OnNotify, OnRequest
from mcp.shared.exceptions import MCPError
from mcp.shared.dispatcher import CallOptions, DispatchContext, Dispatcher, OnNotify, OnRequest
from mcp.shared.exceptions import MCPError, NoBackChannelError
from mcp.shared.inbound import InboundLadderRejection, classify_inbound_request
from mcp.shared.jsonrpc_dispatcher import JSONRPCDispatcher
from mcp.shared.message import ServerMessageMetadata, SessionMessage
from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage
from mcp.shared.transport_context import TransportContext

if TYPE_CHECKING:
Expand All @@ -63,6 +73,7 @@
"aclose_shielded",
"modern_on_request",
"serve_connection",
"serve_dual_era_loop",
"serve_loop",
"serve_one",
]
Expand Down Expand Up @@ -427,6 +438,206 @@ async def serve_loop(
)


_MODERN_ENVELOPE_KEYS = (PROTOCOL_VERSION_META_KEY, CLIENT_INFO_META_KEY, CLIENT_CAPABILITIES_META_KEY)


def _has_modern_envelope(params: Mapping[str, Any] | None) -> bool:
"""Whether `params._meta` carries every reserved modern-envelope key.

Era evidence is the FULL key triple - bare `_meta` is not (legacy traffic
carries `progressToken` there).
"""
if not params:
return False
meta = params.get("_meta")
return isinstance(meta, Mapping) and all(key in meta for key in _MODERN_ENVELOPE_KEYS)


def _initialize_after_modern_data(params: Mapping[str, Any] | None) -> dict[str, Any]:
"""Error data for an `initialize` arriving on a modern-locked connection.

The typed -32022 payload when the client's proposed version is parseable;
otherwise just the supported list (the point is naming what we serve).
"""
requested = (params or {}).get("protocolVersion")
if isinstance(requested, str):
return UnsupportedProtocolVersionErrorData(
supported=list(MODERN_PROTOCOL_VERSIONS), requested=requested
).model_dump(mode="json")
return {"supported": list(MODERN_PROTOCOL_VERSIONS)}


@dataclass
class _NoServerRequestsDispatchContext:
"""Delegating `DispatchContext` that refuses server-initiated requests.

Wraps the loop dispatcher's per-message context for modern-era dispatch:
the modern protocol forbids server-initiated JSON-RPC requests, so
`send_raw_request` refuses while notifications and progress still ride
the duplex pipe.
"""

_inner: DispatchContext[TransportContext]

@property
def transport(self) -> TransportContext:
return self._inner.transport

@property
def can_send_request(self) -> bool:
return False

@property
def request_id(self) -> RequestId | None:
return self._inner.request_id

@property
def message_metadata(self) -> MessageMetadata:
return self._inner.message_metadata

@property
def cancel_requested(self) -> anyio.Event:
return self._inner.cancel_requested

async def send_raw_request(
self,
method: str,
params: Mapping[str, Any] | None,
opts: CallOptions | None = None,
) -> dict[str, Any]:
raise NoBackChannelError(method)

async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
await self._inner.notify(method, params, opts)

async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
await self._inner.progress(progress, total, message)


async def serve_dual_era_loop(
server: Server[LifespanT],
read_stream: ReadStream[SessionMessage | Exception],
write_stream: WriteStream[SessionMessage],
*,
lifespan_state: LifespanT,
session_id: str | None = None,
init_options: InitializationOptions | None = None,
raise_exceptions: bool = False,
) -> None:
"""Drive `server` over a duplex stream pair, serving both protocol eras.

The stream-pair counterpart of the modern HTTP entry's era router. Era is
a property of the connection, decided by how the client opens it, and
mid-stream switching is undefined - so the first era-distinctive message
locks the connection (matching the typescript-sdk):

- `initialize` locks legacy: the connection behaves exactly like
`serve_loop` for its lifetime, and modern envelope traffic is rejected
with INVALID_REQUEST.
- A request carrying the modern `_meta` envelope triple - or
`server/discover`, a modern-only method - locks modern: every request is
classified (`classify_inbound_request`) and served single-exchange via
`serve_one` with a born-ready per-request `Connection`, the same
dispatch model as the modern HTTP entry. A later `initialize` is
rejected with UNSUPPORTED_PROTOCOL_VERSION naming the modern versions.

Modern connections push notifications over the duplex pipe but refuse
server-initiated requests on both channels (the modern protocol forbids
them). A rejected classification (malformed envelope, unsupported version)
never locks the era, so a failed probe leaves the legacy handshake
available - released auto-negotiating clients fall back on any error code
except -32022.
"""
dispatcher: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
read_stream,
write_stream,
raise_handler_exceptions=raise_exceptions,
# `initialize` inline for the same pipelining reason as `serve_loop`;
# `server/discover` inline so the modern era lock commits before the
# next pipelined message is read.
inline_methods=frozenset({"initialize", "server/discover"}),
)
loop_connection = Connection.for_loop(dispatcher, session_id=session_id)
loop_runner = ServerRunner(server, loop_connection, lifespan_state, init_options=init_options)
standalone_outbound = NotifyOnlyOutbound(dispatcher)
era: Literal["unlocked", "legacy", "modern"] = "unlocked"
modern_version = LATEST_MODERN_VERSION

async def serve_modern(
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
) -> dict[str, Any]:
nonlocal era, modern_version
route = classify_inbound_request({"method": method, "params": params})
if isinstance(route, InboundLadderRejection):
raise MCPError(code=route.code, message=route.message, data=route.data)
if era != "modern":
era, modern_version = "modern", route.protocol_version
if method == "subscriptions/listen":
# The registered listen handler assumes the HTTP entry's stream
# semantics; served over a stream pair it would wedge. Reject until
# this transport grows its own listen design.
raise MCPError(
code=METHOD_NOT_FOUND, message="subscriptions/listen is not served over this transport", data=method
)
connection = Connection.from_envelope(
route.protocol_version,
route.client_info,
route.client_capabilities,
outbound=standalone_outbound,
)
return await serve_one(
server,
_NoServerRequestsDispatchContext(dctx),
method,
params,
connection=connection,
lifespan_state=lifespan_state,
)

async def on_request(
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
) -> dict[str, Any]:
nonlocal era
if era == "legacy":
if method == "server/discover" or _has_modern_envelope(params):
raise MCPError(
code=INVALID_REQUEST,
message="connection is locked to the legacy handshake era; "
"modern envelope requests are not accepted",
)
return await loop_runner.on_request(dctx, method, params)
if era == "modern" and method == "initialize":
raise MCPError(
code=UNSUPPORTED_PROTOCOL_VERSION,
message="connection already negotiated a modern protocol version",
data=_initialize_after_modern_data(params),
)
if era == "modern" or method == "server/discover" or _has_modern_envelope(params):
return await serve_modern(dctx, method, params)
result = await loop_runner.on_request(dctx, method, params)
if method == "initialize":
# Lock only on success: a failed handshake leaves both eras open.
era = "legacy"
return result

async def on_notify(dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None:
if era != "modern":
return await loop_runner.on_notify(dctx, method, params)
# The envelope is request-only, so notifications inherit the
# connection's locked version.
connection = Connection.from_envelope(modern_version, None, None, outbound=standalone_outbound)
notify_runner = ServerRunner(server, connection, lifespan_state)
try:
await notify_runner.on_notify(_NoServerRequestsDispatchContext(dctx), method, params)
finally:
await aclose_shielded(connection)

try:
await dispatcher.run(on_request, on_notify)
finally:
await aclose_shielded(loop_connection)


async def serve_one(
server: Server[LifespanT],
dctx: DispatchContext[TransportContext],
Expand Down
Loading
Loading