Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/actions/conformance/expected-failures.2026-07-28.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,4 @@

client: []

server:
# SEP-2575 subscriptions/listen is not implemented yet; see the matching
# entry in expected-failures.yml for the full rationale.
- server-stateless
server: []
10 changes: 0 additions & 10 deletions .github/actions/conformance/expected-failures.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@
client: []

server:
# SEP-2575 subscriptions/listen is not implemented yet. The everything-
# server's legacy resources/subscribe handlers make it advertise
# `resources.subscribe` in server/discover, and as of conformance #372 a
# server that advertises a subscription capability but answers
# subscriptions/listen with -32601 fails the three listen MUST checks
# ("Not testable") instead of skipping them. Remove this entry when the
# listen runtime lands. NOTE: while listed, this entry also masks new
# failures in the scenario's other 25 (currently passing) checks — the
# baseline is per-scenario, not per-check.
- server-stateless
# SEP-2663 (io.modelcontextprotocol/tasks): the SDK does not implement the
# tasks extension yet. These extension-tagged scenarios are selected only by
# the bare `--suite all` leg — extension scenarios never match a
Expand Down
1 change: 1 addition & 0 deletions docs/advanced/low-level-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ Each of these is one idea you now have the vocabulary for; each has its own chap

* `on_call_tool`, `on_get_prompt`, and `on_read_resource` may return an `InputRequiredResult` instead of their normal result to pause the call and ask the client for input; see **[Multi-round-trip requests](multi-round-trip.md)**. True to this tier, nothing is installed for you: where `MCPServer` seals `requestState` by default, here the `request_state` you set crosses the wire exactly as written until you opt in with `server.middleware.append(RequestStateBoundary(RequestStateSecurity(keys=[...]), default_audience=server.name))`: one line (both names import from `mcp.server.request_state`) for the identical sealing and verification `MCPServer` performs (**[Protecting `requestState`](multi-round-trip.md#protecting-requeststate)**).
* `on_list_resources`, `on_read_resource`, `on_list_prompts`, `on_get_prompt`, `on_completion` are the same `(ctx, params) -> result` shape for the other primitives.
* `on_subscriptions_listen` serves the 2026-07-28 `subscriptions/listen` stream. Pass a `ListenHandler` built over a `SubscriptionBus` and publish events to the bus from your other handlers; see **[Subscriptions](subscriptions.md)** for the full composition.
* `server.streamable_http_app()` returns the same Starlette app `MCPServer`'s does; deploy it the way **[Running your server](../run/index.md)** deploys any other ASGI app. There is no `server.run(transport=...)` down here: `server.run(read_stream, write_stream, server.create_initialization_options())` drives one connection over a pair of streams, and that one line is the whole story.

## Recap
Expand Down
88 changes: 88 additions & 0 deletions docs/advanced/subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Subscriptions

A server's catalog is not fixed. Tools get registered at runtime, resources change behind their URIs. The client side of that story is a subscription: on the 2026-07-28 protocol, a client that wants to hear about changes sends one `subscriptions/listen` request, and the response to that request *is* the stream — it stays open, carrying exactly the notification kinds the client asked for.

Your side of it is one line: publish the change.

```python title="server.py" hl_lines="16 27"
--8<-- "docs_src/subscriptions/tutorial001.py"
```

* `await ctx.notify_resource_updated("note://todo")` delivers `notifications/resources/updated` to every open listen stream that subscribed to that URI. Not to anyone else.
* `await ctx.notify_tools_changed()` delivers `notifications/tools/list_changed` to every stream that asked for tool-list changes. A client that receives it calls `tools/list` again — and now sees `search`.
* The siblings are `notify_prompts_changed()` and `notify_resources_changed()`, for the other two list-changed kinds.
* No subscribers, no work: publishing to an idle server is a no-op. You don't check whether anyone is listening; you state what changed.

The SDK serves `subscriptions/listen` for you — `MCPServer` registers the handler at construction, and the wire obligations (the acknowledgment as the first frame, the per-stream filtering, the subscription id tagged onto every frame) are its job, not yours.

!!! check
On the wire, a stream whose filter named `note://todo` looks like this after `edit_note` runs:

```json
{"method": "notifications/subscriptions/acknowledged",
"params": {"notifications": {"resourceSubscriptions": ["note://todo"]}, "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}

{"method": "notifications/resources/updated",
"params": {"uri": "note://todo", "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}
```

The acknowledgment echoes the filter the server agreed to honor, and every frame carries the
listen request's JSON-RPC id under `_meta` — that id *is* the subscription id.

## Only what was asked for

The filter is a contract. A stream that requested tool-list changes and one resource URI receives those two kinds and nothing else — publish a prompt change and that stream stays silent. Resource URIs are matched as exact strings: `note://todo` does not cover `note://todo/draft`.

!!! warning
Filters are honored without per-client authorization: any client may name any URI —
including one it cannot read — and will receive update notifications for it (resource
existence and change timing, never content). On a multi-tenant server, don't publish
sensitive per-user URIs through `notify_resource_updated`, or serve the method with
your own handler on the low-level `Server` and narrow the filter there before acking —
the honored subset exists in the protocol precisely so servers can do this.

Two more things the stream is *not*:

* **It is not a replay log.** A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about.
* **It is not the 2025 path.** Clients on earlier protocol versions that called `resources/subscribe` are served by `ctx.session.send_resource_updated(uri)` — the `notify_*` methods reach `subscriptions/listen` streams only.

## One process is the default. More takes a bus

Publishes travel from your handler to the open streams over a `SubscriptionBus`. The default is in-memory: one process, every stream in it. That is the right answer until you run replicas behind a load balancer — then a client's stream is pinned to one replica, and a publish on another replica has to reach it.

That seam is yours to implement: two methods over your pub/sub backend.

```python
class RedisSubscriptionBus:
async def publish(self, event: ServerEvent) -> None:
await self.redis.publish("mcp-events", encode(event)) # to every replica

def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
... # register the local listener; a reader task calls it for arriving events
```

```python
mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...))
```

The bus carries typed `ServerEvent` values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. To publish from outside a request, keep a reference to the bus you constructed and `await bus.publish(ToolsListChanged())` — the server holds the same instance.

## The low-level composition

Down on the low-level `Server` there is no pre-wired anything — and the same parts assemble in three lines:

```python title="server.py" hl_lines="9 31 39"
--8<-- "docs_src/subscriptions/tutorial002.py"
```

* You own the bus, so you publish to it directly: `await bus.publish(ResourceUpdated(uri=...))`. Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app.
* `ListenHandler(bus)` is the same handler `MCPServer` registers; `on_subscriptions_listen=` is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it.
* `ListenHandler.close()` gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects.

## Recap

* A client opts in with one `subscriptions/listen` request; the response is the stream. There is nothing to configure server-side — serving it is built in.
* You publish: `await ctx.notify_resource_updated(uri)`, `notify_tools_changed()`, `notify_prompts_changed()`, `notify_resources_changed()`. Idle servers make these free.
* Streams receive only what their filter requested; URIs match exactly; nothing is replayed.
* Scaling out means implementing `SubscriptionBus` — two methods — over your own pub/sub, and passing it as `MCPServer(subscriptions=...)`.
* The low-level spelling is the same machinery held in your hands: a bus, `ListenHandler(bus)`, one constructor argument.
2 changes: 1 addition & 1 deletion docs/client/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ The resource verbs come in pairs: two ways to list, one way to read.

`read_resource` returns `contents`, a list of `TextResourceContents` or `BlobResourceContents`. Same idea as tool content: narrow with `isinstance`, then read `.text` (or `.blob`).

A client can also **subscribe** to a resource and be told when it changes: `subscribe_resource(uri)` and `unsubscribe_resource(uri)`, same shape as everything else here. `MCPServer` doesn't implement that half. It says so up front (`server_capabilities.resources.subscribe` is `False`) and answers the request with an `MCPError`: `-32601`, *Method not found*. A server that does support subscriptions is built on the low-level `Server` (**[The low-level Server](../advanced/low-level-server.md)**).
A client can also be told when a resource changes. On 2025-era connections that is `subscribe_resource(uri)` / `unsubscribe_resource(uri)` - a method pair `MCPServer` doesn't implement, so on the 2026-07-28 wire (where those verbs no longer exist) the request answers `-32601`, *Method not found*. The 2026 replacement is a `subscriptions/listen` stream, which `MCPServer` *does* serve - `server_capabilities.resources.subscribe` is `True` there, and the server side of the story is **[Subscriptions](../advanced/subscriptions.md)**.

## Prompts

Expand Down
2 changes: 2 additions & 0 deletions docs/tutorial/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ What a server offers is not fixed at import time. Register a tool at runtime, th

The siblings are `send_resource_list_changed()`, `send_prompt_list_changed()`, and `send_resource_updated(uri)` for a change to one specific resource.

On a 2026-07-28 connection, clients receive change notifications only on a `subscriptions/listen` stream they opened — the `send_*` methods above do not reach those streams. The `Context` publish methods — `await ctx.notify_tools_changed()`, `await ctx.notify_prompts_changed()`, `await ctx.notify_resources_changed()`, and `await ctx.notify_resource_updated(uri)` — deliver to every subscribed stream at once. The whole story, including scaling out across replicas, is in **[Subscriptions](../advanced/subscriptions.md)**.

!!! check
Before anyone runs `enable_recommendations`, the tool you are promising does not exist. Call it
anyway and the result is an error the model can read:
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorial/first-steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ asyncio.run(main())
```

```text
{'prompts': {'list_changed': False}, 'resources': {'subscribe': False, 'list_changed': False}, 'tools': {'list_changed': False}}
{'prompts': {'list_changed': True}, 'resources': {'subscribe': True, 'list_changed': True}, 'tools': {'list_changed': True}}
```

That dictionary is the server's half of the handshake:
Expand Down
Empty file.
28 changes: 28 additions & 0 deletions docs_src/subscriptions/tutorial001.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from mcp.server.mcpserver import Context, MCPServer

mcp = MCPServer("Notebook")

NOTES = {"todo": "buy milk", "journal": "day one"}


@mcp.resource("note://{name}")
def note(name: str) -> str:
return NOTES[name]


@mcp.tool()
async def edit_note(name: str, text: str, ctx: Context) -> str:
NOTES[name] = text
await ctx.notify_resource_updated(f"note://{name}")
return "saved"


def search(query: str) -> list[str]:
return [name for name, text in NOTES.items() if query in text]


@mcp.tool()
async def enable_search(ctx: Context) -> str:
mcp.add_tool(search)
await ctx.notify_tools_changed()
return "search is live"
40 changes: 40 additions & 0 deletions docs_src/subscriptions/tutorial002.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import Any

import mcp_types as types

from mcp.server.context import ServerRequestContext
from mcp.server.lowlevel import Server
from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ResourceUpdated

bus = InMemorySubscriptionBus()

NOTES = {"todo": "buy milk"}

EDIT_NOTE_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {"name": {"type": "string"}, "text": {"type": "string"}},
"required": ["name", "text"],
}


async def list_tools(
ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None
) -> types.ListToolsResult:
return types.ListToolsResult(
tools=[types.Tool(name="edit_note", description="Replace a note's text.", input_schema=EDIT_NOTE_SCHEMA)]
)


async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult:
args = params.arguments or {}
NOTES[args["name"]] = args["text"]
await bus.publish(ResourceUpdated(uri=f"note://{args['name']}"))
Comment thread
maxisbey marked this conversation as resolved.
return types.CallToolResult(content=[types.TextContent(type="text", text="saved")])


server = Server(
"notebook",
on_list_tools=list_tools,
on_call_tool=call_tool,
on_subscriptions_listen=ListenHandler(bus),
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import click
from mcp.server import ServerRequestContext
from mcp.server.mcpserver import Context, MCPServer, RequestStateSecurity
from mcp.server.mcpserver.prompts.base import UserMessage
from mcp.server.mcpserver.prompts.base import Prompt, UserMessage
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
from mcp.shared.exceptions import MCPError
from mcp_types import (
Expand Down Expand Up @@ -585,6 +585,34 @@ async def test_reconnection(ctx: Context) -> str:
return "Reconnection test completed"


def _dynamic_tool() -> str:
"""A tool registered and removed by test_trigger_tool_change."""
return "dynamic"


def _dynamic_prompt() -> str:
"""A prompt registered and removed by test_trigger_prompt_change."""
return "dynamic"


@mcp.tool()
async def test_trigger_tool_change(ctx: Context) -> str:
"""Mutates the tool list and announces it to subscriptions/listen streams (SEP-2575)"""
mcp.add_tool(_dynamic_tool, name="test_dynamic_tool")
mcp.remove_tool("test_dynamic_tool")
await ctx.notify_tools_changed()
return "tool list changed"


@mcp.tool()
async def test_trigger_prompt_change(ctx: Context) -> str:
"""Mutates the prompt list and announces it to subscriptions/listen streams (SEP-2575)"""
mcp.add_prompt(Prompt.from_function(_dynamic_prompt, name="test_dynamic_prompt", description="dynamic"))
mcp.remove_prompt("test_dynamic_prompt")
await ctx.notify_prompts_changed()
return "prompt list changed"


# Resources
@mcp.resource("test://static-text")
def static_text_resource() -> str:
Expand Down
2 changes: 1 addition & 1 deletion examples/stories/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ opens with a banner saying what replaces it.
| [`starlette_mount`](starlette_mount/) | mounting `streamable_http_app()` under a Starlette/FastAPI sub-path | current |
| [`sse_polling`](sse_polling/) | SEP-1699 `closeSSE()` + `Last-Event-ID` resume via `EventStore` | legacy |
| [`standalone_get`](standalone_get/) | server-initiated `list_changed` over the sessionful GET stream | legacy |
| [`subscriptions`](subscriptions/) | `subscriptions/listen` streams: `ctx.notify_*`, `SubscriptionBus`, `ListenHandler` | current |
| [`reconnect`](reconnect/) | explicit `discover()`, persist `DiscoverResult`, zero-RTT reconnect | current |
| [`bearer_auth`](bearer_auth/) | `TokenVerifier` + `AuthSettings` bearer gate, PRM metadata, `get_access_token()` | current |
| [`oauth`](oauth/) | full `authorization_code` grant against an in-process AS | current |
| [`oauth_client_credentials`](oauth_client_credentials/) | `client_credentials` grant; minimal in-process token endpoint | current |
| [`identity_assertion`](identity_assertion/) | SEP-990 enterprise IdP flow: present an ID-JAG under the `jwt-bearer` grant | current |
| **— deferred (README only) —** | | |
| [`caching`](caching/) | `CacheableResult` ttl/scope hints; client honouring | not yet implemented |
| [`subscriptions`](subscriptions/) | `subscriptions/listen`, `ServerEventBus`, `Client.listen()` | not yet implemented — [#2901](https://github.com/modelcontextprotocol/python-sdk/issues/2901) |
| [`tasks`](tasks/) | `io.modelcontextprotocol/tasks` extension | not yet implemented |
| [`apps`](apps/) | MCP Apps: `ui://` resource + `_meta.ui` | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) |
| [`skills`](skills/) | SEP-2640 skills extension | not yet implemented — [#2896](https://github.com/modelcontextprotocol/python-sdk/issues/2896) |
Expand Down
7 changes: 6 additions & 1 deletion examples/stories/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ lowlevel = false
transports = ["in-memory", "http-asgi"]
era = "dual-in-body"

[story.subscriptions]
# subscriptions/listen exists only on the 2026 wire, so there is no legacy leg.
# The listen request parks for the stream's lifetime; the client ends it by
# cancelling the awaiting scope (the spec's client-side close).
era = "modern"
Comment thread
maxisbey marked this conversation as resolved.

[story.schema_validators]

[story.middleware]
Expand Down Expand Up @@ -166,7 +172,6 @@ fixed_port = 8000 # issuer/PRM metadata bake in :8

[deferred]
caching = "client honouring + per-result override unlanded"
subscriptions = "#2901 — Client.listen / ServerEventBus"
tasks = "SEP-2663 — tasks extension runtime (server-decided augmentation, CreateTaskResult)"
skills = "#2896 — SEP-2640"
events = "#2901 + #2896"
Loading
Loading