Skip to content

Serve subscriptions/listen with a pluggable event bus (SEP-2575)#3035

Merged
maxisbey merged 7 commits into
mainfrom
subscriptions-listen
Jun 30, 2026
Merged

Serve subscriptions/listen with a pluggable event bus (SEP-2575)#3035
maxisbey merged 7 commits into
mainfrom
subscriptions-listen

Conversation

@maxisbey

@maxisbey maxisbey commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Implements server-side subscriptions/listen (SEP-2575) for the 2026-07-28 revision.

Note

This branch was previously a much larger change covering both sides of the protocol; it has been reset and rebuilt as a minimal server-side implementation. Client-side support will follow separately.

On the 2026-07-28 wire there is no standing GET stream: a client opts in to server events by sending a subscriptions/listen request whose response is the stream. This PR adds the serving half:

What's here

  • mcp/server/subscriptions.py — the whole runtime:
    • SubscriptionBus: a two-method protocol (async publish / sync subscribe) over four typed ServerEvent kinds. The bus carries events, not wire notifications — a custom implementation (e.g. Redis pub/sub for multi-replica deployments) never sees JSON-RPC. InMemorySubscriptionBus is the in-process default (raising listeners are isolated and logged).
    • ListenHandler: serves the method. Ack-first by construction (the stream subscribes before the ack is sent, so nothing published during the ack write is lost, and the ack is still the first frame), per-stream filter honoring (URIs matched via a set), every frame tagged with the listen request's JSON-RPC id under _meta["io.modelcontextprotocol/subscriptionId"], and close() initiates graceful closure (each stream flushes and ends with the stamped SubscriptionsListenResult). Bounded: max_subscriptions (rejected pre-ack past the cap) and max_buffered_events (a stream whose client stopped reading is ended at the cap; the client re-listens).
  • MCPServer: optional subscriptions= constructor parameter (defaults to the in-memory bus) and the handler registered automatically. Context gets the bus at construction and publishes through await ctx.notify_tools_changed() / notify_prompts_changed() / notify_resources_changed() / notify_resource_updated(uri).
  • Era-aware capabilities: get_capabilities() takes an optional protocol_version; at 2026-07-28+ the listChanged/resources.subscribe bits derive from whether subscriptions/listen is served — that is what those bits mean on a wire whose only delivery channel is the listen stream. The handshake-era derivation is byte-identical and remains the default; server/discover passes the request's version.
  • Lowlevel Server: composable, not pre-wired — bus in your lifespan state, handlers publish to it, ListenHandler (or your own handler) passed as on_subscriptions_listen=.
  • Docs: docs/advanced/subscriptions.md (claims proved by tests/docs_src/test_subscriptions.py) and a runnable examples/stories/subscriptions story (both transports, both server tiers).
  • No changes to ServerSession, dispatchers, client transports, or mcp-types.

Design notes

This mirrors the typescript-sdk's ServerEventBus + listen-router split: the SDK owns all wire semantics (ack ordering, filtering, id stamping, teardown, limits) so a buggy or external bus cannot violate the spec's MUSTs. Delivery is fire-and-forget with no replay — a dropped stream is not resumable; clients re-listen and refetch. json_response=True does not apply to listen: a listen response is a notification stream, so it always takes the SSE path (TypeScript/Go parity).

Conformance

server-stateless is 30/30: the three listen MUST checks (ack-first, subscription-id tagging, filter honoring) pass, and with the capability bits now advertised the two list-changed SHOULD checks run and pass against the everything-server's new test_trigger_*_change diagnostic tools. Draft and 2026-07-28 legs both pass their baselines; the server-stateless expected-failures entries are removed.

@maxisbey maxisbey marked this pull request as ready for review June 30, 2026 17:39
@github-actions

github-actions Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

📚 Documentation preview

Preview https://pr-3035.mcp-python-docs.pages.dev
Deployment https://de298aaf.mcp-python-docs.pages.dev
Commit 6eddb34
Triggered by @maxisbey
Updated 2026-06-30 21:59:08 UTC

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 issues found and verified against the latest diff

Tip: instead of fixing issues one by one fix them all with cubic

Re-trigger cubic

Comment thread examples/servers/everything-server/mcp_everything_server/server.py Outdated
Comment thread examples/servers/everything-server/mcp_everything_server/server.py Outdated
Comment thread src/mcp/client/subscriptions.py Outdated
Comment thread src/mcp/client/session.py Outdated
Comment thread src/mcp/shared/jsonrpc_dispatcher.py Outdated
Comment thread src/mcp/shared/direct_dispatcher.py Outdated
Comment thread docs/tutorial/first-steps.md Outdated
Comment thread docs/advanced/low-level-server.md Outdated
Comment thread src/mcp/shared/jsonrpc_dispatcher.py Outdated
Comment thread src/mcp/server/mcpserver/server.py Outdated
@maxisbey maxisbey force-pushed the subscriptions-listen branch from a77efc4 to 94da89a Compare June 30, 2026 18:25
@maxisbey maxisbey changed the title Support subscriptions/listen (SEP-2575) Serve subscriptions/listen with a pluggable event bus (SEP-2575) Jun 30, 2026
Comment thread src/mcp/server/mcpserver/context.py Outdated
Comment thread src/mcp/server/mcpserver/server.py
Comment thread src/mcp/server/subscriptions.py

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 issues found across 14 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="docs_src/subscriptions/tutorial002.py">

<violation number="1" location="docs_src/subscriptions/tutorial002.py:31">
P2: This example sends `notifications/resources/updated` for `note://...` without registering any resource list/read handlers. Add `on_list_resources`/`on_read_resource` for `NOTES` (or publish a tool-list change instead) so the subscription event points at an actual server resource.</violation>
</file>

<file name="examples/stories/subscriptions/server_lowlevel.py">

<violation number="1" location="examples/stories/subscriptions/server_lowlevel.py:53">
P2: This emits resource-updated events for URIs the low-level server cannot read. Add matching low-level resource handlers/capability or avoid publishing `ResourceUpdated`.</violation>
</file>

<file name="examples/stories/manifest.toml">

<violation number="1" location="examples/stories/manifest.toml:75">
P3: Promoting `subscriptions` to a runnable story leaves the story index stale; update `examples/stories/README.md` so users don't see it listed as deferred/not implemented.</violation>
</file>

Tip: instead of fixing issues one by one fix them all with cubic
Tip: Review your code locally with the cubic CLI to iterate faster.

Re-trigger cubic

Comment thread docs_src/subscriptions/tutorial002.py
Comment thread examples/stories/subscriptions/server.py Outdated
Comment thread examples/stories/subscriptions/server_lowlevel.py Outdated
Comment thread examples/stories/subscriptions/server_lowlevel.py
Comment thread examples/stories/subscriptions/client.py Outdated
Comment thread examples/stories/manifest.toml
Comment thread src/mcp/server/subscriptions.py
maxisbey added 5 commits June 30, 2026 20:57
On the 2026-07-28 wire there is no standing GET stream: clients opt in to
server events via a subscriptions/listen request whose response is the
stream. Add the server-side runtime:

- mcp/server/subscriptions.py: an EventBus protocol (publish/subscribe over
  four typed ServerEvent kinds) with an in-process InMemoryEventBus default;
  implement it over an external pub/sub backend (e.g. Redis) to fan events
  out across replicas. ListenHandler serves the method: ack-first, per-stream
  filter honoring, subscription-id tagging on every frame, and close() ends
  all streams gracefully with the stamped SubscriptionsListenResult.
- MCPServer takes subscriptions= (defaults to the in-memory bus), registers
  the handler automatically, and exposes the bus as a property. Context gains
  notify_tools_changed / notify_prompts_changed / notify_resources_changed /
  notify_resource_updated to publish from inside handlers.
- Lowlevel Server users compose the same parts themselves via the existing
  on_subscriptions_listen slot; no lowlevel, session, or transport changes.
- Remove the server-stateless conformance baselines: the scenario's listen
  checks (ack-first, subscription-id tagging, filter honoring) now pass.
Review follow-ups to the subscriptions/listen runtime:

- Rename EventBus to SubscriptionBus: it carries exactly the four
  subscription event kinds, and the generic name collided conceptually
  with the unrelated EventStore in the same package.
- Make SubscriptionBus.publish async. Backend implementations (Redis,
  NATS) do network I/O on publish; a sync protocol would force them to
  block the loop or spawn their own tasks. This also makes the Context
  notify_* methods genuinely async, so they cannot be called from sync
  handlers (which run on worker threads where waking the listen streams
  is unsafe) - the illegal context is now unrepresentable instead of
  guarded.
- Subscribe each listen stream to the bus before sending the ack: an
  event published while the ack write was suspended used to be lost
  after the client had been told the subscription was live. The ack is
  still the first frame - the handler task alone writes the stream and
  only drains the buffer after the ack send returns.
- Register bus listeners under per-subscription tokens: equal callables
  (e.g. the same bound method subscribed twice) used to collapse in the
  set, so one unsubscribe silently detached both registrations.
- Drop the stdio claim from ListenHandler's docstring (no 2026-era
  stdio serving exists yet) and document that resource-updated URIs are
  matched as exact strings and do not reach legacy resources/subscribe
  subscribers.
- docs/advanced/subscriptions.md: publish-first narrative for MCPServer
  (ctx.notify_*), the filter/no-replay contract, the SubscriptionBus seam
  for multi-replica deployments, and the low-level composition. Snippets
  under docs_src/subscriptions/ with their claims proved by
  tests/docs_src/test_subscriptions.py; cross-linked from the context
  tutorial and the low-level server page.
- examples/stories/subscriptions: promote the deferred stub to a runnable
  story (modern era, both transports, MCPServer and lowlevel variants).
  The client opens a listen stream through the session escape hatch,
  watches one URI and the tool list, observes exact-URI filtering and a
  runtime tool registration announcing itself, and closes the stream by
  cancelling the parked request.
The big one: MCPServer served subscriptions/listen but still advertised
listChanged: false / subscribe: false everywhere, so a spec-following
client - which capability-gates its listen filter - would never
subscribe, making the publish surface unreachable. get_capabilities now
takes an optional protocol_version: at 2026-07-28+ the listChanged and
resources.subscribe bits derive from whether subscriptions/listen is
served (that is what those bits mean on a wire whose only delivery
channel is the listen stream); the handshake-era derivation is unchanged
and remains the default. server/discover passes the request's version.
The everything-server gains test_trigger_tool_change /
test_trigger_prompt_change diagnostic tools (backed by a new
MCPServer.remove_prompt mirroring remove_tool), so the conformance
list-changed SHOULD checks now run and pass: server-stateless is 30/30.

Review follow-ups:
- The bus is no longer exposed as a public MCPServer property; Context
  receives it at construction and the notify_* methods publish through
  it directly. Publishing from outside a request means keeping a
  reference to the bus you constructed.
- json_response=True no longer hangs subscriptions/listen: a listen
  response is a notification stream, so it takes the SSE path regardless
  of the JSON-response preference (TypeScript/Go parity).
- ListenHandler gains max_subscriptions (rejected pre-ack past the cap)
  and max_buffered_events (a stream whose client stopped reading is
  ended at the cap; the client re-listens - no replay means the backlog
  was already lossy).
- Honored resource URIs are matched via a frozenset instead of a list
  scan on every publish.
- InMemorySubscriptionBus isolates raising listeners (logged + skipped)
  so one bad listener cannot starve the others or fail the publisher.
- close() docs now say it initiates closure; each stream flushes from
  its own handler task.
- Story and docs corrections: cancelling the parked listen request only
  releases the local task over HTTP today (the stream ends with the
  connection); capability docs/tests updated for the era-aware bits;
  stories index row promoted; small example hardening.
Any client may subscribe to any URI, including one it cannot read, and
will receive update notifications for it (resource existence and change
timing - never content). Multi-tenant servers should not publish
sensitive per-user URIs, or should serve the method with their own
handler and narrow the filter before acking; a narrowing hook on the
built-in handler is a candidate follow-up.
Comment thread src/mcp/server/mcpserver/server.py
Comment thread tests/server/lowlevel/test_server_discover.py
@maxisbey maxisbey force-pushed the subscriptions-listen branch from 39884fd to a6b9f03 Compare June 30, 2026 21:01
Comment thread src/mcp/server/mcpserver/server.py Outdated
Comment thread src/mcp/server/mcpserver/server.py
Comment thread src/mcp/server/subscriptions.py
Comment thread src/mcp/server/mcpserver/server.py
- Pass the server-scoped subscription bus into the fallback Contexts built
  by programmatic call_tool/read_resource/get_prompt, so ctx.notify_* works
  there instead of raising.
- Reject subscriptions/listen with 406 when the Accept header lacks
  text/event-stream: the response is always an SSE stream, so JSON-response
  mode must not serve a content type the client never accepted.
- Release a stream's subscription slot at backlog overflow time; the
  stream's own cleanup can be wedged in a transport write that closing the
  buffer cannot wake.
- End InMemorySubscriptionBus.publish with a checkpoint so a same-task
  publish burst lets listen streams drain between events instead of
  overflowing a healthy stream's buffer.
- Describe the graceful-close result as a deliberate end rather than a
  "don't re-listen" signal: a stream ended at the overflow cap sends the
  same result and the client should re-listen.
- Move a function-body import to the top of the file and fix a stale
  comment referencing a removed property.
Comment thread src/mcp/server/subscriptions.py

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 9 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/mcp/server/subscriptions.py">

<violation number="1" location="src/mcp/server/subscriptions.py:245">
P2: Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.</violation>
</file>

Tip: Review your code locally with the cubic CLI to iterate faster.

Fix all with cubic | Re-trigger cubic

# Release the subscription slot now: the handler's own
# cleanup can be wedged in a transport write that closing
# this buffer cannot wake (a client that stopped reading).
self._streams.discard(send)

@cubic-dev-ai cubic-dev-ai Bot Jun 30, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/mcp/server/subscriptions.py, line 245:

<comment>Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.</comment>

<file context>
@@ -235,6 +239,10 @@ def deliver(event: ServerEvent) -> None:
+                    # Release the subscription slot now: the handler's own
+                    # cleanup can be wedged in a transport write that closing
+                    # this buffer cannot wake (a client that stopped reading).
+                    self._streams.discard(send)
                     send.close()
 
</file context>
Fix with cubic

The unsubscribe callable returned by a custom SubscriptionBus runs first in
the stream's finally block; if it raised, the stream's slot release and
buffer closes were skipped, permanently consuming a max_subscriptions slot.
Run it through a logged isolation boundary, mirroring the listener isolation
publish already does.
@maxisbey maxisbey enabled auto-merge (squash) June 30, 2026 22:00
@maxisbey maxisbey merged commit ca10dad into main Jun 30, 2026
37 checks passed
@maxisbey maxisbey deleted the subscriptions-listen branch June 30, 2026 22:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants