Skip to content

dispatcher

Dispatcher Protocol - the call/return boundary between transports and handlers.

A Dispatcher turns a duplex message channel into two things:

  • an outbound API: send_raw_request(method, params) and notify(method, params)
  • an inbound pump: run(on_request, on_notify) that drives the receive loop and invokes the supplied handlers for each incoming request/notification

It is deliberately not MCP-aware. Method names are strings, params and results are dict[str, Any]. The MCP type layer (request/result models, capability negotiation, Context) sits above this; the wire encoding (JSON-RPC, gRPC, in-process direct calls) sits below it.

See JSONRPCDispatcher for the production implementation and DirectDispatcher for an in-memory implementation used in tests and for embedding a server in-process.

as_request_id

as_request_id(value: object) -> RequestId | None

Narrow an untyped wire value to a RequestId, or None; rejects bool (True would alias request id 1).

Source code in src/mcp/shared/dispatcher.py
49
50
51
52
53
def as_request_id(value: object) -> RequestId | None:
    """Narrow an untyped wire value to a `RequestId`, or None; rejects bool (True would alias request id 1)."""
    if isinstance(value, str | int) and not isinstance(value, bool):
        return value
    return None

coerce_request_id

coerce_request_id(request_id: RequestId) -> RequestId

Coerce a stringified int request id back to int so a peer-echoed id still correlates (matches the TS SDK).

This is the collision/correlation domain dispatchers share: "7" and 7 are one id for correlation purposes, even where the wire carries the verbatim value.

Source code in src/mcp/shared/dispatcher.py
56
57
58
59
60
61
62
63
64
65
66
67
def coerce_request_id(request_id: RequestId) -> RequestId:
    """Coerce a stringified int request id back to int so a peer-echoed id still correlates (matches the TS SDK).

    This is the collision/correlation domain dispatchers share: "7" and 7 are one
    id for correlation purposes, even where the wire carries the verbatim value.
    """
    if isinstance(request_id, str):
        try:
            return int(request_id)
        except ValueError:
            pass
    return request_id

ProgressFnT

Bases: Protocol

Callback invoked when a progress notification arrives for a pending request.

Source code in src/mcp/shared/dispatcher.py
70
71
72
73
class ProgressFnT(Protocol):
    """Callback invoked when a progress notification arrives for a pending request."""

    async def __call__(self, progress: float, total: float | None, message: str | None) -> None: ...

CallOptions

Bases: TypedDict

Per-call options for Outbound.send_raw_request.

All keys are optional. Dispatchers ignore keys they do not understand.

Source code in src/mcp/shared/dispatcher.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class CallOptions(TypedDict, total=False):
    """Per-call options for `Outbound.send_raw_request`.

    All keys are optional. Dispatchers ignore keys they do not understand.
    """

    request_id: RequestId
    """Send the request under this caller-supplied id instead of a dispatcher-minted one.

    The peer sees the value verbatim ("7" stays a string). A value that collides
    with one of the sender's own in-flight request ids raises `ValueError`.
    Callers that need to know a request's id before its result arrives (a
    `subscriptions/listen` stream is demultiplexed by it) mint their own ids
    here; string ids that don't parse as integers can never collide with the
    dispatcher's minted sequence. Per the class contract, dispatchers that
    predate this key ignore it and mint as usual.
    """

    timeout: float
    """Seconds to wait for a result before raising and sending `notifications/cancelled`."""

    cancel_on_abandon: bool
    """Whether abandoning this request (timeout or caller cancellation) sends `notifications/cancelled`.

    Defaults to `True`. Set `False` for requests the protocol forbids cancelling, such as `initialize`.
    Also suppressed when resumption hints reach the transport, or when the request was never written.
    """

    on_progress: ProgressFnT
    """Receive `notifications/progress` updates for this request."""

    resumption_token: str
    """Opaque token to resume a previously interrupted request.

    Client-side, streamable-HTTP only. Ignored by server dispatchers and other
    transports, and also ignored (with a debug log) for requests sent from a
    `DispatchContext`, where routing onto the inbound request's stream takes
    precedence. Supports protocol version 2025-11-25 and earlier; SSE-stream
    resumption is removed in the next protocol revision.
    """

    on_resumption_token: Callable[[str], Awaitable[None]]
    """Receive a resumption token when the transport issues one for this request.

    Client-side, streamable-HTTP only. Ignored by server dispatchers and other
    transports, and also ignored (with a debug log) for requests sent from a
    `DispatchContext`, where routing onto the inbound request's stream takes
    precedence. Supports protocol version 2025-11-25 and earlier; SSE-stream
    resumption is removed in the next protocol revision.
    """

    headers: dict[str, str]
    """Transport-layer hint: HTTP transports merge these onto the outgoing request; non-HTTP transports ignore."""

request_id instance-attribute

request_id: RequestId

Send the request under this caller-supplied id instead of a dispatcher-minted one.

The peer sees the value verbatim ("7" stays a string). A value that collides with one of the sender's own in-flight request ids raises ValueError. Callers that need to know a request's id before its result arrives (a subscriptions/listen stream is demultiplexed by it) mint their own ids here; string ids that don't parse as integers can never collide with the dispatcher's minted sequence. Per the class contract, dispatchers that predate this key ignore it and mint as usual.

timeout instance-attribute

timeout: float

Seconds to wait for a result before raising and sending notifications/cancelled.

cancel_on_abandon instance-attribute

cancel_on_abandon: bool

Whether abandoning this request (timeout or caller cancellation) sends notifications/cancelled.

Defaults to True. Set False for requests the protocol forbids cancelling, such as initialize. Also suppressed when resumption hints reach the transport, or when the request was never written.

on_progress instance-attribute

on_progress: ProgressFnT

Receive notifications/progress updates for this request.

resumption_token instance-attribute

resumption_token: str

Opaque token to resume a previously interrupted request.

Client-side, streamable-HTTP only. Ignored by server dispatchers and other transports, and also ignored (with a debug log) for requests sent from a DispatchContext, where routing onto the inbound request's stream takes precedence. Supports protocol version 2025-11-25 and earlier; SSE-stream resumption is removed in the next protocol revision.

on_resumption_token instance-attribute

on_resumption_token: Callable[[str], Awaitable[None]]

Receive a resumption token when the transport issues one for this request.

Client-side, streamable-HTTP only. Ignored by server dispatchers and other transports, and also ignored (with a debug log) for requests sent from a DispatchContext, where routing onto the inbound request's stream takes precedence. Supports protocol version 2025-11-25 and earlier; SSE-stream resumption is removed in the next protocol revision.

headers instance-attribute

headers: dict[str, str]

Transport-layer hint: HTTP transports merge these onto the outgoing request; non-HTTP transports ignore.

Outbound

Bases: Protocol

Anything that can send requests and notifications to the peer.

Both Dispatcher (top-level outbound) and DispatchContext (back-channel during an inbound request) extend this. The MCP type layer (ClientPeer, Connection) builds typed send_request / convenience methods on top of this raw channel.

Source code in src/mcp/shared/dispatcher.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@runtime_checkable
class Outbound(Protocol):
    """Anything that can send requests and notifications to the peer.

    Both `Dispatcher` (top-level outbound) and `DispatchContext` (back-channel
    during an inbound request) extend this. The MCP type layer (`ClientPeer`,
    `Connection`) builds typed `send_request` / convenience methods on top of
    this raw channel.
    """

    async def send_raw_request(
        self,
        method: str,
        params: Mapping[str, Any] | None,
        opts: CallOptions | None = None,
    ) -> dict[str, Any]:
        """Send a request and await its raw result dict.

        Raises:
            MCPError: If the peer responded with an error, or the handler
                raised. Implementations normalize all handler exceptions to
                `MCPError` so callers see a single exception type.
        """
        ...

    async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
        """Send a fire-and-forget notification."""
        ...

send_raw_request async

send_raw_request(
    method: str,
    params: Mapping[str, Any] | None,
    opts: CallOptions | None = None,
) -> dict[str, Any]

Send a request and await its raw result dict.

Raises:

Type Description
MCPError

If the peer responded with an error, or the handler raised. Implementations normalize all handler exceptions to MCPError so callers see a single exception type.

Source code in src/mcp/shared/dispatcher.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
async def send_raw_request(
    self,
    method: str,
    params: Mapping[str, Any] | None,
    opts: CallOptions | None = None,
) -> dict[str, Any]:
    """Send a request and await its raw result dict.

    Raises:
        MCPError: If the peer responded with an error, or the handler
            raised. Implementations normalize all handler exceptions to
            `MCPError` so callers see a single exception type.
    """
    ...

notify async

notify(
    method: str,
    params: Mapping[str, Any] | None,
    opts: CallOptions | None = None,
) -> None

Send a fire-and-forget notification.

Source code in src/mcp/shared/dispatcher.py
156
157
158
async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
    """Send a fire-and-forget notification."""
    ...

DispatchContext

Bases: Outbound, Protocol[TransportT_co]

Per-request context handed to on_request / on_notify.

Carries the transport metadata for the inbound message and provides the back-channel for sending requests/notifications to the peer while handling it. send_raw_request raises NoBackChannelError if can_send_request is False.

Source code in src/mcp/shared/dispatcher.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class DispatchContext(Outbound, Protocol[TransportT_co]):
    """Per-request context handed to `on_request` / `on_notify`.

    Carries the transport metadata for the inbound message and provides the
    back-channel for sending requests/notifications to the peer while handling
    it. `send_raw_request` raises `NoBackChannelError` if `can_send_request`
    is `False`.
    """

    @property
    def transport(self) -> TransportT_co:
        """Transport-specific metadata for this inbound message."""
        ...

    @property
    def can_send_request(self) -> bool:
        """Whether the back-channel can currently deliver server-initiated requests.

        `False` when the transport has no back-channel, or when this context has
        been closed (the inbound request finished). `send_raw_request` raises
        `NoBackChannelError` exactly when this is `False`.
        """
        ...

    @property
    def request_id(self) -> RequestId | None:
        """The id of the inbound request, or `None` for a notification.

        For JSON-RPC this is the wire `id` field. Handlers thread it through
        as `related_request_id` on outbound notifications so HTTP transports
        can route them onto the originating request's response stream.
        """
        ...

    @property
    def message_metadata(self) -> MessageMetadata:
        """The metadata the transport attached to this inbound message, if any.

        This is `SessionMessage.metadata` passed through verbatim: HTTP
        transports attach `ServerMessageMetadata` (the HTTP request, SSE
        stream-close callbacks); stdio and in-memory dispatch attach nothing.
        Tied to the `SessionMessage` wire format - goes away when transports
        stop delivering messages that way.
        """
        # TODO(maxisbey): remove for context rework
        ...

    @property
    def cancel_requested(self) -> anyio.Event:
        """Set when the peer sends `notifications/cancelled` for this request."""
        ...

    async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
        """Report progress for the inbound request, if the peer supplied a progress token.

        A no-op when no token was supplied.
        """
        ...

transport property

transport: TransportT_co

Transport-specific metadata for this inbound message.

can_send_request property

can_send_request: bool

Whether the back-channel can currently deliver server-initiated requests.

False when the transport has no back-channel, or when this context has been closed (the inbound request finished). send_raw_request raises NoBackChannelError exactly when this is False.

request_id property

request_id: RequestId | None

The id of the inbound request, or None for a notification.

For JSON-RPC this is the wire id field. Handlers thread it through as related_request_id on outbound notifications so HTTP transports can route them onto the originating request's response stream.

message_metadata property

message_metadata: MessageMetadata

The metadata the transport attached to this inbound message, if any.

This is SessionMessage.metadata passed through verbatim: HTTP transports attach ServerMessageMetadata (the HTTP request, SSE stream-close callbacks); stdio and in-memory dispatch attach nothing. Tied to the SessionMessage wire format - goes away when transports stop delivering messages that way.

cancel_requested property

cancel_requested: Event

Set when the peer sends notifications/cancelled for this request.

progress async

progress(
    progress: float,
    total: float | None = None,
    message: str | None = None,
) -> None

Report progress for the inbound request, if the peer supplied a progress token.

A no-op when no token was supplied.

Source code in src/mcp/shared/dispatcher.py
213
214
215
216
217
218
async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
    """Report progress for the inbound request, if the peer supplied a progress token.

    A no-op when no token was supplied.
    """
    ...

OnRequest module-attribute

Handler for inbound requests: (ctx, method, params) -> result. Raise MCPError to send an error response.

OnNotify module-attribute

OnNotify = Callable[
    [
        DispatchContext[TransportContext],
        str,
        Mapping[str, Any] | None,
    ],
    Awaitable[None],
]

Handler for inbound notifications: (ctx, method, params).

OnNotifyIntercept module-attribute

OnNotifyIntercept = Callable[
    [str, Mapping[str, Any] | None], bool
]

Synchronous receive-order intercept for inbound notifications: (method, params) -> consumed.

Runs before on_notify is scheduled so correlation state advances in wire order relative to response resolution (the client's listen demux depends on this). Returning True consumes the notification. Must not block the receive path.

run_notify_intercept

run_notify_intercept(
    intercept: OnNotifyIntercept | None,
    method: str,
    params: Mapping[str, Any] | None,
) -> bool

Invoke intercept, containing a raise to that one notification (never the receive loop).

Source code in src/mcp/shared/dispatcher.py
236
237
238
239
240
241
242
243
244
def run_notify_intercept(intercept: OnNotifyIntercept | None, method: str, params: Mapping[str, Any] | None) -> bool:
    """Invoke `intercept`, containing a raise to that one notification (never the receive loop)."""
    if intercept is None:
        return False
    try:
        return intercept(method, params)
    except Exception:
        logger.exception("notification intercept raised; passing %r through", method)
        return False

Dispatcher

Bases: Outbound, Protocol[TransportT_co]

A duplex request/notification channel with call-return semantics.

Implementations own correlation of outbound requests to inbound results, the receive loop, per-request concurrency, and cancellation/progress wiring.

The lifecycle surface is provisional; run() may change before v2 stable.

Source code in src/mcp/shared/dispatcher.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class Dispatcher(Outbound, Protocol[TransportT_co]):
    """A duplex request/notification channel with call-return semantics.

    Implementations own correlation of outbound requests to inbound results, the
    receive loop, per-request concurrency, and cancellation/progress wiring.

    The lifecycle surface is provisional; `run()` may change before v2 stable.
    """

    async def run(
        self,
        on_request: OnRequest,
        on_notify: OnNotify,
        on_notify_intercept: OnNotifyIntercept | None = None,
        *,
        task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
    ) -> None:
        """Drive the receive loop until the underlying channel closes.

        Each inbound request is dispatched to `on_request` in its own task;
        the returned dict (or raised `MCPError`) is sent back as the response.
        Implementations MUST offer every inbound notification to
        `on_notify_intercept` synchronously in receive order (via
        `run_notify_intercept`), handing only unconsumed ones to `on_notify`.

        `task_status.started()` is called once the dispatcher is ready to
        accept `send_request`/`notify` calls, so callers can use
        `await tg.start(dispatcher.run, on_request, on_notify)`.
        """
        ...

run async

run(
    on_request: OnRequest,
    on_notify: OnNotify,
    on_notify_intercept: OnNotifyIntercept | None = None,
    *,
    task_status: TaskStatus[None] = TASK_STATUS_IGNORED
) -> None

Drive the receive loop until the underlying channel closes.

Each inbound request is dispatched to on_request in its own task; the returned dict (or raised MCPError) is sent back as the response. Implementations MUST offer every inbound notification to on_notify_intercept synchronously in receive order (via run_notify_intercept), handing only unconsumed ones to on_notify.

task_status.started() is called once the dispatcher is ready to accept send_request/notify calls, so callers can use await tg.start(dispatcher.run, on_request, on_notify).

Source code in src/mcp/shared/dispatcher.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
async def run(
    self,
    on_request: OnRequest,
    on_notify: OnNotify,
    on_notify_intercept: OnNotifyIntercept | None = None,
    *,
    task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
) -> None:
    """Drive the receive loop until the underlying channel closes.

    Each inbound request is dispatched to `on_request` in its own task;
    the returned dict (or raised `MCPError`) is sent back as the response.
    Implementations MUST offer every inbound notification to
    `on_notify_intercept` synchronously in receive order (via
    `run_notify_intercept`), handing only unconsumed ones to `on_notify`.

    `task_status.started()` is called once the dispatcher is ready to
    accept `send_request`/`notify` calls, so callers can use
    `await tg.start(dispatcher.run, on_request, on_notify)`.
    """
    ...