I returned back to one callback, that analyze one general state of all tasks (not 100 callback per 100 tasks like in the previous version).
Here it is:
import uuid
from collections import namedtuple
import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job
from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div
from modules import *
logger = get_logger(__name__)
# use named tuple for return value of multiple output callback for readability
Result = namedtuple(
"Result", ["result", "progress", "collapse_is_open", "finished_data"]
)
EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, the text will be converted to upper case character by
character (with a time delay in each iteration of the loop). An interval checks
periodically for completion of the task, and also updates a progress bar in the
UI to inform the user of the progress being made.
"""
app.layout = dbc.Container(
[
# two stores, one to track which job was most recently started, one to
# track which job was most recently completed. if they differ, then
# there is a job still running.
dcc.Store(id="submitted_store", data={}),
dcc.Store(id="finished_store", data={}),
dcc.Interval(id="interval", interval=50),
html.H2("MULTI Redis / RQ demo", className="display_4"),
html.P(EXPLAINER),
html.Hr(),
dbc.Textarea(value="123", id="text", className="mb_3"),
dbc.Button("Upper case", id="button", color="primary", className="mb_3"),
html.P(id="log_result"),
dbc.Collapse(dbc.Progress(id="progress", className="mb_3"), id="collapse"),
html.P(id="output_result"),
]
)
@app.callback(
[
Output("output_result", "children"),
Output("log_result", "children"),
Output("submitted_store", "data"),
Output("interval", "disabled"),
],
[Input("button", "n_clicks")],
[
State("text", "value"),
State("output_result", "children"),
State("submitted_store", "data"),
],
)
def submit_to_multiple_tasks(n_clicks, text, output_result, submitted_store):
"""
Submit a job to the queue, log the id in submitted_store
"""
output_result = output_result or []
message = "empty_log"
if n_clicks:
logger.debug(f"submit_to_multiple_tasks {submitted_store=}")
for num, symbol in enumerate(text, 1):
id_ = f"{num}_{uuid.uuid4()}"
# queue the task
# queue.enqueue(slow_loop, text, job_id=id_)
queue.enqueue(slow_dash_div, symbol * 3, job_id=id_)
submitted_store[id_] = {"status": "queued"}
output_result.insert(
0,
html.Div(
[
html.Div(
[f"processing {num}/{len(text)} {id_}"],
id={"index": id_, "type": "dynamic_first_message"},
),
html.Div(
["no progress"],
id={"index": id_, "type": "dynamic_progress"},
),
html.Div(
id={"index": id_, "type": "dynamic_output"},
),
],
),
)
# log process id in dcc.Store
message = f"{len(submitted_store)} {submitted_store=}"
logger.debug(message)
interval_disabled = False
return output_result, message, submitted_store, interval_disabled
if 1 and True:
@app.callback(
[
Output("log_result", "children", allow_duplicate=True),
Output(
{"type": "dynamic_progress", "index": ALL},
"children",
allow_duplicate=True,
),
Output(
{"type": "dynamic_output", "index": ALL},
"children",
allow_duplicate=True,
),
Output("finished_store", "data", allow_duplicate=True),
Output("interval", "disabled", allow_duplicate=True),
],
[Input("interval", "n_intervals")],
[
State("submitted_store", "data"),
State("finished_store", "data"),
State({"type": "dynamic_progress", "index": ALL}, "id"),
],
prevent_initial_call=True,
# blocking=True,
)
def retrieve_result(
n,
submitted_store,
finished_store,
dynamic_progress_id,
):
interval_disabled = False
"""
Periodically check the most recently submitted_store job to see if it has
completed.
"""
# return no_update
t_start_main = time.time()
msg_step = f"{n} retrieve_result at {get_human_time_ms()} {n=}, total {len(submitted_store)} tasks"
# msg = f"at {get_human_time_ms()} {n=}"
logger.debug(msg_step)
id_to_results = {}
ids_todo = []
ids_finished = []
ids_finished_new = []
if 1 and n and submitted_store:
ids_finished = set(finished_store.keys())
ids_todo = set(submitted_store.keys()) - ids_finished
t_start = time.time()
logger.debug(f" check {len(ids_todo)} {ids_todo=}")
jobs = Job.fetch_many(ids_todo, connection=conn)
for num, job in enumerate(jobs, 1):
id = job.id
job_status = job.get_status()
# msg = f" at {get_human_time_ms()} check {num}/{len(jobs)} {id=} {job_status=} {job.id=} {job.func_name=}"
progress = job.meta.get("progress", 0)
progress_human = job.meta.get("progress_human", "")
msg = f" at {get_human_time_ms()} check {job_status=}, done {progress:.1f}% / {progress_human} {job.meta.get('worker_id', 'no_worker_id')}"
# logger.debug(msg)
id_to_results[id] = {"status": "no_update", "result": None, "msg": msg}
if job_status == "finished":
# dynamic_result = job.result
dynamic_result = html.B("mockup_result")
id_to_results[id].update(
{"status": "finished", "result": dynamic_result}
)
ids_finished_new.append(id)
duration = time.time() - t_start
logger.debug(f"checked in {duration:.1} seconds")
if len(ids_todo) == 0:
finished_store = no_update
interval_disabled = no_update
new_progress = [no_update] * len(dynamic_progress_id)
new_output = [no_update] * len(dynamic_progress_id)
if len(ids_todo) > 0:
new_progress = []
new_output = []
for el in dynamic_progress_id:
id = el["index"]
progress = no_update
output = no_update
res = id_to_results.get(id, None)
if res is not None:
status = res["status"]
if status != "finished":
progress = res["msg"]
else:
progress = "done"
output = res["result"]
finished_store[id] = {}
new_progress.append(progress)
new_output.append(output)
ids_finished = set(finished_store.keys())
if submitted_store:
ids_todo = set(submitted_store.keys()) - ids_finished
logger.debug(f"{n=} {ids_todo=}")
if 1 and len(ids_todo) == 0:
interval_disabled = True
logger.warning("disable interval, no tasks")
if not ids_finished_new:
finished_store = no_update
duration = time.time() - t_start_main
return (
[
f"{len(ids_todo)} todo, {len(ids_finished)} finished, checked in {duration:.3f} seconds, ",
msg_step,
],
new_progress,
new_output,
finished_store,
interval_disabled,
)
It works much much better, but also after hundreds of tasks it start working not so smooth as expected.