Server-side subscriptions/listen support (2026-07-28, SEP-2575).
On the 2026-07-28 wire there is no standing GET stream: a client opts in to
server events by sending a subscriptions/listen request whose response IS
the stream. This module provides the two pieces a server needs:
SubscriptionBus: the pluggable fan-out seam. The bus carries typed ServerEvent
values, not wire notifications - the listen handler owns subscription-id
stamping and per-stream filtering, so a custom bus (e.g. backed by Redis
pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process
default is InMemorySubscriptionBus.
ListenHandler: the request handler that serves subscriptions/listen.
MCPServer registers one automatically; lowlevel Server users pass an
instance as on_subscriptions_listen=.
The event vocabulary lives in mcp.shared.subscriptions, shared with the client driver, and is re-exported here.
Per the spec, the handler acknowledges first (the ack is the first frame on
the stream), tags every frame with the listen request's JSON-RPC id under
_meta["io.modelcontextprotocol/subscriptionId"], and never delivers an
event kind the client did not request. Delivery is fire-and-forget with no
replay: a dropped stream is not resumable - clients re-listen and refetch.
SUBSCRIPTION_ID_META_KEY = (
"io.modelcontextprotocol/subscriptionId"
)
The _meta key on every listen-stream frame; the value is the subscriptions/listen request's JSON-RPC id.
PromptsListChanged
dataclass
The server's prompt list changed.
Source code in src/mcp/shared/subscriptions.py
```python
@dataclass(frozen=True)
class PromptsListChanged:
    """The server's prompt list changed."""
```
ResourcesListChanged
dataclass
The server's resource list changed.
Source code in src/mcp/shared/subscriptions.py
```python
@dataclass(frozen=True)
class ResourcesListChanged:
    """The server's resource list changed."""
```
ResourceUpdated
dataclass
The resource at uri changed and may need to be read again.
Source code in src/mcp/shared/subscriptions.py
```python
@dataclass(frozen=True)
class ResourceUpdated:
    """The resource at `uri` changed and may need to be read again."""

    uri: str
```
ServerEvent
module-attribute
An event a server publishes for delivery to listen subscribers.
ToolsListChanged
dataclass
The server's tool list changed.
Source code in src/mcp/shared/subscriptions.py
```python
@dataclass(frozen=True)
class ToolsListChanged:
    """The server's tool list changed."""
```
SubscriptionBus
Bases: Protocol
Fan-out seam between event publishers and open listen streams.
Implement this over an external pub/sub backend (Redis, NATS, ...) to fan
events out across replicas: publish forwards the event to the backend,
and each replica's bus invokes its local listeners for events arriving
from the backend. The same instance can be shared across servers.
publish is async so backend implementations can do network I/O.
subscribe is synchronous local registration. Listeners are synchronous,
must not raise, and are invoked on the server's event loop.
Source code in src/mcp/server/subscriptions.py
```python
class SubscriptionBus(Protocol):
    """Fan-out seam between event publishers and open listen streams.

    Implement this over an external pub/sub backend (Redis, NATS, ...) to fan
    events out across replicas: `publish` forwards the event to the backend,
    and each replica's bus invokes its local listeners for events arriving
    from the backend. The same instance can be shared across servers.

    `publish` is async so backend implementations can do network I/O.
    `subscribe` is synchronous local registration. Listeners are synchronous,
    must not raise, and are invoked on the server's event loop.
    """

    async def publish(self, event: ServerEvent) -> None:
        """Deliver `event` to every subscribed listener."""
        ...

    def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
        """Register `listener` and return an idempotent unsubscribe callable."""
        ...
```
publish
async
Deliver event to every subscribed listener.
subscribe
Register listener and return an idempotent unsubscribe callable.
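A minimal sketch of the multi-replica pattern the SubscriptionBus docstring describes, using asyncio and an in-process stand-in for the backend. `FakeBroker` and `ReplicaBus` are hypothetical names; a real implementation would replace `FakeBroker.broadcast` with network I/O (for example a Redis publish plus a per-replica subscriber task running on the server's event loop) and would serialize events, which this sketch skips by passing objects directly. `ReplicaBus` structurally matches the protocol's `publish`/`subscribe` pair.

```python
import asyncio
from collections.abc import Callable
from typing import Any

Listener = Callable[[Any], None]


class FakeBroker:
    """Stand-in for an external pub/sub backend shared by all replicas."""

    def __init__(self) -> None:
        self._replicas: list["ReplicaBus"] = []

    def attach(self, replica: "ReplicaBus") -> None:
        self._replicas.append(replica)

    async def broadcast(self, event: Any) -> None:
        await asyncio.sleep(0)  # where a real backend would do network I/O
        for replica in self._replicas:
            replica.dispatch_local(event)


class ReplicaBus:
    """One replica's bus: publish goes out to the broker, delivery is local."""

    def __init__(self, broker: FakeBroker) -> None:
        self._broker = broker
        self._listeners: dict[object, Listener] = {}
        broker.attach(self)

    async def publish(self, event: Any) -> None:
        # Forward to the backend; local listeners run when the event comes back.
        await self._broker.broadcast(event)

    def subscribe(self, listener: Listener) -> Callable[[], None]:
        token = object()
        self._listeners[token] = listener

        def unsubscribe() -> None:
            self._listeners.pop(token, None)

        return unsubscribe

    def dispatch_local(self, event: Any) -> None:
        # Listeners must not raise; isolate them anyway (a real bus would log here).
        for listener in list(self._listeners.values()):
            try:
                listener(event)
            except Exception:
                pass


async def main() -> tuple[list[Any], list[Any]]:
    broker = FakeBroker()
    replica_a, replica_b = ReplicaBus(broker), ReplicaBus(broker)
    seen_a: list[Any] = []
    seen_b: list[Any] = []
    replica_a.subscribe(seen_a.append)
    unsubscribe_b = replica_b.subscribe(seen_b.append)
    await replica_a.publish("tools changed")  # reaches listeners on both replicas
    unsubscribe_b()
    unsubscribe_b()  # idempotent
    await replica_a.publish("prompts changed")
    return seen_a, seen_b
```

Running `asyncio.run(main())` shows an event published on one replica reaching listeners on both, and a listener that has unsubscribed receiving nothing further.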
InMemorySubscriptionBus
In-process SubscriptionBus: synchronous fan-out to listeners in subscription order.
Source code in src/mcp/server/subscriptions.py
```python
class InMemorySubscriptionBus:
    """In-process `SubscriptionBus`: synchronous fan-out to listeners in subscription order."""

    def __init__(self) -> None:
        # Keyed by a per-subscription token so the same callable can be
        # registered more than once (bound methods compare equal).
        self._listeners: dict[object, Callable[[ServerEvent], None]] = {}

    async def publish(self, event: ServerEvent) -> None:
        """Deliver `event` to every subscribed listener.

        A raising listener is logged and skipped: one bad listener must not
        starve the others or fail the publishing handler. Ends with a
        checkpoint so a burst of publishes from one task lets listen streams
        drain between events instead of overflowing their buffers unread.
        """
        for listener in list(self._listeners.values()):
            try:
                listener(event)
            except Exception:  # fan-out boundary: isolate listeners from each other
                logger.exception("subscription listener raised; continuing")
        await anyio.lowlevel.checkpoint()

    def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
        """Register `listener` and return an idempotent unsubscribe callable."""
        token = object()
        self._listeners[token] = listener

        def unsubscribe() -> None:
            self._listeners.pop(token, None)

        return unsubscribe
```
publish
async
Deliver event to every subscribed listener.
A raising listener is logged and skipped: one bad listener must not
starve the others or fail the publishing handler. Ends with a
checkpoint so a burst of publishes from one task lets listen streams
drain between events instead of overflowing their buffers unread.
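The closing checkpoint matters when one task publishes a burst. This asyncio sketch, with the hypothetical `SketchBus` and `burst` names, models a listener that buffers into a bounded queue (counting drops where the real handler would end the stream) alongside a listener that always raises. `await asyncio.sleep(0)` plays the role of anyio's checkpoint: with it, the consumer drains each event before the next publish; without it, the whole burst lands before the consumer ever runs.

```python
import asyncio
from collections.abc import Callable


class SketchBus:
    """Hypothetical asyncio stand-in for the in-memory bus."""

    def __init__(self, checkpoint: bool) -> None:
        self._listeners: list[Callable[[int], None]] = []
        self._checkpoint = checkpoint

    def subscribe(self, listener: Callable[[int], None]) -> None:
        self._listeners.append(listener)

    async def publish(self, event: int) -> None:
        for listener in list(self._listeners):
            try:
                listener(event)
            except Exception:
                pass  # fan-out boundary (the real bus logs the exception here)
        if self._checkpoint:
            await asyncio.sleep(0)  # asyncio's equivalent of a checkpoint


async def burst(checkpoint: bool, n: int = 10, cap: int = 2) -> tuple[list[int], int]:
    """Publish n events from one task into a listener with a cap-sized buffer."""
    bus = SketchBus(checkpoint)
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=cap)
    received: list[int] = []
    dropped = 0

    def bad_listener(event: int) -> None:
        raise RuntimeError("broken listener")

    def buffer_listener(event: int) -> None:
        nonlocal dropped
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            dropped += 1

    async def consume() -> None:
        while True:
            received.append(await queue.get())
            queue.task_done()

    bus.subscribe(bad_listener)  # raises on every event; must not starve the next listener
    bus.subscribe(buffer_listener)
    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer start waiting on the queue
    for i in range(n):
        await bus.publish(i)
    await queue.join()  # wait until everything that was buffered has been consumed
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return received, dropped
```

With the checkpoint, `burst(True)` delivers all ten events and drops none; without it, `burst(False)` fills the two-slot buffer and drops the remaining eight.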
subscribe
Register listener and return an idempotent unsubscribe callable.
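The comment in InMemorySubscriptionBus.__init__ explains why listeners are keyed by a fresh token rather than by the callable. This standalone sketch shows the difference: each attribute access creates a new bound-method object, but those objects compare (and hash) equal, so a dict keyed by the listener collapses two registrations into one. `Stream` and the local `subscribe` are hypothetical stand-ins.

```python
from collections.abc import Callable


class Stream:
    def __init__(self) -> None:
        self.received: list[str] = []

    def on_event(self, event: str) -> None:
        self.received.append(event)


stream = Stream()
# Two accesses produce two distinct bound-method objects that compare equal.
first, second = stream.on_event, stream.on_event
assert first is not second and first == second

# Keyed by the callable: the second registration overwrites the first.
by_callable: dict[Callable[[str], None], Callable[[str], None]] = {}
by_callable[first] = first
by_callable[second] = second
assert len(by_callable) == 1

# Keyed by a per-subscription token (as the in-memory bus does): both survive.
by_token: dict[object, Callable[[str], None]] = {}


def subscribe(listener: Callable[[str], None]) -> Callable[[], None]:
    token = object()
    by_token[token] = listener

    def unsubscribe() -> None:
        by_token.pop(token, None)  # pop with a default never raises: idempotent

    return unsubscribe


unsubscribe_first = subscribe(first)
subscribe(second)
unsubscribe_first()
unsubscribe_first()  # calling again is a no-op
assert len(by_token) == 1
```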
ListenHandler
Serves subscriptions/listen: one call is one subscription stream.
Register on a lowlevel Server via on_subscriptions_listen= (or
add_request_handler); MCPServer does so automatically. Each call
acknowledges the honored filter first, then forwards matching bus events
onto the request's response stream until either the client disconnects (which
cancels the handler; the stream just ends, per the spec's abrupt-close
contract) or a call to close ends all streams gracefully.
Requires a transport that can stream a request's response (streamable
HTTP's SSE mode).
max_subscriptions bounds concurrent streams (further listen requests are
rejected with INTERNAL_ERROR, before the ack). max_buffered_events
bounds each stream's event backlog: a stream whose client has stopped
reading is ended at the cap (the client re-listens and refetches - there
is no replay, so ending the stream loses nothing the backlog wasn't
already losing).
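The per-stream lifecycle can be modelled without a transport. In the real handler the buffer is an anyio memory object stream: `send_nowait` raises WouldBlock at the cap, and after the send side is closed the receiver still yields what is buffered before iteration ends. This asyncio sketch imitates that with a deque; `StreamSketch`, its frame dicts and the demo coroutines are hypothetical, and releasing the subscription slot on overflow is not modelled. Overflow and close take the same path: stop accepting events, drain the backlog, then write the result as the final frame.

```python
import asyncio
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any


class StreamSketch:
    """Hypothetical model of one listen stream: bounded buffer, end on overflow, drain on close."""

    def __init__(self, max_buffered: int) -> None:
        self._buffer: deque[Any] = deque()
        self._max_buffered = max_buffered
        self._closed = False
        self._wakeup = asyncio.Event()

    def deliver(self, event: Any) -> None:
        """Bus listener: never blocks the publisher and never raises."""
        if self._closed:
            return  # already ending, like the ClosedResourceError branch
        if len(self._buffer) >= self._max_buffered:
            self.close()  # the consumer stopped reading: end the stream rather than grow
            return
        self._buffer.append(event)
        self._wakeup.set()

    def close(self) -> None:
        """Stop accepting events; the serving task drains the backlog, then finishes."""
        self._closed = True
        self._wakeup.set()

    async def _events(self) -> AsyncIterator[Any]:
        while True:
            while self._buffer:
                yield self._buffer.popleft()
            if self._closed:
                return
            self._wakeup.clear()
            await self._wakeup.wait()

    async def serve(self, write: Callable[[dict[str, Any]], Awaitable[None]]) -> None:
        await write({"type": "ack"})  # always the first frame
        async for event in self._events():
            await write({"type": "event", "event": event})
        await write({"type": "result"})  # graceful end: the final frame


async def demo_overflow() -> list[dict[str, Any]]:
    frames: list[dict[str, Any]] = []

    async def write(frame: dict[str, Any]) -> None:
        frames.append(frame)
        await asyncio.sleep(0)

    stream = StreamSketch(max_buffered=2)
    for i in range(5):  # the third event overflows and ends the stream
        stream.deliver(i)
    await stream.serve(write)
    return frames


async def demo_close() -> list[dict[str, Any]]:
    frames: list[dict[str, Any]] = []

    async def write(frame: dict[str, Any]) -> None:
        frames.append(frame)
        await asyncio.sleep(0)

    stream = StreamSketch(max_buffered=10)
    task = asyncio.create_task(stream.serve(write))
    await asyncio.sleep(0)
    stream.deliver("a")
    stream.close()  # "a" is still drained before the result frame
    await task
    return frames
```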
Source code in src/mcp/server/subscriptions.py
```python
class ListenHandler:
    """Serves `subscriptions/listen`: one call is one subscription stream.

    Register on a lowlevel `Server` via `on_subscriptions_listen=` (or
    `add_request_handler`); `MCPServer` does so automatically. Each call
    acknowledges the honored filter first, then forwards matching bus events
    onto the request's response stream until the client disconnects (which
    cancels the handler; the stream just ends, per the spec's abrupt-close
    contract) or `close` ends all streams gracefully.

    Requires a transport that can stream a request's response (streamable
    HTTP's SSE mode).

    `max_subscriptions` bounds concurrent streams (further listen requests are
    rejected with `INTERNAL_ERROR`, before the ack). `max_buffered_events`
    bounds each stream's event backlog: a stream whose client has stopped
    reading is ended at the cap (the client re-listens and refetches - there
    is no replay, so ending the stream loses nothing the backlog wasn't
    already losing).
    """

    def __init__(self, bus: SubscriptionBus, *, max_subscriptions: int = 1024, max_buffered_events: int = 1024) -> None:
        self._bus = bus
        self._max_subscriptions = max_subscriptions
        self._max_buffered_events = max_buffered_events
        self._streams: set[anyio.streams.memory.MemoryObjectSendStream[ServerEvent]] = set()

    async def __call__(
        self,
        ctx: ServerRequestContext[Any, Any],
        params: SubscriptionsListenRequestParams,
    ) -> SubscriptionsListenResult:
        """Serve one listen stream."""
        subscription_id = ctx.request_id
        if subscription_id is None:
            raise MCPError(INVALID_REQUEST, "subscriptions/listen requires a request id")
        if len(self._streams) >= self._max_subscriptions:
            raise MCPError(INTERNAL_ERROR, "Subscription limit reached")

        honored = _honored_subset(params.notifications)
        honored_uris = frozenset(honored.resource_subscriptions or ())
        meta: dict[str, Any] = {SUBSCRIPTION_ID_META_KEY: subscription_id}

        # Buffered so publishers don't block on a slow consumer (the transport
        # write happens in this handler task, not the publisher's). A stream
        # whose backlog hits the cap is ended - see the class docstring.
        send, recv = anyio.create_memory_object_stream[ServerEvent](self._max_buffered_events)

        def deliver(event: ServerEvent) -> None:
            if event_matches(honored, honored_uris, event):
                try:
                    send.send_nowait(event)
                except anyio.ClosedResourceError:
                    # `close` closed this stream; the loop below is unwinding.
                    pass
                except anyio.WouldBlock:
                    logger.warning("listen stream %r backlog full; ending the stream", subscription_id)
                    # Release the subscription slot now: the handler's own
                    # cleanup can be wedged in a transport write that closing
                    # this buffer cannot wake (a client that stopped reading).
                    self._streams.discard(send)
                    send.close()

        # Subscribe before sending the ack so an event published while the
        # ack write is suspended is buffered rather than lost. The ack is
        # still the first frame: this task alone writes the stream, and it
        # only starts draining the buffer after the ack send returns.
        unsubscribe = self._bus.subscribe(deliver)
        self._streams.add(send)
        try:
            await ctx.session.send_notification(
                SubscriptionsAcknowledgedNotification(
                    params=SubscriptionsAcknowledgedNotificationParams(notifications=honored, _meta=meta)
                ),
                related_request_id=subscription_id,
            )
            async for event in recv:
                await ctx.session.send_notification(
                    event_to_notification(event, meta), related_request_id=subscription_id
                )
        finally:
            _safe_unsubscribe(unsubscribe)
            self._streams.discard(send)
            send.close()
            recv.close()
        return SubscriptionsListenResult(_meta=meta)

    def close(self) -> None:
        """Initiate graceful closure of every open listen stream.

        Each stream then drains its buffered events and sends its
        `SubscriptionsListenResult` (stamped with the subscription id) as the
        final frame from its own handler task - the spec's graceful closure
        flow, telling clients the stream ended deliberately rather than
        dropping. This method only initiates that; it does not wait for the
        streams to finish flushing.
        """
        for stream in list(self._streams):
            stream.close()
```
__call__
async
Serve one listen stream.
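The comment about subscribing before the ack guards against a specific race. This asyncio sketch, with hypothetical names throughout, models a publisher that fires while the ack write is suspended: subscribing first buffers that event for delivery after the ack, while subscribing after the ack loses it. Either way the ack stays the first frame, because only the serving task writes frames.

```python
import asyncio
from collections.abc import Callable


class SketchBus:
    """Hypothetical synchronous fan-out, enough to model the race."""

    def __init__(self) -> None:
        self.listeners: list[Callable[[str], None]] = []

    def publish(self, event: str) -> None:
        for listener in list(self.listeners):
            listener(event)


async def serve_once(subscribe_first: bool) -> list[str]:
    bus = SketchBus()
    frames: list[str] = []
    buffer: asyncio.Queue[str] = asyncio.Queue()

    async def slow_write(frame: str) -> None:
        # Model a transport write that suspends; another task runs meanwhile.
        await asyncio.sleep(0)
        frames.append(frame)

    async def publisher() -> None:
        bus.publish("tools changed")

    if subscribe_first:
        bus.listeners.append(buffer.put_nowait)
    publishing = asyncio.create_task(publisher())
    await slow_write("ack")  # the publisher runs while this write is suspended
    if not subscribe_first:
        bus.listeners.append(buffer.put_nowait)
    await publishing
    # Drain whatever was buffered; the ack is already frames[0].
    while not buffer.empty():
        await slow_write(f"event: {buffer.get_nowait()}")
    return frames
```

`asyncio.run(serve_once(True))` yields the ack followed by the event; `asyncio.run(serve_once(False))` yields only the ack.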
close
Initiate graceful closure of every open listen stream.
Each stream then drains its buffered events and sends its
SubscriptionsListenResult (stamped with the subscription id) as the
final frame from its own handler task - the spec's graceful closure
flow, telling clients the stream ended deliberately rather than
dropping. This method only initiates that; it does not wait for the
streams to finish flushing.