From e9aa7e1b9cddb758d7bc64ef7ccf44aee04254ac Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 2 Jul 2026 09:30:33 -0400 Subject: [PATCH 1/2] fix copy_context in ws threadpool --- CHANGELOG.md | 1 + dash/backends/ws.py | 8 +++- tests/unit/test_websocket_executor.py | 56 ++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fec877d10..13649c10d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/). - [#3826](https://github.com/plotly/dash/pull/3826) WebSocket callback dispatch no longer lets long-lived callbacks limit the number of concurrent users. Async callbacks (including session-persistent ones) run directly on the connection event loop instead of occupying a worker thread, and synchronous callbacks run on a shared `ThreadPoolExecutor` whose size is configurable via the new `websocket_max_workers` argument to `Dash` (default `4`). A synchronous persistent (no-output) callback now warns at registration since it would tie up a worker thread. ## Fixed +- [#3861](https://github.com/plotly/dash/issues/3861) Fix synchronous WebSocket callbacks not inheriting `ContextVar` values bound by ASGI middleware. `copy_context()` is now captured on the event-loop thread before the callback is submitted to the thread pool, instead of inside the worker thread where only default values were visible. - [#3822](https://github.com/plotly/dash/pull/3822) Fix `UnboundLocalError` for `user_callback_output` in async background callbacks (Celery and Diskcache managers) when the callback raises `PreventUpdate` or another exception before the variable is assigned. - [#3819](https://github.com/plotly/dash/pull/3819) Fix `RuntimeError: No active request in context` when a non-Dash path falls through to the FastAPI catch-all route. Fixes [#3812](https://github.com/plotly/dash/issues/3812). - [#3838](https://github.com/plotly/dash/pull/3838) Replace `mcp` dependency with inline types. diff --git a/dash/backends/ws.py b/dash/backends/ws.py index 6d0168ecda..6dc2c0e926 100644 --- a/dash/backends/ws.py +++ b/dash/backends/ws.py @@ -550,14 +550,18 @@ def run_callback_in_executor( Future representing the pending callback execution """ + # Snapshot the context on the calling thread (the event-loop thread) *before* + # submitting to the executor. ContextVars bound by ASGI middleware live on this + # thread; capturing inside execute() would run on a fresh worker thread and see + # only default values. This mirrors loop.run_in_executor's context propagation. + ctx = copy_context() + def execute() -> dict: try: partial_func = _prepare_ws_partial( dash_app, payload, ws_callback, response_adapter ) - ctx = copy_context() - # Run in new event loop (handles a callback that still returns a # coroutine, e.g. when reached outside the async dispatch path) def run_callback(): diff --git a/tests/unit/test_websocket_executor.py b/tests/unit/test_websocket_executor.py index 5ab957768c..bdd29e9c23 100644 --- a/tests/unit/test_websocket_executor.py +++ b/tests/unit/test_websocket_executor.py @@ -8,9 +8,16 @@ ``websocket_max_workers`` argument to ``Dash``. """ +import threading from concurrent.futures import ThreadPoolExecutor +from contextvars import ContextVar +from typing import cast -from dash import Dash +import janus + +from dash import Dash, Input, Output +from dash.backends.ws import DashWebsocketCallback, run_callback_in_executor +from dash.types import CallbackExecutionBody def test_websocket_max_workers_default(): @@ -61,3 +68,50 @@ def test_shutdown_executor_allows_recreation(): assert ex1 is not ex2 finally: backend.shutdown_executor(wait=False) + + +def test_run_callback_in_executor_propagates_contextvars(): + """Sync WS callbacks inherit ContextVars bound on the calling thread. + + Regression test for gh-3861: ``copy_context()`` must be captured in + ``run_callback_in_executor`` (on the event-loop thread, where ASGI middleware + binds per-request ContextVars) rather than inside the worker-thread ``execute`` + closure, which would only ever see default values. + """ + myvar: ContextVar = ContextVar("myvar", default="DEFAULT") + + app = Dash(__name__) + + @app.callback(Output("out", "children"), Input("in", "value"), websocket=True) + def cb(value): + return f"{myvar.get()}:{value}" + + payload = cast( + CallbackExecutionBody, + { + "output": "out.children", + "outputs": {"id": "out", "property": "children"}, + "inputs": [{"id": "in", "property": "value", "value": "hi"}], + "state": [], + "changedPropIds": ["in.value"], + }, + ) + + executor = ThreadPoolExecutor(max_workers=2) + outbound_queue: janus.Queue = janus.Queue() + ws_cb = DashWebsocketCallback({}, "rid", outbound_queue, threading.Event(), None) + + try: + # Bind the ContextVar on this (calling) thread, as middleware would. + myvar.set("MIDDLEWARE_VALUE") + future = run_callback_in_executor( + executor, app, payload, ws_cb, app.backend.response_adapter() + ) + result = future.result(timeout=10) + finally: + executor.shutdown(wait=False) + outbound_queue.close() + + assert result["status"] == "ok" + # The worker thread would see the default without the calling-thread snapshot. + assert result["data"]["response"]["out"]["children"] == "MIDDLEWARE_VALUE:hi" From 8f38dda28a21923307a991784b80c7870937028f Mon Sep 17 00:00:00 2001 From: philippe Date: Thu, 2 Jul 2026 10:33:47 -0400 Subject: [PATCH 2/2] test: fix contextvars ws test for Python 3.9 (janus.Queue needs running loop) Construct the janus.Queue inside asyncio.run so it has a running event loop on Python < 3.10, where janus.Queue.__init__ calls get_running_loop(). Co-Authored-By: Claude Opus 4.8 --- tests/unit/test_websocket_executor.py | 29 ++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/unit/test_websocket_executor.py b/tests/unit/test_websocket_executor.py index bdd29e9c23..79c7254f6c 100644 --- a/tests/unit/test_websocket_executor.py +++ b/tests/unit/test_websocket_executor.py @@ -8,6 +8,7 @@ ``websocket_max_workers`` argument to ``Dash``. """ +import asyncio import threading from concurrent.futures import ThreadPoolExecutor from contextvars import ContextVar @@ -98,19 +99,29 @@ def cb(value): ) executor = ThreadPoolExecutor(max_workers=2) - outbound_queue: janus.Queue = janus.Queue() - ws_cb = DashWebsocketCallback({}, "rid", outbound_queue, threading.Event(), None) - try: - # Bind the ContextVar on this (calling) thread, as middleware would. - myvar.set("MIDDLEWARE_VALUE") - future = run_callback_in_executor( - executor, app, payload, ws_cb, app.backend.response_adapter() + async def run(): + # janus.Queue must be constructed with a running loop on Python < 3.10. + outbound_queue: janus.Queue = janus.Queue() + ws_cb = DashWebsocketCallback( + {}, "rid", outbound_queue, threading.Event(), None ) - result = future.result(timeout=10) + try: + # Bind the ContextVar on this (calling/event-loop) thread, as + # middleware would; run_callback_in_executor must snapshot it here. + myvar.set("MIDDLEWARE_VALUE") + future = run_callback_in_executor( + executor, app, payload, ws_cb, app.backend.response_adapter() + ) + return future.result(timeout=10) + finally: + outbound_queue.close() + await outbound_queue.wait_closed() + + try: + result = asyncio.run(run()) finally: executor.shutdown(wait=False) - outbound_queue.close() assert result["status"] == "ok" # The worker thread would see the default without the calling-thread snapshot.