[Solved] Updating server side app data on a schedule

I am putting together an app that produces output that is updated hourly by a computationally expensive function. I don’t want the computationally expensive function to run when a user launches the app as it takes a long time, but rather I’d like it to run in the background on the server which would then serve up the updated data to all users of the app.

I have included code below as an example of my problem. The app is initialized with null values (999) and then every 20 seconds (instead of every hour) it updates the app components using current time/date information. Every time I reload the app it comes up with the null values and then updates the values after 20 seconds. I would prefer it to somehow used the latest data stored in the hidden div. I appreciate that the interval component runs on the client side, but is there a way to make it a server process? Am I trying to do something way different to what Dash is designed to do?

One solution would be update the data on my personal computer and redeploy the app on the remote server every hour, but it seems like an inelegant solution as I would then have to have a computer dedicated to task of updating an app on a remote server.

I’m coming at this as a scientist not a web person so apologies if this is total newbie stuff.

import dash
from dash.dependencies import Input, Output, Event
import dash_html_components as html
import dash_core_components as dcc
from datetime import datetime
import numpy as np
import pandas as pd

app = dash.Dash(__name__)

def compute_expensive_data():
    t=datetime.now()
    d = {'time' : pd.Series(np.array([t.minute, t.second]), index=['minute', 'second'])}
    dat = pd.DataFrame(d).to_json()
    
    return  dat

#Initial condition
d = {'time' : pd.Series(np.array([999, 999]), index=['minute', 'second'])}
dat = pd.DataFrame(d).to_json()

print(dat)

app.layout = html.Div([
        html.H3('Original Time: Minute = ' + str(pd.read_json(dat)['time']['minute']) + ': Second = ' + str(pd.read_json(dat)['time']['second'])),
        html.Div(id='title-line-children'),
        dcc.RadioItems(
            id='time-dropdown',
            options=[
                {'label': 'Minute', 'value': 'minute'}, {'label': 'Second', 'value': 'second'},
            ],
            value='minute'
        ), 
                        
        # Hidden div inside the app that stores the intermediate value
        html.Div(id='intermediate-value', style={'display': 'none'}, children = dat),
        
        dcc.Interval(
            id='interval-component',
            interval=20*1000 # 20 seconds in milliseconds
        )
        
    ])
        
@app.callback(
    Output('title-line-children', 'children'),
    [Input('time-dropdown', 'value'), Input('intermediate-value', 'children')])
def render(value,dat1):
    if value == 'minute':
        printStr = str(pd.read_json(dat1)['time']['minute'])
        outStr = 'Minute = ' + printStr
    elif value == 'second':
        printStr = str(pd.read_json(dat1)['time']['second'])
        outStr = 'Second = ' + printStr
    
    return outStr
    
@app.callback(Output('intermediate-value', 'children'),
              events=[Event('interval-component', 'interval')])
def update_global_var():
    return compute_expensive_data()

if __name__ == '__main__':
    app.run_server(debug=True)
1 Like

From what you’ve described, it sounds like updating a global variable in the Dash app on the remote server might be a simple way to meet your needs. The main concern here is that you don’t want the long running function (or the loop sleeping it) to block execution of your app. This is now a general Python question around concurrency rather than a Dash specific problem.

Typically you might solve this by running the function in another process or on another machine and use a message passing library like Celery to communicate asynchronously. This might be over-engineering things a little for your purposes though – Celery takes a bit of setup, requiring a message broker service such as RabbitMQ.

A simple solution that could work is using the concurrent.futures module (Python 3.2+) to paralellise execution of that function and stop it blocking execution. Below is how you can use it to run the function in another thread. If your function is CPU intensive (as opposed to IO-bound – such as making a request to a database) this won’t give you true parallelism, just simulated, however I think that’s probably ok here, as we just care that the web server runs at all alongside the long running function, not that you’re running anything faster bysaturating the use of your CPU cores.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import dash
import dash_html_components as html
import dash_core_components as dcc
import plotly.graph_objs as go
import numpy as np

# number of seconds between re-calculating the data                                                                                                                           
UPDADE_INTERVAL = 5

def get_new_data():
    """Updates the global variable 'data' with new data"""
    global data
    data = np.random.normal(size=1000)


def get_new_data_every(period=UPDADE_INTERVAL):
    """Update the data every 'period' seconds"""
    while True:
        get_new_data()
        print("data updated")
        time.sleep(period)


def make_layout():
    chart_title = "data updates server-side every {} seconds".format(UPDADE_INTERVAL)
    return html.Div(
        dcc.Graph(
            id='chart',
            figure={
                'data': [go.Histogram(x=data)],
                'layout': {'title': chart_title}
            }
        )
    )

app = dash.Dash(__name__)

# get initial data                                                                                                                                                            
get_new_data()

# we need to set layout to be a function so that for each new page load                                                                                                       
# the layout is re-created with the current data, otherwise they will see                                                                                                     
# data that was generated when the Dash app was first initialised                                                                                                             
app.layout = make_layout

# Run the function in another thread
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(get_new_data_every)


if __name__ == '__main__':
    app.run_server(debug=True)

You could also try swapping out ThreadPoolExecutor for ProcessPoolExecutor, which will mean that the function will be run in another process rather than a thread, giving you true parallelism, however I believe this means that in order for the results of the other thread to be communicated back to the main process, they must be pickleable, which your data may or may not be.

I’m only just starting to wrap my head around concurrency in Python, so hopefully all this is approximately accurate. Someone else chime in if I’ve gotten anything wrong!

3 Likes

Another option would be to run two processes, one that runs the job and the other that runs the dash web server. You can share data between these two processes by just writing the output result to the file system in the scheduler task and reading the data in the Dash processes.

This way, if you run your Dash app with multiple processes (using something like $ gunicorn server.app --workers 4), each process will just read from the same file system and only a single process will be running your scheduled jobs.

For scheduling processes, APScheduler has been recommended to me: https://apscheduler.readthedocs.io/en/latest/.

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

@sched.scheduled_job('interval', minutes=3)
def timed_job():
    print('This job is run every three minutes.')

@sched.scheduled_job('cron', day_of_week='mon-fri', hour=17)
def scheduled_job():
    print('This job is run every weekday at 5pm.')

sched.start()
7 Likes

Thanks very much for your time and effort @nedned. making the layout a function seems very sensible. You are exactly right about this being a concurrency problem and your solution works nicely. The data is picklable and I modified your function to write to a file rather than using a global variable np.save('data1',data) and then read it back in the make_layout function data = np.load('data1.npy'). The concurrent.futures modules seems perfect, however I tried using a separate process instead of a separate thread replacing executor = ThreadPoolExecutor(max_workers=1) with executor = ProcessPoolExecutor(max_workers=1) or executor = ProcessPoolExecutor() but it seems like it never actually reruns the get_new_data_every() function. Am I missing a trick there?

I also like the idea of the scheduler @chriddyp. I will try and incorporate that as well

I thought I tested that demo app with ProcessPoolExecutor, but now that I think about it, that’s not going to work. It should be running the get_new_data_every function, however it’ll be doing it in another forked process with any changes being stored in its own allocated memory, with the global being modified in the forked process but not the original process.

When I run the multiprocess version, it’s definitely executing the function though, so if you’re saving the data to disk, the multiprocess approach should work. I don’t know why it’s not running for you. In such cases I always reach for liberal uses of print(“foo”) etc calls all over the place to work out what is or isn’t getting executed.

The original solution you posted @nedned is probably good for my purposes. My real ‘computationally expensive function’ is not CPU or I/O resource intensive really. It just spends some time (20 to 60 s worth) getting data from slow web sources and I don’t want to put that time onto a user app start up time. Using multiple threads is fine and if I use the write-to-file option I don’t have any lingering concerns over using global variables in Dash.

Using a multiple processes would be nice as it would allow me to use the scheduling solution proposed @chriddyp which seems tidy to me. I have got the multiple processes working (sort of) using the code below . I had to guard the statements with if __name__ == '__main__' which apparently is required. When I use the multiple processes option I don’t see the output of any print statements in the functions called by that process (e.g. get_new_data_every) though I can see that it is producing output. The strange thing is that if I repeatedly refresh the web page (app) in my browser the plotted data is updated every 5 seconds, but sometimes it gets updated twice in quick succession at every 5 seconds. I have no idea why. As I say your original solution is a good one but I am curious whether this is a coding problem or something with my python setup. If something is screamingly obviously wrong with my code below it would be good to know, but otherwise you’ve given me heaps of help already and I’m happy to accept your solution.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import dash
import dash_html_components as html
import dash_core_components as dcc
import plotly.graph_objs as go
import numpy as np

# number of seconds between re-calculating the data                                                                                                                           
UPDADE_INTERVAL = 5

def get_new_data():
    print('get_new_data')
    """Updates the global variable 'data' with new data"""
    #global data
    data = np.random.normal(size=1000)
    np.save('data1',data)


def get_new_data_every(period=UPDADE_INTERVAL):
    print('get_new_data_every')
    """Update the data every 'period' seconds"""
    while True:
        get_new_data()
        print("data updated")
        time.sleep(period)


def make_layout():
    data = np.load('data1.npy')
    chart_title = "data updates server-side every {} seconds".format(UPDADE_INTERVAL)
    return html.Div(
        dcc.Graph(
            id='chart',
            figure={
                'data': [go.Histogram(x=data)],
                'layout': {'title': chart_title}
            }
        )
    )

app = dash.Dash(__name__)

# get initial data                                                                                                                                                            
get_new_data()

# we need to set layout to be a function so that for each new page load                                                                                                       
# the layout is re-created with the current data, otherwise they will see                                                                                                     
# data that was generated when the Dash app was first initialised                                                                                                             
app.layout = make_layout

def start_multi():
    executor = ProcessPoolExecutor(max_workers=1)
    executor.submit(get_new_data_every)

if __name__ == '__main__':

    start_multi()
    app.run_server(debug=True)

When I run your code I do see the output from the print statements, so maybe there’s some kind of system-specific quirk going on with how standard out is handled in your terminal with forked processes. (I’m running Ubuntu)

Slightly erratic timings is possibly normal when dealing with a worker process that’s sleeping. But I don’t think it’s a reason to be concerned.

If your computationally expensive function is doing web requests, then threading makes plenty of sense rather than multiprocesses, as your other thread will release the global interpreter lock while waiting for the web requests to come back.

But if you still wanted to take advantage of multiple worker processes for running the whole Dash app, another way you could tackle this is to still the ThreadPoolExecutor but run gunicorn with multiple worker processes. However this means that each worker process will periodically run the function, which is redundant, so you could modify the logic in get_new_data_every to not just run every x seconds, but only run when it has been X seconds since any worker process ran the function. You could do this by saving to a separate file the timestamp of when it last began execution of the function.

I think though in general, @chriddyp’s suggestion to use another entirely separate script which is scheduling the running of the function is probably a cleaner way of handling this problem. I reckon I’ll have to look into the APS scheduler; it looks pretty handy.

Great suggestion re having workers occasionally taking on the role of updating the data. I will look into it when i get a chance. I’ve been playing with the different options @nedned and your original suggestion (using ThreadPoolExecutor) is working very well. Its a little erratic once deployed on Heroku but that’s another subject and it works like a charm when run locally. Happy to call this solved. Thanks again for all your help and yours @chriddyp.

2 Likes

While server-side rendering, application is rendered including updated parent’s state by resolved data from the child component if you want to more then contact me on lenovo support

Hi @nedned @chriddyp , can you please give an example where the apscheduler is used to update a dataframe and the same dataframe is being used in the dash application code. It would really be helpful.

And one more doubt, while using threaded approach there seems to be a problem with @app.callbacks in the application, may i know how to resolve them.

Thanks,
Deepak

3 Likes

Hi @chriddyp @nedned, can you please give a example for this?

I’m having problems with this and have similar problems. Is there no way to do this? This means users will have to wait a significant time for it to load, as my data volume is not small.

Thanks,

See https://github.com/plotly/dash-redis-celery-periodic-updates/

1 Like

I’m trying to do something pretty similar, that is to say update a dataframe on the server side to avoid the client to load it each time. I’ve mostly been inspired by this thread’s content.

This is my main_page.py file (multi-page web app)


RELOAD_INTERVAL = 2 * 3600 # reload interval in seconds


def refresh_data_every():
    while True:
        refresh_data()
        time.sleep(RELOAD_INTERVAL)


def refresh_data():
    global df
    ### some expensive computation function to update dataframe
    df = update_df()

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(refresh_data_every)

I thought that would have been enough. Unfortunately, i had to add a little trick : add an invisible div in the layout and add a callback with this div as input and my graph / figure as ouput

@app.callback(
   Output('my-graph', 'figure'), [Input("invisible-div-callback-trigger", "children")]
)

def update_graph_with_correct_df(_):
   ### the input value really doesn't matter, hence the "_"
   global df
   ### get the correct data to return to the graph with some function
   graph_data = get_graph_data(df)
   return graph_data

1 Like

Can’t re-edit my last post but just some update about what I posted:

  • No need to add this callback for a dataframe. I had to use such a logic though witha Datepicker object when updating my global variable “today” ;

  • I’m having trouble figuring why but with this auto-refresh logic, my refresh_data() function is called twice for every reload.

super helpful thank you!

Hi,

Thanks for providing the solution. I have tried to use Blockscheduler to clear the cache and rerun the long running functions and ran into worker error ‘[CRITICAL] WORKER TIMEOUT’. Could you help provide some insides on what may cause this?

Thanks

Thanks for this solution.

I used a very similar solution to this one, however, I used

from apscheduler.schedulers.background import BackgroundScheduler

then:

if __name__ == '__main__':
    
    # Multi-threaded scheduler handles updating all data for UI once a day at 
    # 2am. All DBO databases will be updated with new data from Oracle
    # Enterprise Manager, and fresh parquet files will be generated.
    sched = BackgroundScheduler()
    
    @sched.scheduled_job('cron', day_of_week='mon-sun', hour=2, minute=15)
    def scheduled_job():
        print("Updating all UI data for: {}".format(date.today().strftime("%m-%d-%Y")))
        print('This job is run every day at 2:15am.')
        update_all_UI_data()

    # Start the scheduler
    sched.start()

    # Start the app.
    app.run_server(debug=False, host='0.0.0.0', port=8088)

where update_all_UI_data() does exactly as it says. ha! It queries a few servers, does some transformation on the data, and saves all of the resulting dataframes to disk in parquet format.

BTW if you have huge data, saving to a parquet file format is amazing. I experienced over 50% decrease in memory usage.

That’s my two cents! hope it helps someone.

I’ve been trying to figure it out how to update a dataframe for a week, thanks for the comment, was very useful!