Subject: how to process calculations with a queue (Celery, RQ, etc.) and display results immediately after completion

I went back to a single callback that checks the overall state of all tasks (instead of 100 callbacks for 100 tasks, as in the previous version).

Here it is:

import time
import uuid
from collections import namedtuple

import dash_bootstrap_components as dbc
from dash import Input, Output, State, dcc, html, no_update, ALL, MATCH
from rq.exceptions import NoSuchJobError
from rq.job import Job

from .core import app, conn, queue
from .tasks import slow_loop, slow_dash_div

# project-wide wildcard import; expected to provide helpers such as
# get_logger and get_human_time_ms used below
from modules import *

logger = get_logger(__name__)

# use named tuple for return value of multiple output callback for readability
Result = namedtuple(
    "Result", ["result", "progress", "collapse_is_open", "finished_data"]
)

EXPLAINER = """
This app demonstrates asynchronous execution of long running tasks in Dash
using Redis and RQ. When you type text in the box below and click on the
'Upper case' button, every character of the text is submitted to the queue as
a separate job. An interval periodically checks all submitted jobs, updates a
progress line for each one in the UI, and fills in the result of any job that
has finished.
"""

app.layout = dbc.Container(
    [
        # two stores: one records every job that has been submitted, the other
        # every job that has finished. ids present in the first but not the
        # second belong to jobs that are still running.
        dcc.Store(id="submitted_store", data={}),
        dcc.Store(id="finished_store", data={}),
        # poll every 50 ms (dcc.Interval takes the interval in milliseconds)
        dcc.Interval(id="interval", interval=50),
        html.H2("MULTI Redis / RQ demo", className="display-4"),
        html.P(EXPLAINER),
        html.Hr(),
        dbc.Textarea(value="123", id="text", className="mb-3"),
        dbc.Button("Upper case", id="button", color="primary", className="mb-3"),
        html.P(id="log_result"),
        dbc.Collapse(dbc.Progress(id="progress", className="mb-3"), id="collapse"),
        html.P(id="output_result"),
    ]
)


@app.callback(
    [
        Output("output_result", "children"),
        Output("log_result", "children"),
        Output("submitted_store", "data"),
        Output("interval", "disabled"),
    ],
    [Input("button", "n_clicks")],
    [
        State("text", "value"),
        State("output_result", "children"),
        State("submitted_store", "data"),
    ],
)
def submit_to_multiple_tasks(n_clicks, text, output_result, submitted_store):
    """
    Submit a job to the queue, log the id in submitted_store
    """
    output_result = output_result or []
    message = "empty_log"
    if n_clicks:
        logger.debug(f"submit_to_multiple_tasks {submitted_store=}")
        for num, symbol in enumerate(text, 1):
            id_ = f"{num}_{uuid.uuid4()}"

            # queue the task
            # queue.enqueue(slow_loop, text, job_id=id_)
            queue.enqueue(slow_dash_div, symbol * 3, job_id=id_)
            # record the job id in the dcc.Store so the polling callback can track it
            submitted_store[id_] = {"status": "queued"}
            output_result.insert(
                0,
                html.Div(
                    [
                        html.Div(
                            [f"processing {num}/{len(text)} {id_}"],
                            id={"index": id_, "type": "dynamic_first_message"},
                        ),
                        html.Div(
                            ["no progress"],
                            id={"index": id_, "type": "dynamic_progress"},
                        ),
                        html.Div(
                            id={"index": id_, "type": "dynamic_output"},
                        ),
                    ],
                ),
            )

        message = f"{len(submitted_store)} {submitted_store=}"
        logger.debug(message)

    interval_disabled = False
    return output_result, message, submitted_store, interval_disabled


if True:  # debug toggle: set to False to switch off the polling callback

    @app.callback(
        [
            Output("log_result", "children", allow_duplicate=True),
            Output(
                {"type": "dynamic_progress", "index": ALL},
                "children",
                allow_duplicate=True,
            ),
            Output(
                {"type": "dynamic_output", "index": ALL},
                "children",
                allow_duplicate=True,
            ),
            Output("finished_store", "data", allow_duplicate=True),
            Output("interval", "disabled", allow_duplicate=True),
        ],
        [Input("interval", "n_intervals")],
        [
            State("submitted_store", "data"),
            State("finished_store", "data"),
            State({"type": "dynamic_progress", "index": ALL}, "id"),
        ],
        prevent_initial_call=True,
        # blocking=True,
    )
    def retrieve_result(
        n,
        submitted_store,
        finished_store,
        dynamic_progress_id,
    ):
        """
        Periodically check all submitted jobs, update their progress lines,
        and fill in the output of any that have finished.
        """
        interval_disabled = False

        t_start_main = time.time()

        msg_step = f"retrieve_result at {get_human_time_ms()} {n=}, total {len(submitted_store)} tasks"
        logger.debug(msg_step)

        id_to_results = {}

        ids_todo = []
        ids_finished = []
        ids_finished_new = []

        if n and submitted_store:
            ids_finished = set(finished_store.keys())
            ids_todo = set(submitted_store.keys()) - ids_finished

            t_start = time.time()
            logger.debug(f"  check {len(ids_todo)} {ids_todo=}")
            jobs = Job.fetch_many(ids_todo, connection=conn)
            for num, job in enumerate(jobs, 1):
                if job is None:
                    # fetch_many returns None for ids that no longer exist in Redis
                    continue
                job_id = job.id

                job_status = job.get_status()
                progress = job.meta.get("progress", 0)
                progress_human = job.meta.get("progress_human", "")
                msg = f"        at {get_human_time_ms()} check {job_status=}, done {progress:.1f}% / {progress_human} {job.meta.get('worker_id', 'no_worker_id')}"
                # logger.debug(msg)
                id_to_results[job_id] = {"status": "no_update", "result": None, "msg": msg}

                if job_status == "finished":
                    # the real result is mocked out for this demo
                    # dynamic_result = job.result
                    dynamic_result = html.B("mockup_result")
                    id_to_results[job_id].update(
                        {"status": "finished", "result": dynamic_result}
                    )
                    ids_finished_new.append(job_id)

            duration = time.time() - t_start
            logger.debug(f"checked in {duration:.3f} seconds")

        if len(ids_todo) == 0:
            # nothing is running, leave all dynamic outputs untouched
            finished_store = no_update
            interval_disabled = no_update
            new_progress = [no_update] * len(dynamic_progress_id)
            new_output = [no_update] * len(dynamic_progress_id)
        else:
            new_progress = []
            new_output = []
            for el in dynamic_progress_id:
                job_id = el["index"]
                progress = no_update
                output = no_update

                res = id_to_results.get(job_id)
                if res is not None:
                    status = res["status"]
                    if status != "finished":
                        progress = res["msg"]
                    else:
                        progress = "done"
                        output = res["result"]
                        finished_store[job_id] = {}

                new_progress.append(progress)
                new_output.append(output)

            ids_finished = set(finished_store.keys())
            if submitted_store:
                ids_todo = set(submitted_store.keys()) - ids_finished
                logger.debug(f"{n=} {ids_todo=}")
                if len(ids_todo) == 0:
                    interval_disabled = True
                    logger.warning("disable interval, no tasks")

        if not ids_finished_new:
            finished_store = no_update

        duration = time.time() - t_start_main
        return (
            [
                f"{len(ids_todo)} todo, {len(ids_finished)} finished, checked in {duration:.3f} seconds, ",
                msg_step,
            ],
            new_progress,
            new_output,
            finished_store,
            interval_disabled,
        )
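
For context, this only works because the task itself reports its progress through job.meta, which is what retrieve_result reads. Roughly, the task side looks like this (a simplified sketch, not my exact slow_dash_div, which also builds a Dash component as its result and stores a worker_id in the meta):

import time

from rq import get_current_job


def slow_task(text):
    """Upper-case `text` one character at a time and report progress
    through job.meta so the polling callback can display it."""
    job = get_current_job()
    result = ""
    for i, symbol in enumerate(text, 1):
        time.sleep(1)  # simulate slow work
        result += symbol.upper()
        job.meta["progress"] = 100.0 * i / len(text)
        job.meta["progress_human"] = f"{i}/{len(text)}"
        job.save_meta()
    return result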

It works much, much better, but after a few hundred tasks it no longer runs as smoothly as expected.
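
My current guess on the slowdown: on every interval tick, retrieve_result has to return one entry per dynamic_progress / dynamic_output component that exists on the page, even when it is only no_update, so with the 50 ms interval the callback payload keeps growing with every batch of submitted tasks. One thing I plan to try is capping how many per-task rows stay in output_result; a minimal sketch (MAX_VISIBLE_ROWS and trim_rows are just placeholder names):

MAX_VISIBLE_ROWS = 200  # arbitrary cap, to be tuned


def trim_rows(output_result, max_rows=MAX_VISIBLE_ROWS):
    """Keep only the newest rows; submit_to_multiple_tasks inserts new rows at index 0."""
    output_result = output_result or []
    return output_result[:max_rows]

i.e. call output_result = trim_rows(output_result) right before returning from submit_to_multiple_tasks, so the pattern-matched components of old, finished tasks get dropped from the layout.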