Serve subscriptions/listen with a pluggable event bus (SEP-2575)#3035
Conversation
📚 Documentation preview
|
There was a problem hiding this comment.
8 issues found and verified against the latest diff
Tip: instead of fixing issues one by one fix them all with cubic
Re-trigger cubic
a77efc4 to
94da89a
Compare
There was a problem hiding this comment.
6 issues found across 14 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="docs_src/subscriptions/tutorial002.py">
<violation number="1" location="docs_src/subscriptions/tutorial002.py:31">
P2: This example sends `notifications/resources/updated` for `note://...` without registering any resource list/read handlers. Add `on_list_resources`/`on_read_resource` for `NOTES` (or publish a tool-list change instead) so the subscription event points at an actual server resource.</violation>
</file>
<file name="examples/stories/subscriptions/server_lowlevel.py">
<violation number="1" location="examples/stories/subscriptions/server_lowlevel.py:53">
P2: This emits resource-updated events for URIs the low-level server cannot read. Add matching low-level resource handlers/capability or avoid publishing `ResourceUpdated`.</violation>
</file>
<file name="examples/stories/manifest.toml">
<violation number="1" location="examples/stories/manifest.toml:75">
P3: Promoting `subscriptions` to a runnable story leaves the story index stale; update `examples/stories/README.md` so users don't see it listed as deferred/not implemented.</violation>
</file>
Tip: instead of fixing issues one by one fix them all with cubic
Tip: Review your code locally with the cubic CLI to iterate faster.
Re-trigger cubic
On the 2026-07-28 wire there is no standing GET stream: clients opt in to server events via a subscriptions/listen request whose response is the stream. Add the server-side runtime: - mcp/server/subscriptions.py: an EventBus protocol (publish/subscribe over four typed ServerEvent kinds) with an in-process InMemoryEventBus default; implement it over an external pub/sub backend (e.g. Redis) to fan events out across replicas. ListenHandler serves the method: ack-first, per-stream filter honoring, subscription-id tagging on every frame, and close() ends all streams gracefully with the stamped SubscriptionsListenResult. - MCPServer takes subscriptions= (defaults to the in-memory bus), registers the handler automatically, and exposes the bus as a property. Context gains notify_tools_changed / notify_prompts_changed / notify_resources_changed / notify_resource_updated to publish from inside handlers. - Lowlevel Server users compose the same parts themselves via the existing on_subscriptions_listen slot; no lowlevel, session, or transport changes. - Remove the server-stateless conformance baselines: the scenario's listen checks (ack-first, subscription-id tagging, filter honoring) now pass.
Review follow-ups to the subscriptions/listen runtime: - Rename EventBus to SubscriptionBus: it carries exactly the four subscription event kinds, and the generic name collided conceptually with the unrelated EventStore in the same package. - Make SubscriptionBus.publish async. Backend implementations (Redis, NATS) do network I/O on publish; a sync protocol would force them to block the loop or spawn their own tasks. This also makes the Context notify_* methods genuinely async, so they cannot be called from sync handlers (which run on worker threads where waking the listen streams is unsafe) - the illegal context is now unrepresentable instead of guarded. - Subscribe each listen stream to the bus before sending the ack: an event published while the ack write was suspended used to be lost after the client had been told the subscription was live. The ack is still the first frame - the handler task alone writes the stream and only drains the buffer after the ack send returns. - Register bus listeners under per-subscription tokens: equal callables (e.g. the same bound method subscribed twice) used to collapse in the set, so one unsubscribe silently detached both registrations. - Drop the stdio claim from ListenHandler's docstring (no 2026-era stdio serving exists yet) and document that resource-updated URIs are matched as exact strings and do not reach legacy resources/subscribe subscribers.
- docs/advanced/subscriptions.md: publish-first narrative for MCPServer (ctx.notify_*), the filter/no-replay contract, the SubscriptionBus seam for multi-replica deployments, and the low-level composition. Snippets under docs_src/subscriptions/ with their claims proved by tests/docs_src/test_subscriptions.py; cross-linked from the context tutorial and the low-level server page. - examples/stories/subscriptions: promote the deferred stub to a runnable story (modern era, both transports, MCPServer and lowlevel variants). The client opens a listen stream through the session escape hatch, watches one URI and the tool list, observes exact-URI filtering and a runtime tool registration announcing itself, and closes the stream by cancelling the parked request.
The big one: MCPServer served subscriptions/listen but still advertised listChanged: false / subscribe: false everywhere, so a spec-following client - which capability-gates its listen filter - would never subscribe, making the publish surface unreachable. get_capabilities now takes an optional protocol_version: at 2026-07-28+ the listChanged and resources.subscribe bits derive from whether subscriptions/listen is served (that is what those bits mean on a wire whose only delivery channel is the listen stream); the handshake-era derivation is unchanged and remains the default. server/discover passes the request's version. The everything-server gains test_trigger_tool_change / test_trigger_prompt_change diagnostic tools (backed by a new MCPServer.remove_prompt mirroring remove_tool), so the conformance list-changed SHOULD checks now run and pass: server-stateless is 30/30. Review follow-ups: - The bus is no longer exposed as a public MCPServer property; Context receives it at construction and the notify_* methods publish through it directly. Publishing from outside a request means keeping a reference to the bus you constructed. - json_response=True no longer hangs subscriptions/listen: a listen response is a notification stream, so it takes the SSE path regardless of the JSON-response preference (TypeScript/Go parity). - ListenHandler gains max_subscriptions (rejected pre-ack past the cap) and max_buffered_events (a stream whose client stopped reading is ended at the cap; the client re-listens - no replay means the backlog was already lossy). - Honored resource URIs are matched via a frozenset instead of a list scan on every publish. - InMemorySubscriptionBus isolates raising listeners (logged + skipped) so one bad listener cannot starve the others or fail the publisher. - close() docs now say it initiates closure; each stream flushes from its own handler task. - Story and docs corrections: cancelling the parked listen request only releases the local task over HTTP today (the stream ends with the connection); capability docs/tests updated for the era-aware bits; stories index row promoted; small example hardening.
Any client may subscribe to any URI, including one it cannot read, and will receive update notifications for it (resource existence and change timing - never content). Multi-tenant servers should not publish sensitive per-user URIs, or should serve the method with their own handler and narrow the filter before acking; a narrowing hook on the built-in handler is a candidate follow-up.
39884fd to
a6b9f03
Compare
- Pass the server-scoped subscription bus into the fallback Contexts built by programmatic call_tool/read_resource/get_prompt, so ctx.notify_* works there instead of raising. - Reject subscriptions/listen with 406 when the Accept header lacks text/event-stream: the response is always an SSE stream, so JSON-response mode must not serve a content type the client never accepted. - Release a stream's subscription slot at backlog overflow time; the stream's own cleanup can be wedged in a transport write that closing the buffer cannot wake. - End InMemorySubscriptionBus.publish with a checkpoint so a same-task publish burst lets listen streams drain between events instead of overflowing a healthy stream's buffer. - Describe the graceful-close result as a deliberate end rather than a "don't re-listen" signal: a stream ended at the overflow cap sends the same result and the client should re-listen. - Move a function-body import to the top of the file and fix a stale comment referencing a removed property.
There was a problem hiding this comment.
1 issue found across 9 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/mcp/server/subscriptions.py">
<violation number="1" location="src/mcp/server/subscriptions.py:245">
P2: Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.</violation>
</file>
Tip: Review your code locally with the cubic CLI to iterate faster.
Fix all with cubic | Re-trigger cubic
| # Release the subscription slot now: the handler's own | ||
| # cleanup can be wedged in a transport write that closing | ||
| # this buffer cannot wake (a client that stopped reading). | ||
| self._streams.discard(send) |
There was a problem hiding this comment.
P2: Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/mcp/server/subscriptions.py, line 245:
<comment>Overflow frees the subscription slot but leaves the wedged stream subscribed to the bus. If the transport write never resumes, each overflowed stream leaks its listener and captured request state and is still called on every future publish; unsubscribe the listener at overflow time as well as closing the buffer.</comment>
<file context>
@@ -235,6 +239,10 @@ def deliver(event: ServerEvent) -> None:
+ # Release the subscription slot now: the handler's own
+ # cleanup can be wedged in a transport write that closing
+ # this buffer cannot wake (a client that stopped reading).
+ self._streams.discard(send)
send.close()
</file context>
The unsubscribe callable returned by a custom SubscriptionBus runs first in the stream's finally block; if it raised, the stream's slot release and buffer closes were skipped, permanently consuming a max_subscriptions slot. Run it through a logged isolation boundary, mirroring the listener isolation publish already does.
Implements server-side
subscriptions/listen(SEP-2575) for the 2026-07-28 revision.Note
This branch was previously a much larger change covering both sides of the protocol; it has been reset and rebuilt as a minimal server-side implementation. Client-side support will follow separately.
On the 2026-07-28 wire there is no standing GET stream: a client opts in to server events by sending a
subscriptions/listenrequest whose response is the stream. This PR adds the serving half:What's here
mcp/server/subscriptions.py— the whole runtime:SubscriptionBus: a two-method protocol (asyncpublish/ syncsubscribe) over four typedServerEventkinds. The bus carries events, not wire notifications — a custom implementation (e.g. Redis pub/sub for multi-replica deployments) never sees JSON-RPC.InMemorySubscriptionBusis the in-process default (raising listeners are isolated and logged).ListenHandler: serves the method. Ack-first by construction (the stream subscribes before the ack is sent, so nothing published during the ack write is lost, and the ack is still the first frame), per-stream filter honoring (URIs matched via a set), every frame tagged with the listen request's JSON-RPC id under_meta["io.modelcontextprotocol/subscriptionId"], andclose()initiates graceful closure (each stream flushes and ends with the stampedSubscriptionsListenResult). Bounded:max_subscriptions(rejected pre-ack past the cap) andmax_buffered_events(a stream whose client stopped reading is ended at the cap; the client re-listens).MCPServer: optionalsubscriptions=constructor parameter (defaults to the in-memory bus) and the handler registered automatically.Contextgets the bus at construction and publishes throughawait ctx.notify_tools_changed()/notify_prompts_changed()/notify_resources_changed()/notify_resource_updated(uri).get_capabilities()takes an optionalprotocol_version; at 2026-07-28+ thelistChanged/resources.subscribebits derive from whethersubscriptions/listenis served — that is what those bits mean on a wire whose only delivery channel is the listen stream. The handshake-era derivation is byte-identical and remains the default;server/discoverpasses the request's version.Server: composable, not pre-wired — bus in your lifespan state, handlers publish to it,ListenHandler(or your own handler) passed ason_subscriptions_listen=.docs/advanced/subscriptions.md(claims proved bytests/docs_src/test_subscriptions.py) and a runnableexamples/stories/subscriptionsstory (both transports, both server tiers).ServerSession, dispatchers, client transports, ormcp-types.Design notes
This mirrors the typescript-sdk's
ServerEventBus+ listen-router split: the SDK owns all wire semantics (ack ordering, filtering, id stamping, teardown, limits) so a buggy or external bus cannot violate the spec's MUSTs. Delivery is fire-and-forget with no replay — a dropped stream is not resumable; clients re-listen and refetch.json_response=Truedoes not apply to listen: a listen response is a notification stream, so it always takes the SSE path (TypeScript/Go parity).Conformance
server-statelessis 30/30: the three listen MUST checks (ack-first, subscription-id tagging, filter honoring) pass, and with the capability bits now advertised the two list-changed SHOULD checks run and pass against the everything-server's newtest_trigger_*_changediagnostic tools. Draft and 2026-07-28 legs both pass their baselines; theserver-statelessexpected-failures entries are removed.