Multiprocessing and Dash

Please consider the following code, which launches a secondary process (process1) that places the strings “1”, “2”, … onto a queue. In the primary process, which runs the Dash GUI, a callback update_history retrieves these items and appends them to the global variable history. A second callback, update_markdown, then updates the GUI. This code seems to work: as expected, both callbacks fire regularly and the screen updates properly.

import dash
from dash import Dash, dcc, html, Input, Output, State
from dash.exceptions import PreventUpdate
import dash_bootstrap_components as dbc
from dash_extensions.enrich import DashProxy, MultiplexerTransform, NoOutputTransform, BlockingCallbackTransform
from multiprocessing import Manager, Process, Queue, Barrier, Lock
from time import sleep
from datetime import datetime as dt
from queue import Empty
 
def method(queue):
    count=0
    while True:
        count += 1
        queue.put(str(count))
        sleep(1)
 
if __name__ == '__main__':
    queue = Queue()
 
    process1 = Process(target=method, args=(queue,))
    process1.start()
 
    app = DashProxy(__name__,
                    external_stylesheets=[dbc.themes.BOOTSTRAP],
                    transforms=[MultiplexerTransform(),
                                NoOutputTransform(),
                                BlockingCallbackTransform()]
                   )
 
    interval = dcc.Interval(id="interval", interval=1000, n_intervals=0)
    markdown = dcc.Markdown(id="markdown", dangerously_allow_html=True)
 
    app.layout = html.Div([markdown, interval])
 
    manager = Manager()
    lock = manager.Lock()
    history = []
    start = dt.now()
 
    @app.callback(
        Input("interval", "n_intervals")
    )
    def update_history(n_intervals, blocking=True, timeout=1000):
        if not n_intervals:
            raise PreventUpdate
        print("a start")
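        # Drain whatever is waiting, then return so the callback can exit.
        try:
            while True:
                item = queue.get(block=False)
                with lock:
                    history.append(item)
        except Empty:
            pass
        print("a exit")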
 
    @app.callback(
        Output("markdown", "children"),
        Input("interval", "n_intervals")
    )
    def update_markdown(n_intervals, blocking=True):
        print("   b start")
        result = "history contents<br>"
        result += f"{(dt.now()-start).total_seconds():.2f}<br>"
        with lock:
            result += " ".join(history)
        print("   b exit")
        return result
 
    app.run_server(debug=True, use_reloader=False)

However, changing update_history to fire once and run in a loop fails:

    @app.callback(
        Input("interval", "n_intervals")
    )
    def update_history(n_intervals, blocking=True, timeout=1000):
        if not n_intervals:
            raise PreventUpdate
        print("a start")
        while True:
            item = queue.get()
            with lock:
                history.append(item)
        print("a exit")

It fails in a peculiar way: it runs a couple of times and then the app freezes. More strangely, the callback is fired multiple times and none of those calls ever exits! This is despite the fact that it is supposedly a blocking callback (Dash). I would have expected a single invocation that blocks all further calls (at least for 1000 seconds). I could certainly hack this to prevent further callbacks, but I’m more interested in understanding why the system hangs.

This way of merging Dash and multiprocessing goes against the Dash proscription against globals, but as far as I understand it, that only applies when multiple clients will be connected (which won’t be the case in my application). If you have an alternative suggestion for setting up Dash with multiprocessing, I’d love to hear it!

The answer turned out to be really trivial:

def update_history(n_intervals, blocking=True, timeout=1000):

is wrong because blocking=True should be in the decorator:

    @app.callback(
        Input("interval", "n_intervals"),
        blocking=True,
    )
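
To see why the misplaced keyword has no effect, here is a toy sketch in plain Python. The callback decorator below is a hypothetical stand-in that I made up for illustration; it is not how dash-extensions is implemented. It only shows that a decorator receives its own keyword arguments and never looks at default values in the decorated function's signature.

```python
def callback(*args, blocking=False):
    """Toy stand-in for a callback decorator (hypothetical, not dash-extensions)."""
    def register(func):
        # The decorator only knows about the keyword arguments passed to it.
        func.registered_blocking = blocking
        return func
    return register


@callback("interval")
def misplaced(n_intervals, blocking=True, timeout=1000):
    # blocking and timeout are just unused parameters with default values here.
    return n_intervals


@callback("interval", blocking=True)
def corrected(n_intervals):
    return n_intervals


print(misplaced.registered_blocking)  # False
print(corrected.registered_blocking)  # True
```

In the toy version, the flag only reaches the decorator when it is passed in the decorator call, which matches what I saw with the real callback.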

Because my mistake creates a callback that does not block, each invocation never returns and a new one is started on every interval tick, so callbacks accumulate. Pretty soon, you fill up the available thread pool and you can’t start any more. I don’t exactly understand the details of how Dash fails, but the lesson is to avoid having any Interval-triggered callbacks that take longer than the interval period (or use blocking callbacks).
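
One way to keep an Interval-triggered callback shorter than the interval is to drain the queue without ever blocking. This is only a minimal sketch: the helper name drain and its max_items cap are my own inventions, not part of Dash. The demo uses queue.Queue so it runs standalone; multiprocessing.Queue raises the same queue.Empty exception from get_nowait().

```python
from queue import Empty, Queue


def drain(queue, max_items=100):
    """Return the items currently waiting in the queue without blocking."""
    items = []
    for _ in range(max_items):  # cap the work done in one call
        try:
            items.append(queue.get_nowait())
        except Empty:
            break  # nothing left right now, so return immediately
    return items


q = Queue()
for n in ("1", "2", "3"):
    q.put(n)

print(drain(q))  # ['1', '2', '3']
print(drain(q))  # []
```

Inside update_history, something like history.extend(drain(queue)) under the lock would then return right away, even when the producer has nothing new.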
