Custom progress information on each loop iteration in app.callback

Hi All,

One of my callbacks runs a quite long Python process in a loop. I’d like to send some progress information to users while it is running.

Below you will find code that represents what I’d like to do - in the HERE section I’d like to update the div with a progress update.

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
import time

app = dash.Dash(__name__)

app.layout = html.Div(
    [
        html.Div('Not run', id='div-1'),
        html.Button('Run long process', id='button-1', n_clicks=0)
    ]
)

@app.callback(
    Output('div-1', 'children'),
    [Input('button-1', 'n_clicks')],
    [State('div-1', 'children')]
)
def run_long_process(n_clicks, curr_status):
    status = curr_status
    if n_clicks > 0:
        # run_long_process
        for iteration in range(10):
            #quite long process
            time.sleep(1)
            #HERE i'd like to update div-1 with status: "{iteration}/10 finished"
        status = "Finished"
    return status

if __name__ == '__main__':
    app.run_server(debug=True)

You can run the long process asynchronously (using e.g. dask or celery) and pull the status periodically using a Dash interval component.
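To illustrate the idea without pulling in dask or celery, here is a minimal sketch of the pattern using only a background thread and a module-level `progress` dict (all names here are hypothetical, and the Dash wiring is shown as comments since it needs a running app). With multiple workers or processes you would need a server-side store instead of a module-level variable.

```python
import threading
import time

# Shared state polled by the Interval callback. Fine for a single-process
# dev server; use a server-side store (redis, a file, ...) with real workers.
progress = {"current": 0, "total": 10, "running": False}

def long_job():
    # Stands in for the long process; updates the shared state each iteration.
    progress["running"] = True
    for i in range(progress["total"]):
        time.sleep(0.01)  # shortened from 1 s for the sketch
        progress["current"] = i + 1
    progress["running"] = False

def start_job():
    # The button callback would call this and return immediately.
    threading.Thread(target=long_job, daemon=True).start()

# In the Dash app, a dcc.Interval(id='poll', interval=500) would trigger
# a callback along these lines:
#
# @app.callback(Output('div-1', 'children'), [Input('poll', 'n_intervals')])
# def poll(_):
#     if progress["running"]:
#         return f'{progress["current"]}/{progress["total"]} finished'
#     return 'Not running'
```

The button callback returns immediately, so the app stays responsive while the Interval callback keeps re-reading the shared state.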

Here is some inspiration,

Thanks a lot - nice idea, I will take a look at this.

It assumes that processes are being run async, which is a great idea, but what if I wanted to do this in sync and just show the progress as text? I am looking for some easy way to do this, without additional dependencies :confused:

Hey @michlimes, this is a bit of a nasty hack, but it works: you can use the callback itself as one iteration and let it trigger itself ten times until the process is done; then you get a sync output at the end of each iteration.

The nasty part is that you need to bypass the ban on circular callbacks, and you can do it by nesting divs one inside the other. I edited your code with an example for that:

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
from dash.exceptions import PreventUpdate
import time

app = dash.Dash(__name__)

app.layout = html.Div(
    [
        html.Div('Not run', id='div-1'),
        html.Button('Run long process', id='button-1', n_clicks=0),
        # this is a hidden div that contains the status of the loop (and functions as the loop itself)
        html.Div(id='status-container', children=html.Div(id='status', children=-1), style={'display': 'none'})
    ]
)

@app.callback(
    Output('status', 'children'),
    [Input('button-1', 'n_clicks')]
)
# this callback sets the process in motion by changing the status from -1 to 1
def button_clicked(click):
    if not click:
        raise PreventUpdate
    else:
        return 1

@app.callback(
    [Output('status-container', 'children'),
     Output('div-1', 'children')],
    [Input('status', 'children')]
)
# this callback runs one step of your 10-step process, then triggers itself by increasing the status by 1
# since it cannot return as output the div that it uses as input, it returns its container, with the relevant div edited inside
def process_run(stage):
    if stage == -1:
        raise PreventUpdate

    # run one step of the process
    if stage < 10:
        time.sleep(1)
        # return the status container with the message about the end of this iteration
        return html.Div(id='status', children=stage+1), f'Iteration {stage}/10 finished'

    # all 10 iterations finished
    return html.Div(id='status', children=-1), 'Finished.'

if __name__ == '__main__':
    app.run_server(debug=True)

I have to add I’m not a Dash expert and this practice looks a bit shady to me, so it would be great to hear what others think about it…

Thanks a lot, it works perfectly for me. Of course, as you have written, if there is a more “pythonic” solution and anyone knows it - feel free to share :slight_smile:

@michlimes, @itayma I was trying to do the same things as michlimes and found this thread. I used this solution successfully. However, it no longer works for me. We recently updated dash (to 1.13) in our latest python common environment. Now the function just stops after the first iteration (in this example, it prints out “Iteration 1/10 finished” and stops). I’m assuming that the updated dash no longer allows this trick, but maybe something else is going on. Have either of you seen this same issue?

@mwogulis you’re right, it’s not working anymore with v1.13…

Thank you! That saves me a lot of time in hunting down the problem :slight_smile:

As noted previously, another option is to write the state to a server-side cache (redis, files, etc.). You can then read the state from another callback (triggered by an Interval component) and update the app accordingly. While the idea is simple, the implementation requires some bookkeeping. To ease the pain, I have written a few objects that hide the details. Here is a small example,

import time
import dash
import dash_core_components as dcc
import dash_html_components as html

from dash.dependencies import Input, Output
from dash_extensions.callback import CallbackPlumber, DiskPipeFactory

# Create example app.
app = dash.Dash(__name__, prevent_initial_callbacks=True)
app.layout = html.Div(
    [
        html.Button('Start', id='start', n_clicks=0), html.Button('Stop', id='stop', n_clicks=0),
        html.Div("Not running", id='progress'), html.Div("Last output = "), html.Div(id='result'),
        dcc.Interval(id='trigger', interval=1000),  html.Div(id='dummy')
    ]
)
# Create CallbackPlumber, use a folder on disk (server side) to save message in transit.
cp = CallbackPlumber(DiskPipeFactory("some_folder"))

@cp.piped_callback(Output('result', 'children'), [Input('start', 'n_clicks')])
def start_job(pipe, *args):
    n = 100
    for i in range(n):
        pipe.send("progress", float(i)/float(n)*100)  # send progress updates to the client
        if pipe.receive("kill"):  # listen for (kill) signal(s) from the client
            return "No result (job killed)"  # return a value indicating that the job was killed
        time.sleep(0.1)  # sleep to emulate a long job
    return "The result!"  # return the result

@app.callback(Output('progress', 'children'), [Input('trigger', 'n_intervals')])
def update_progress(*args):
    progress = cp.receive("start_job", "progress")  # get latest progress value from "start_job"
    if progress is not None:
        return "Running (progress = {:.0f}%)".format(progress)
    return "Not running"

@app.callback(Output('dummy', 'children'), [Input('stop', 'n_clicks')])
def stop_job(*args):
    cp.send("start_job", "kill", True)  # send kill signal to "start_job"

cp.register(app)  # this call attaches the piped callback

if __name__ == '__main__':
    app.run_server()

All callbacks added via the piped_callback decorator get a dedicated channel of communication identified by the function name (in the above example “start_job”). The pipe for that channel is injected as the first argument (in the example called “pipe”) of the callback.

To avoid adding additional dependencies, the messages in transit are just written to disk. If you would like to try it out, you must install dash-extensions,

pip install dash-extensions==0.0.20

Great. Thank you for the detailed suggestion! I’ll give it a try.

Hi Emil,

thanks for this example. I have a couple of questions:

  1. I noticed that your example does not work with newer versions of dash-extensions (0.0.51). There is no module dash_extensions.callback. Have the classes CallbackPlumber, DiskPipeFactory moved to another module? Or are they deprecated?

  2. Does this example run async? What technologies are used?

  3. What solution would you recommend for running long calculations (typically a couple of minutes of processing time)? Would this snippet be suitable for that, or should I rather look into dash-redis-celery-periodic-updates? Or something else entirely?

Context:
I’m trying to implement a dash app which would be used for running some relatively heavy optimisation calculations (from a couple of seconds up to tens of minutes of total processing time per function call) and presenting their results. I would like to use a progress bar to inform the user about the state of the calculation. I followed this example by @tcbegley. It uses Redis and Redis Queue. I managed to adapt the example to my use case and it works, but not reliably. Basically every second RQ job fails due to an import error (ModuleNotFoundError). This is really strange because about half of the jobs run just fine. This led me to looking for an alternative solution to Redis and RQ.

It has been deprecated. It has been some time since I worked with it, but my general impression is that the best solution depends a lot on the use case and your IT infrastructure.

The simplest solution would probably be to run the long process async, write progress to a file on disk, and read that file in a callback which is invoked by an interval component.
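As a rough sketch of that file-based approach (file name and helper names are my own invention): the long-running process writes its progress to a small JSON file, and the Interval-triggered callback reads it back. Writing to a temp file and renaming with `os.replace` keeps the reader from ever seeing a half-written file.

```python
import json
import os
import tempfile
import time

# Hypothetical progress file; any path writable by both processes works.
PROGRESS_FILE = os.path.join(tempfile.gettempdir(), "job_progress.json")

def write_progress(done, total):
    # Write to a temp file, then rename; os.replace is atomic on a single
    # filesystem, so the polling callback never reads a partial file.
    tmp = PROGRESS_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"done": done, "total": total}, f)
    os.replace(tmp, PROGRESS_FILE)

def read_progress():
    # Called from the Interval-triggered callback to render the progress text.
    if not os.path.exists(PROGRESS_FILE):
        return None
    with open(PROGRESS_FILE) as f:
        return json.load(f)

def long_job(total=10):
    # The async process (thread, subprocess, ...) runs this.
    for i in range(total):
        time.sleep(0.01)  # stand-in for the real work
        write_progress(i + 1, total)
```

Cleanup (deleting or resetting the file when the job finishes) is the kind of bookkeeping mentioned earlier.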

A more robust, batteries-included approach would be using a Celery task queue with e.g. a Redis backend and an MQTT broker. It requires spinning up a few services, but if you know Docker it is not that bad.
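For the celery route, the conventional pattern is a bound task that publishes progress via `update_state`, which the Dash app then polls through the result backend. Below is a hedged sketch: the Celery wiring is commented out because it needs a running broker, and all names (`celery_app`, the Redis URLs) are placeholders, not part of the example above.

```python
import time

# Hypothetical Celery wiring (commented out; assumes a Redis broker/backend):
#
# from celery import Celery
# celery_app = Celery("tasks",
#                     broker="redis://localhost:6379/0",
#                     backend="redis://localhost:6379/0")
#
# @celery_app.task(bind=True)
def long_job(self=None, total=10):
    # With bind=True, Celery passes the task instance as `self`; update_state
    # publishes custom progress metadata that the Dash app can poll.
    for i in range(total):
        time.sleep(0.01)  # stand-in for the real work
        if self is not None:
            self.update_state(state="PROGRESS",
                              meta={"done": i + 1, "total": total})
    return "The result!"

# The Dash side would keep the task id (e.g. in dcc.Store) after calling
# long_job.delay(), and an Interval callback would read
# celery_app.AsyncResult(task_id).info to render the progress.
```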

Thanks for a quick response.

I deploy locally using docker. I think I will give redis+celery a shot. Is the dash-redis-celery-periodic-updates repo still the recommended way of doing such a thing?

Also, what do I need the mqtt broker for? In my current setup with redis and redis queue, I just use the Interval component to check whether or not the job has finished.

I haven’t used that repo, but I made a small example the last time I worked with it,

It is not up to date with the current version of dash-extensions, but it should still be OK for inspiration.
