diff --git a/CHANGELOG.md b/CHANGELOG.md index 56e4a8a53d..e4687a13f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/). - [#3826](https://github.com/plotly/dash/pull/3826) WebSocket callback dispatch no longer lets long-lived callbacks limit the number of concurrent users. Async callbacks (including session-persistent ones) run directly on the connection event loop instead of occupying a worker thread, and synchronous callbacks run on a shared `ThreadPoolExecutor` whose size is configurable via the new `websocket_max_workers` argument to `Dash` (default `4`). A synchronous persistent (no-output) callback now warns at registration since it would tie up a worker thread. ## Fixed +- [#3861](https://github.com/plotly/dash/issues/3861) Fix synchronous WebSocket callbacks not inheriting `ContextVar` values bound by ASGI middleware. `copy_context()` is now captured on the event-loop thread before the callback is submitted to the thread pool, instead of inside the worker thread where only default values were visible. - [#3779](https://github.com/plotly/dash/pull/3779) Fix `dash.testing` `Browser.get_logs()` returning `None` on non-Chrome webdrivers, which broke assertions like `assert dash_duo.get_logs() == []`. It now returns `[]`, matching the Chrome code path. - [#3822](https://github.com/plotly/dash/pull/3822) Fix `UnboundLocalError` for `user_callback_output` in async background callbacks (Celery and Diskcache managers) when the callback raises `PreventUpdate` or another exception before the variable is assigned. - [#3819](https://github.com/plotly/dash/pull/3819) Fix `RuntimeError: No active request in context` when a non-Dash path falls through to the FastAPI catch-all route. Fixes [#3812](https://github.com/plotly/dash/issues/3812). diff --git a/dash/backends/ws.py b/dash/backends/ws.py index 6d0168ecda..6dc2c0e926 100644 --- a/dash/backends/ws.py +++ b/dash/backends/ws.py @@ -550,14 +550,18 @@ def run_callback_in_executor( Future representing the pending callback execution """ + # Snapshot the context on the calling thread (the event-loop thread) *before* + # submitting to the executor. ContextVars bound by ASGI middleware live on this + # thread; capturing inside execute() would run on a fresh worker thread and see + # only default values. This mirrors loop.run_in_executor's context propagation. + ctx = copy_context() + def execute() -> dict: try: partial_func = _prepare_ws_partial( dash_app, payload, ws_callback, response_adapter ) - ctx = copy_context() - # Run in new event loop (handles a callback that still returns a # coroutine, e.g. when reached outside the async dispatch path) def run_callback(): diff --git a/tests/unit/test_websocket_executor.py b/tests/unit/test_websocket_executor.py index 5ab957768c..79c7254f6c 100644 --- a/tests/unit/test_websocket_executor.py +++ b/tests/unit/test_websocket_executor.py @@ -8,9 +8,17 @@ ``websocket_max_workers`` argument to ``Dash``. """ +import asyncio +import threading from concurrent.futures import ThreadPoolExecutor +from contextvars import ContextVar +from typing import cast -from dash import Dash +import janus + +from dash import Dash, Input, Output +from dash.backends.ws import DashWebsocketCallback, run_callback_in_executor +from dash.types import CallbackExecutionBody def test_websocket_max_workers_default(): @@ -61,3 +69,60 @@ def test_shutdown_executor_allows_recreation(): assert ex1 is not ex2 finally: backend.shutdown_executor(wait=False) + + +def test_run_callback_in_executor_propagates_contextvars(): + """Sync WS callbacks inherit ContextVars bound on the calling thread. + + Regression test for gh-3861: ``copy_context()`` must be captured in + ``run_callback_in_executor`` (on the event-loop thread, where ASGI middleware + binds per-request ContextVars) rather than inside the worker-thread ``execute`` + closure, which would only ever see default values. + """ + myvar: ContextVar = ContextVar("myvar", default="DEFAULT") + + app = Dash(__name__) + + @app.callback(Output("out", "children"), Input("in", "value"), websocket=True) + def cb(value): + return f"{myvar.get()}:{value}" + + payload = cast( + CallbackExecutionBody, + { + "output": "out.children", + "outputs": {"id": "out", "property": "children"}, + "inputs": [{"id": "in", "property": "value", "value": "hi"}], + "state": [], + "changedPropIds": ["in.value"], + }, + ) + + executor = ThreadPoolExecutor(max_workers=2) + + async def run(): + # janus.Queue must be constructed with a running loop on Python < 3.10. + outbound_queue: janus.Queue = janus.Queue() + ws_cb = DashWebsocketCallback( + {}, "rid", outbound_queue, threading.Event(), None + ) + try: + # Bind the ContextVar on this (calling/event-loop) thread, as + # middleware would; run_callback_in_executor must snapshot it here. + myvar.set("MIDDLEWARE_VALUE") + future = run_callback_in_executor( + executor, app, payload, ws_cb, app.backend.response_adapter() + ) + return future.result(timeout=10) + finally: + outbound_queue.close() + await outbound_queue.wait_closed() + + try: + result = asyncio.run(run()) + finally: + executor.shutdown(wait=False) + + assert result["status"] == "ok" + # The worker thread would see the default without the calling-thread snapshot. + assert result["data"]["response"]["out"]["children"] == "MIDDLEWARE_VALUE:hi"