Subject: how to process calculations with a queue (Celery, RQ, etc.) and display results immediately after completion

I’m developing an application that requires heavy calculations, so I need to offload all processing to external workers. Currently I’m using RQ (https://python-rq.org/), but it could be anything: Celery, threads, etc.

The key requirement is that each result comes back tagged with a task ID, and I want to display it in Dash in the element associated with that ID.

To illustrate the concept: a user enters a phrase, and I use an external worker to convert each letter to uppercase in parallel tasks. As soon as each letter is processed, I want to display the result in Dash. In my actual application, the result of one task may trigger the creation of new tasks to be processed in the same manner.

I started with this example: https://github.com/tcbegley/dash-rq-demo/blob/main/dash_rq_demo/__init__.py, which processes an entire phrase in one task. I modified the code to create a separate task for each letter, and it works, but it’s not elegant.

My initial idea was to avoid creating an interval check for each task, as checking 100 intervals seems inefficient; checking one is much better. However, with this approach, if a new task is completed, I have to update the dynamic output for all elements, not just the new ones. This seems inefficient as well, especially if some results are complex graphs with 20k points or large dataframes. Resending them all seems wasteful.

Ideally, I’d like to use a single WebSocket connection to notify when a result is ready (mapping ID to result) and directly update the corresponding element by its ID. However, I haven’t found a way to update the "children" of {"type": "dynamic_output", "index": <known id>} from outside a callback context, even though I know the ‘index’ value. It seems straightforward, but how can it be achieved?

My modification of dash_rq_demo/__init__.py from tcbegley/dash-rq-demo on GitHub, where I create a new task for every letter:

import uuid
from collections import namedtuple

import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job

from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div

from modules import *

logger = get_logger(__name__)

# use named tuple for return value of multiple output callback for readability
Result = namedtuple(
    "Result", ["result", "progress", "collapse_is_open", "finished_data"]
)

EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, the text will be converted to upper case character by
character (with a time delay in each iteration of the loop). An interval checks
periodically for completion of the task, and also updates a progress bar in the
UI to inform the user of the progress being made.
"""

app.layout = dbc.Container(
    [
        # two stores, one to track which job was most recently started, one to
        # track which job was most recently completed. if they differ, then
        # there is a job still running.
        dcc.Store(id="submitted_store"),
        dcc.Store(id="finished_store"),
        dcc.Interval(id="interval", interval=500 * 5),
        html.H2("MULTI Redis / RQ demo", className="display-4"),
        html.P(EXPLAINER),
        html.Hr(),
        dbc.Textarea(value="123", id="text", className="mb-3"),
        dbc.Button("Upper case", id="button", color="primary", className="mb-3"),
        html.P(id="log_result"),
        dbc.Collapse(dbc.Progress(id="progress", className="mb-3"), id="collapse"),
        html.P(id="output_result"),
    ]
)


@app.callback(
    [
        Output("submitted_store", "data"),
        Output("output_result", "children"),
        Output("log_result", "children"),
    ],
    [Input("button", "n_clicks")],
    [
        State("text", "value"),
        State("submitted_store", "data"),
        State("output_result", "children"),
    ],
)
def submit_to_multiple_tasks(n_clicks, text, submitted_store, output_result):
    """
    Submit a job to the queue, log the id in submitted_store
    """
    if n_clicks:
        # submitted_store = {}
        submitted_store = submitted_store or {}
        output_result = output_result or []
        for num, symbol in enumerate(text, 1):
            id_ = str(uuid.uuid4())

            # queue the task
            # queue.enqueue(slow_loop, text, job_id=id_)
            queue.enqueue(slow_dash_div, symbol, job_id=id_)
            submitted_store[id_] = {}
            output_result.append(
                html.Div(
                    [
                        html.Div(
                            [f"processing {num}/{len(text)} {id_}"],
                            id={"index": id_, "type": "dynamic_first_message"},
                        ),
                        html.Div(
                            ["no progress"],
                            id={"index": id_, "type": "dynamic_progress"},
                        ),
                        html.Div(
                            id={"index": id_, "type": "dynamic_output"},
                        ),
                    ],
                )
            )

            # log process id in dcc.Store
        message = f"{len(submitted_store)} {submitted_store=}"
        logger.debug(message)
        return submitted_store, output_result, message

    return {}, [], "empty log"


if 1 and True:

    @app.callback(
        [
            Output("finished_store", "data"),
            Output("log_result", "children", allow_duplicate=True),
            Output({"type": "dynamic_progress", "index": ALL}, "children"),
            Output({"type": "dynamic_output", "index": ALL}, "children"),
            # Output("progress", "value"),
            # Output("collapse", "is_open"),
        ],
        [Input("interval", "n_intervals")],
        [
            State("submitted_store", "data"),
            State("finished_store", "data"),
            State({"type": "dynamic_progress", "index": ALL}, "id"),
            State({"type": "dynamic_progress", "index": ALL}, "children"),
            State({"type": "dynamic_output", "index": ALL}, "children"),
        ],
        prevent_initial_call=True,
    )
    def retrieve_output(
        n,
        submitted,
        finished_store,
        dynamic_progress_id,
        dynamic_progress,
        dynamic_output,
    ):
        # def retrieve_output(n):
        """
        Periodically check the most recently submitted job to see if it has
        completed.
        """
        # finished_store = None
        dynamic_progress_id = dynamic_progress_id or []
        dynamic_progress = dynamic_progress or []
        dynamic_output = dynamic_output or []
        msg_step = f"at {get_human_time_ms()} {n=} {dynamic_progress_id=} {dynamic_progress=} {submitted=} {finished_store=}"
        finished_store = finished_store or {}
        # msg = f"at {get_human_time_ms()} {n=}"
        logger.debug(msg_step)

        if n and submitted:
            dynamic_progress = []
            dynamic_output = []

            ids = list(submitted.keys())
            logger.debug(f"check {len(ids)} {ids=}")
            jobs = Job.fetch_many(ids, connection=conn)
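            for num, (job_id, job) in enumerate(zip(ids, jobs), 1):
                if job is None:
                    # fetch_many yields None for jobs that no longer exist in Redis;
                    # still append so both lists keep one entry per component
                    dynamic_progress.append(f"job {job_id} not found")
                    dynamic_output.append("")
                    continue

                job_status = job.get_status()
                msg = f"   at {get_human_time_ms()} check {num}/{len(jobs)} {job_id=} {job_status=} {job.func_name=}"
                logger.debug(msg)
                dynamic_progress.append(msg)

                dynamic_result = ""

                if job_id not in finished_store:
                    finished_store[job_id] = {"status": "in_progress"}

                if job_status == "finished":
                    finished_store[job_id]["status"] = "finished"
                    dynamic_result = job.result

                dynamic_output.append(dynamic_result)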
        return finished_store, [msg_step], dynamic_progress, dynamic_output

What is the best approach to organize this?

I did something similar some time ago. I used a workflow along these lines:

  1. In a callback (triggered by the user), tasks were queued, the resulting (new) job ids were recorded in a (clientside) store, and the interval component was enabled.
  2. In another callback (triggered by the interval component), the status of the jobs was checked. For completed jobs, the results were fetched, updates were dispatched accordingly, and the job ids were purged from the store. For jobs still running, no updates were performed (e.g. by returning dash.no_update), and the job ids were kept in the store, to be processed in a subsequent invocation. When the store was empty, the interval component was disabled.
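
The two steps above come down to a small piece of bookkeeping that is independent of Dash and of the queue. A minimal sketch, assuming a hypothetical `get_status` callable that maps a job id to a status string (with RQ it could wrap `job.get_status()`); the names `poll_once` and `PollResult` are made up for illustration:

```python
from typing import Callable, Iterable, NamedTuple


class PollResult(NamedTuple):
    finished: list          # ids whose results should be fetched and displayed now
    pending: list           # ids to keep in the store for the next tick
    disable_interval: bool  # True once nothing is left to poll


def poll_once(pending_ids: Iterable[str], get_status: Callable[[str], str]) -> PollResult:
    """Split the stored job ids into finished and still-pending ones."""
    finished, pending = [], []
    for job_id in pending_ids:
        if get_status(job_id) == "finished":
            finished.append(job_id)
        else:
            pending.append(job_id)
    return PollResult(finished, pending, disable_interval=not pending)


# Usage with a fake status table standing in for the queue
statuses = {"a": "finished", "b": "started", "c": "queued"}
result = poll_once(["a", "b", "c"], statuses.get)
print(result.finished, result.pending, result.disable_interval)
```

In this sketch a failed job stays pending forever, so a real version would probably treat "failed" as a terminal status too.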

This setup worked well, except that if the processing time of (2) exceeded the polling interval, no updates would ever be issued. As a workaround I made the BlockingCallbackTransform, which solves this issue.

I believe the interval component approach should work down to a polling interval of about 1 s, but if you need lower latency than that, I agree that a websocket or similar is better suited.

I’ve developed a version where I create a separate interval for every task.

import time
import uuid
from collections import namedtuple

import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job

from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div

from modules import *

from dash_extensions.enrich import (
    DashProxy,
    dcc,
    html,
    Output,
    Input,
    BlockingCallbackTransform,
)

from dash.exceptions import PreventUpdate

logger = get_logger(__name__)

# use named tuple for return value of multiple output callback for readability
Result = namedtuple(
    "Result", ["result", "progress", "collapse_is_open", "finished_data"]
)

EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, the text will be converted to upper case character by
character (with a time delay in each iteration of the loop). An interval checks
periodically for completion of the task, and also updates a progress bar in the
UI to inform the user of the progress being made.
"""

callback_is_running = {}

app.layout = dbc.Container(
    [
        html.H2("MULTI Redis / RQ demo", className="display-4"),
        html.P(EXPLAINER),
        html.Hr(),
        dbc.Textarea(value="1" * 30, id="text", className="mb-3"),
        dbc.Button("Upper case", id="button", color="primary", className="mb-3"),
        html.P(id="log_result"),
        dbc.Collapse(dbc.Progress(id="progress", className="mb-3"), id="collapse"),
        html.P(id="output_result"),
    ]
)


@app.callback(
    [
        Output("output_result", "children"),
        Output("log_result", "children"),
    ],
    [Input("button", "n_clicks")],
    [
        State("text", "value"),
        State("output_result", "children"),
    ],
)
def submit_to_multiple_tasks(n_clicks, text, output_result):
    """
    Submit a job to the queue, log the id in submitted_store
    """
    if n_clicks:
        output_result = output_result or []
        ids = []
        for num, symbol in enumerate(text, 1):
            id_ = f"{num}_{uuid.uuid4()}"
            ids.append(id_)

            # queue the task
            # queue.enqueue(slow_loop, text, job_id=id_)
            queue.enqueue(slow_dash_div, symbol * 3, job_id=id_)
            output_result.insert(
                0,
                html.Div(
                    [
                        html.Div(
                            [f"processing {num}/{len(text)} {id_}"],
                            id={"index": id_, "type": "dynamic_first_message"},
                        ),
                        html.Div(
                            ["no progress"],
                            id={"index": id_, "type": "dynamic_progress"},
                        ),
                        html.Div(
                            id={"index": id_, "type": "dynamic_output"},
                        ),
                        dcc.Store(
                            id={"index": id_, "type": "submitted_store"},
                            data={"status": "in_progress"},
                        ),
                        dcc.Store(
                            id={"index": id_, "type": "finished_store"},
                            data={"status": "in_progress"},
                        ),
                        dcc.Interval(
                            id={"index": id_, "type": "interval"}, interval=500 * 1
                        ),
                    ],
                ),
            )

        message = f"added new {len(ids)} tasks {ids=}"
        logger.debug(message)
        return output_result, message

    return [], "empty log"


if 1 and True:

    @app.callback(
        [
            Output({"type": "interval", "index": MATCH}, "disabled"),
            Output({"type": "submitted_store", "index": MATCH}, "data"),
            Output({"type": "finished_store", "index": MATCH}, "data"),
            Output({"type": "dynamic_progress", "index": MATCH}, "children"),
            Output({"type": "dynamic_output", "index": MATCH}, "children"),
            # Output("progress", "value"),
            # Output("collapse", "is_open"),
        ],
        [Input({"type": "interval", "index": MATCH}, "n_intervals")],
        [
            State({"type": "interval", "index": MATCH}, "disabled"),
            State({"type": "submitted_store", "index": MATCH}, "data"),
            State({"type": "finished_store", "index": MATCH}, "data"),
            State({"type": "dynamic_progress", "index": MATCH}, "id"),
            State({"type": "dynamic_progress", "index": MATCH}, "children"),
            State({"type": "dynamic_output", "index": MATCH}, "children"),
        ],
        prevent_initial_call=True,
        # blocking=True,
    )
    def retrieve_output(
        n,
        interval_disabled,
        submitted_store,
        finished_store,
        dynamic_progress_id,
        dynamic_progress,
        dynamic_output,
    ):
        # def retrieve_output(n):
        """
        Periodically check the most recently submitted_store job to see if it has
        completed.
        """
        task_id = dynamic_progress_id["index"]

        if not n:
            raise PreventUpdate()

        # Skip this tick if an earlier invocation for the same task is still
        # running. The flag is per process, so it does not guard across
        # multiple server workers.
        if callback_is_running.get(task_id):
            raise PreventUpdate()
        callback_is_running[task_id] = True

        try:
            dynamic_output = dynamic_output or []
            msg_step = f"at {get_human_time_ms()} {n=} {interval_disabled=} {dynamic_progress_id=} {dynamic_progress=} {submitted_store=} {finished_store=}"
            logger.debug(msg_step)

            t_start = time.time()
            if submitted_store:
                logger.debug(f"     process job {task_id}...")
                jobs = Job.fetch_many([task_id], connection=conn)
                for job in jobs:
                    if job is None:
                        # job no longer exists in Redis (e.g. expired)
                        continue
                    job_status = job.get_status()

                    progress = job.meta.get("progress", 0)
                    progress_human = job.meta.get("progress_human", "")

                    msg = f"        at {get_human_time_ms()} check {job_status=}, done {progress:.1f}% / {progress_human}"
                    dynamic_progress = [msg]

                    dynamic_output = no_update

                    if job_status == "finished":
                        finished_store["status"] = "finished"
                        # dynamic_output = job.result
                        dynamic_output = "mockup result"
                        interval_disabled = True

            duration = time.time() - t_start
            logger.debug(
                f"     +processed in {duration:.1f} seconds {task_id=} at {get_human_time_ms()}"
            )

            return (
                interval_disabled,
                submitted_store,
                finished_store,
                dynamic_progress,
                dynamic_output,
            )
        finally:
            # always release the guard, even if the job lookup raised
            callback_is_running[task_id] = False

The line queue.enqueue(slow_dash_div, symbol * 3, job_id=id_) simulates long-running tasks that report their current state. I want this state to be displayed in Dash immediately as well. Each interval is disabled once its task has finished.
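
For reference, a task can report its state through RQ's job.meta, which is what the callback reads via job.meta.get("progress") and job.meta.get("progress_human"). A minimal sketch of the worker side; `slow_upper`, `current_job` and `report_progress` are hypothetical names standing in for my real slow_dash_div, and the rq import is done lazily only so the snippet also loads where RQ is not installed:

```python
import time


def current_job():
    """Return the RQ job being executed, or None outside a worker."""
    try:
        from rq import get_current_job  # lazy import: module loads without RQ
    except ImportError:
        return None
    return get_current_job()


def report_progress(job, done, total):
    """Write progress into job.meta under the keys the Dash callback reads."""
    if job is None:  # not running inside a worker, nothing to report to
        return
    job.meta["progress"] = 100.0 * done / total
    job.meta["progress_human"] = f"{done}/{total}"
    job.save_meta()  # persist meta to Redis so other processes can see it


def slow_upper(text, delay=0.5, job=None):
    """Upper-case text one character at a time, reporting after each step."""
    if job is None:
        job = current_job()
    result = []
    for done, char in enumerate(text, 1):
        time.sleep(delay)  # simulate heavy work
        result.append(char.upper())
        report_progress(job, done, len(text))
    return "".join(result)
```

The `job` parameter exists only so the function can be exercised with a fake job object; when enqueued normally it is left as None and the current job is looked up.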

This structure is simpler, but it has a problem with updates. If there are more than about 10 tasks (for example, 30), only the first 10 get updated. Even though the other tasks have completed, subsequent callbacks do not report these changes, which I find very peculiar.

Furthermore, https://www.dash-extensions.com/transforms/blocking_callback_transform cannot be used in "index": MATCH callbacks; Dash simply returns an error. So this solution does not hold up with more than about 10 tasks.

But it functions!
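
One possible hand-rolled substitute for the blocking transform in MATCH callbacks is a non-blocking lock per task ID. This is a minimal sketch of the general idea, not the dash-extensions implementation; `try_exclusive` and `handle_tick` are hypothetical names, and the locks live in one process only, so this assumes a single server process:

```python
import threading
from contextlib import contextmanager

_locks = {}                        # task id -> lock, local to this process
_registry_lock = threading.Lock()  # protects creation of per-task locks


@contextmanager
def try_exclusive(key):
    """Yield True if this caller got the lock for key, False if it is held."""
    with _registry_lock:
        lock = _locks.setdefault(key, threading.Lock())
    acquired = lock.acquire(blocking=False)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()


def handle_tick(task_id, work):
    """Run work() unless a previous tick for the same task is still running."""
    with try_exclusive(task_id) as acquired:
        if not acquired:
            return None  # in a Dash callback this would be `raise PreventUpdate`
        return work()
```

Ticks for different tasks never block each other here; only overlapping ticks for the same task are skipped.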

Now I am considering a solution involving websockets. This would entail a single thread function that listens to a websocket and, for every minor change (such as a progress update or a ready status), uses custom JavaScript to set the value of an element by its ID (which appears to be the fastest approach).

Moreover, once a task is completed, the JavaScript would set the task status field to ‘finished’. A callback that monitors that element would then be triggered immediately to set the final value of the result. So in the final structure there will be only one such callback per task, and it should be very fast.
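
For the JavaScript side to find an element by ID, the server has to send the DOM id of the pattern-matching component. As far as I can tell, Dash renders dict IDs as JSON with sorted keys and no whitespace, but this is an assumption worth checking on one element in the browser dev tools. A minimal sketch of the server-side message building; `dom_id`, `make_message` and the message fields are hypothetical:

```python
import json


def dom_id(component_id):
    """Return the DOM id string for a Dash component id (assumed format)."""
    if isinstance(component_id, dict):
        return json.dumps(component_id, sort_keys=True, separators=(",", ":"))
    return component_id


def make_message(job_id, kind, payload):
    """Build one WebSocket message addressed to a single element."""
    target_type = "dynamic_output" if kind == "finished" else "dynamic_progress"
    return json.dumps(
        {
            "target": dom_id({"type": target_type, "index": job_id}),
            "kind": kind,
            "payload": payload,
        }
    )


print(dom_id({"type": "dynamic_output", "index": "1_abc"}))
# → {"index":"1_abc","type":"dynamic_output"}
```

The client would then look up the element with the "target" string and write "payload" into it.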

I went back to a single callback that analyzes the overall state of all tasks (instead of 100 callbacks for 100 tasks, as in the previous version).

Here it is:

import time
import uuid
from collections import namedtuple

import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job

from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div

from modules import *

logger = get_logger(__name__)

# use named tuple for return value of multiple output callback for readability
Result = namedtuple(
    "Result", ["result", "progress", "collapse_is_open", "finished_data"]
)

EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, the text will be converted to upper case character by
character (with a time delay in each iteration of the loop). An interval checks
periodically for completion of the task, and also updates a progress bar in the
UI to inform the user of the progress being made.
"""

app.layout = dbc.Container(
    [
        # two stores, one to track which job was most recently started, one to
        # track which job was most recently completed. if they differ, then
        # there is a job still running.
        dcc.Store(id="submitted_store", data={}),
        dcc.Store(id="finished_store", data={}),
        dcc.Interval(id="interval", interval=50),
        html.H2("MULTI Redis / RQ demo", className="display-4"),
        html.P(EXPLAINER),
        html.Hr(),
        dbc.Textarea(value="123", id="text", className="mb-3"),
        dbc.Button("Upper case", id="button", color="primary", className="mb-3"),
        html.P(id="log_result"),
        dbc.Collapse(dbc.Progress(id="progress", className="mb-3"), id="collapse"),
        html.P(id="output_result"),
    ]
)


@app.callback(
    [
        Output("output_result", "children"),
        Output("log_result", "children"),
        Output("submitted_store", "data"),
        Output("interval", "disabled"),
    ],
    [Input("button", "n_clicks")],
    [
        State("text", "value"),
        State("output_result", "children"),
        State("submitted_store", "data"),
    ],
)
def submit_to_multiple_tasks(n_clicks, text, output_result, submitted_store):
    """
    Submit a job to the queue, log the id in submitted_store
    """
    output_result = output_result or []
    message = "empty_log"
    if n_clicks:
        logger.debug(f"submit_to_multiple_tasks {submitted_store=}")
        for num, symbol in enumerate(text, 1):
            id_ = f"{num}_{uuid.uuid4()}"

            # queue the task
            # queue.enqueue(slow_loop, text, job_id=id_)
            queue.enqueue(slow_dash_div, symbol * 3, job_id=id_)
            submitted_store[id_] = {"status": "queued"}
            output_result.insert(
                0,
                html.Div(
                    [
                        html.Div(
                            [f"processing {num}/{len(text)} {id_}"],
                            id={"index": id_, "type": "dynamic_first_message"},
                        ),
                        html.Div(
                            ["no progress"],
                            id={"index": id_, "type": "dynamic_progress"},
                        ),
                        html.Div(
                            id={"index": id_, "type": "dynamic_output"},
                        ),
                    ],
                ),
            )

            # log process id in dcc.Store
        message = f"{len(submitted_store)} {submitted_store=}"
        logger.debug(message)

    interval_disabled = False
    return output_result, message, submitted_store, interval_disabled


if 1 and True:

    @app.callback(
        [
            Output("log_result", "children", allow_duplicate=True),
            Output(
                {"type": "dynamic_progress", "index": ALL},
                "children",
                allow_duplicate=True,
            ),
            Output(
                {"type": "dynamic_output", "index": ALL},
                "children",
                allow_duplicate=True,
            ),
            Output("finished_store", "data", allow_duplicate=True),
            Output("interval", "disabled", allow_duplicate=True),
        ],
        [Input("interval", "n_intervals")],
        [
            State("submitted_store", "data"),
            State("finished_store", "data"),
            State({"type": "dynamic_progress", "index": ALL}, "id"),
        ],
        prevent_initial_call=True,
        # blocking=True,
    )
    def retrieve_result(
        n,
        submitted_store,
        finished_store,
        dynamic_progress_id,
    ):
        """
        Periodically check all submitted jobs that have not finished yet and
        push updates only for those.
        """
        interval_disabled = False
        # return no_update

        t_start_main = time.time()

        msg_step = f"{n} retrieve_result at {get_human_time_ms()} {n=}, total {len(submitted_store)} tasks"
        # msg = f"at {get_human_time_ms()} {n=}"
        logger.debug(msg_step)

        id_to_results = {}

        ids_todo = []
        ids_finished = []
        ids_finished_new = []

        if 1 and n and submitted_store:
            ids_finished = set(finished_store.keys())
            ids_todo = set(submitted_store.keys()) - ids_finished

            t_start = time.time()
            logger.debug(f"  check {len(ids_todo)} {ids_todo=}")
            jobs = Job.fetch_many(list(ids_todo), connection=conn)
            for num, job in enumerate(jobs, 1):
                if job is None:
                    # job no longer exists in Redis (e.g. expired); skip it
                    continue
                id = job.id

                job_status = job.get_status()
                # msg = f"   at {get_human_time_ms()} check {num}/{len(jobs)} {id=} {job_status=} {job.id=} {job.func_name=}"
                progress = job.meta.get("progress", 0)
                progress_human = job.meta.get("progress_human", "")
                msg = f"        at {get_human_time_ms()} check {job_status=}, done {progress:.1f}% / {progress_human} {job.meta.get('worker_id', 'no_worker_id')}"
                # logger.debug(msg)
                id_to_results[id] = {"status": "no_update", "result": None, "msg": msg}

                if job_status == "finished":
                    # dynamic_result = job.result
                    dynamic_result = html.B("mockup_result")
                    id_to_results[id].update(
                        {"status": "finished", "result": dynamic_result}
                    )
                    ids_finished_new.append(id)

            duration = time.time() - t_start
            logger.debug(f"checked in {duration:.3f} seconds")

        if len(ids_todo) == 0:
            finished_store = no_update
            interval_disabled = no_update
            new_progress = [no_update] * len(dynamic_progress_id)
            new_output = [no_update] * len(dynamic_progress_id)

        if len(ids_todo) > 0:
            new_progress = []
            new_output = []
            for el in dynamic_progress_id:
                id = el["index"]
                progress = no_update
                output = no_update

                res = id_to_results.get(id, None)
                if res is not None:
                    status = res["status"]
                    if status != "finished":
                        progress = res["msg"]
                    else:
                        progress = "done"
                        output = res["result"]
                        finished_store[id] = {}

                new_progress.append(progress)
                new_output.append(output)

            ids_finished = set(finished_store.keys())
            if submitted_store:
                ids_todo = set(submitted_store.keys()) - ids_finished
                logger.debug(f"{n=} {ids_todo=}")
                if 1 and len(ids_todo) == 0:
                    interval_disabled = True
                    logger.warning("disable interval, no tasks")

        if not ids_finished_new:
            finished_store = no_update

        duration = time.time() - t_start_main
        return (
            [
                f"{len(ids_todo)} todo, {len(ids_finished)} finished, checked in {duration:.3f} seconds, ",
                msg_step,
            ],
            new_progress,
            new_output,
            finished_store,
            interval_disabled,
        )

It works much, much better, but after a few hundred tasks it becomes less smooth than expected.
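
The core of this version is the alignment of updates with the rendered components: an ALL output needs one list entry per matched component, so the returned lists grow with the number of tasks on the page even when most entries are no_update. A minimal sketch of that step as a pure function; `build_updates` is a hypothetical name and `NO_UPDATE` is a stand-in for dash.no_update so the snippet runs without Dash:

```python
NO_UPDATE = object()  # stand-in for dash.no_update


def build_updates(component_ids, results, no_update=NO_UPDATE):
    """Return (progress, output) lists aligned with component_ids.

    results maps job id -> {"status": ..., "msg": ..., "result": ...} for the
    jobs checked on this tick; components without an entry stay untouched.
    """
    progress, output = [], []
    for component_id in component_ids:
        res = results.get(component_id["index"])
        if res is None:
            # job was not checked this tick (already finished earlier)
            progress.append(no_update)
            output.append(no_update)
        elif res["status"] == "finished":
            progress.append("done")
            output.append(res["result"])
        else:
            progress.append(res["msg"])
            output.append(no_update)
    return progress, output
```

In the callback this would be called as build_updates(dynamic_progress_id, id_to_results, no_update=no_update).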

My next architecture decisions are here: https://community.plotly.com/t/dash-clientside-set-props-50-times-slower-than-document-getelementbyid-id-innertext/83425/15?u=kuplo