I’m developing an application that requires heavy calculations, so I need to offload all processing to external workers. Currently, I’m using RQ (https://python-rq.org/), but that can be anything like Celery, threads, etc.
The key requirement is for the system to return results for a given ID, which I then want to display in Dash associated with that ID.
To illustrate the concept: a user enters a phrase, and I use an external worker to convert each letter to uppercase in parallel tasks. As soon as each letter is processed, I want to display the result in Dash. In my actual application, the result of one task may trigger the creation of new tasks to be processed in the same manner.
I started with this example: https://github.com/tcbegley/dash-rq-demo/blob/main/dash_rq_demo/__init__.py, which processes an entire phrase in one task. I modified the code to create a separate task for each letter, and it works, but it’s not elegant.
My initial idea was to avoid creating an interval check for each task, as checking 100 intervals seems inefficient; checking one is much better. However, with this approach, if a new task is completed, I have to update the dynamic output for all elements, not just the new ones. This seems inefficient as well, especially if some results are complex graphs with 20k points or large dataframes. Resending them all seems wasteful.
Ideally, I’d like to use a single WebSocket connection to notify when a result is ready (mapping ID to result) and directly update the corresponding element by its ID. However, I haven’t found a way to set State({"type": "dynamic_output", "index": ALL}, "children")
when I know the ‘index’ value outside of a callback context. It seems straightforward, but how can it be achieved?
Here is my modified version of `dash_rq_demo/__init__.py` from the tcbegley/dash-rq-demo repository, in which I create a new task for every letter:
import uuid
from collections import namedtuple
import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job
from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div
from modules import *
# Module-level logger; `get_logger` presumably comes from the star import of
# `modules` above — TODO(review): confirm, the star import hides its origin.
logger = get_logger(__name__)
# A named tuple keeps the multi-output callback's return value readable:
# each field corresponds to one Output in declaration order.
Result = namedtuple(
    "Result",
    "result progress collapse_is_open finished_data",
)
# User-facing introduction text, rendered in an html.P at the top of the page.
EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, the text will be converted to upper case character by
character (with a time delay in each iteration of the loop). An interval checks
periodically for completion of the task, and also updates a progress bar in the
UI to inform the user of the progress being made.
"""
# Page layout. Two dcc.Store components track job state client-side:
# `submitted_store` holds the ids of every task enqueued, `finished_store`
# records which of them have completed; if they differ, work is in flight.
# A single dcc.Interval drives the polling callback below.
app.layout = dbc.Container(
    [
        dcc.Store(id="submitted_store"),
        dcc.Store(id="finished_store"),
        # 500 * 5 = 2500 ms between polls of the job queue.
        dcc.Interval(id="interval", interval=500 * 5),
        # FIX: Bootstrap utility classes are hyphenated ("display-4", "mb-3");
        # the previous underscore spellings ("display_4", "mb_3") match no
        # Bootstrap class and silently applied no styling.
        html.H2("MULTI Redis / RQ demo", className="display-4"),
        html.P(EXPLAINER),
        html.Hr(),
        dbc.Textarea(value="123", id="text", className="mb-3"),
        dbc.Button("Upper case", id="button", color="primary", className="mb-3"),
        html.P(id="log_result"),
        dbc.Collapse(dbc.Progress(id="progress", className="mb-3"), id="collapse"),
        html.P(id="output_result"),
    ]
)
@app.callback(
    [
        Output("submitted_store", "data"),
        Output("output_result", "children"),
        Output("log_result", "children"),
    ],
    [Input("button", "n_clicks")],
    [
        State("text", "value"),
        State("submitted_store", "data"),
        State("output_result", "children"),
    ],
)
def submit_to_multiple_tasks(n_clicks, text, submitted_store, output_result):
    """
    Enqueue one RQ task per character of `text`.

    Each job id is recorded in `submitted_store`, and a placeholder div
    (with pattern-matching component ids keyed by that job id) is appended
    to `output_result` so the polling callback can fill results in later.
    Returns (submitted_store, output_result, log message).
    """
    # Guard clause: before any click, reset everything.
    if not n_clicks:
        return {}, [], "empty log"

    submitted_store = submitted_store or {}
    output_result = output_result or []
    total = len(text)

    for position, character in enumerate(text, 1):
        job_id = str(uuid.uuid4())
        # Queue one task per character under a fresh uuid.
        queue.enqueue(slow_dash_div, character, job_id=job_id)
        submitted_store[job_id] = {}
        placeholder = html.Div(
            [
                html.Div(
                    [f"processing {position}/{total} {job_id}"],
                    id={"index": job_id, "type": "dynamic_first_message"},
                ),
                html.Div(
                    ["no progress"],
                    id={"index": job_id, "type": "dynamic_progress"},
                ),
                html.Div(
                    id={"index": job_id, "type": "dynamic_output"},
                ),
            ],
        )
        output_result.append(placeholder)

    # Log the accumulated job ids in the dcc.Store.
    # NOTE: `submitted_store` must keep this exact name — the f-string's `=`
    # specifier embeds the identifier in the rendered log message.
    message = f"{len(submitted_store)} {submitted_store=}"
    logger.debug(message)
    return submitted_store, output_result, message
@app.callback(
    [
        Output("finished_store", "data"),
        Output("log_result", "children", allow_duplicate=True),
        Output({"type": "dynamic_progress", "index": ALL}, "children"),
        Output({"type": "dynamic_output", "index": ALL}, "children"),
    ],
    [Input("interval", "n_intervals")],
    [
        State("submitted_store", "data"),
        State("finished_store", "data"),
        State({"type": "dynamic_progress", "index": ALL}, "id"),
        State({"type": "dynamic_progress", "index": ALL}, "children"),
        State({"type": "dynamic_output", "index": ALL}, "children"),
    ],
    prevent_initial_call=True,
)
def retrieve_output(
    n,
    submitted,
    finished_store,
    dynamic_progress_id,
    dynamic_progress,
    dynamic_output,
):
    """
    Poll every submitted job on each interval tick.

    For each job id in `submitted`, fetch the RQ job, record its status in
    `finished_store`, and rebuild the two pattern-matched output lists
    (progress messages and results). The two lists MUST keep one entry per
    matched component, in order, or Dash will reject the callback return.
    """
    # FIX: removed the pointless `if 1 and True:` wrapper that previously
    # enclosed this whole callback.
    dynamic_progress_id = dynamic_progress_id or []
    dynamic_progress = dynamic_progress or []
    dynamic_output = dynamic_output or []
    # msg_step is built before normalizing finished_store so the log shows
    # the raw (possibly None) value, as before.
    msg_step = f"at {get_human_time_ms()} {n=} {dynamic_progress_id=} {dynamic_progress=} {submitted=} {finished_store=}"
    finished_store = finished_store or {}
    logger.debug(msg_step)
    if n and submitted:
        dynamic_progress = []
        dynamic_output = []
        ids = list(submitted.keys())
        logger.debug(f"check {len(ids)} {ids=}")
        # FIX: Job.fetch_many returns None for ids Redis no longer knows
        # about (expired result TTL, deleted job). The previous code called
        # job.get_status() unconditionally and crashed with AttributeError.
        # We also must still append to both lists for missing jobs, or the
        # ALL-pattern output lists fall out of sync with the components.
        jobs = Job.fetch_many(ids, connection=conn)
        for num, (job_id, job) in enumerate(zip(ids, jobs), 1):
            if job is None:
                msg = f" at {get_human_time_ms()} check {num}/{len(jobs)} id={job_id!r} job missing from Redis"
                logger.debug(msg)
                finished_store.setdefault(job_id, {})["status"] = "missing"
                dynamic_progress.append(msg)
                dynamic_output.append("")
                continue
            job_status = job.get_status()
            # id={job_id!r} renders identically to the former `{id=}` (which
            # also shadowed the builtin `id`).
            msg = f" at {get_human_time_ms()} check {num}/{len(jobs)} id={job_id!r} {job_status=} {job.id=} {job.func_name=}"
            logger.debug(msg)
            dynamic_progress.append(msg)
            dynamic_result = ""
            if job_id not in finished_store:
                finished_store[job_id] = {"status": "in_progress"}
            if job_status == "finished":
                finished_store[job_id]["status"] = "finished"
                dynamic_result = job.result
            dynamic_output.append(dynamic_result)
    return finished_store, [msg_step], dynamic_progress, dynamic_output
What is the best approach to organize this?