Preventing Wasteful Parallel Callbacks When Running Multiple Processes and Caching

In my app multiple callbacks depend on a shared result. Calculating this shared result once is slow enough, but having multiple processes (or threads) repeatedly doing the calculation is even slower.

I think I am looking for a cache-like solution that when the cache returns a miss, the requesting callbacks will simply wait until the first callback is finished and then grab the same result. I have tried using Flask-Caching but in my tests it seems like if there is a cache miss then all workers/threads will still execute the memoized function.

At the end of this post. I have some code to demonstrate the problem. Notice that ā€œCalculatingā€ gets printed at least twice to the console (and perhaps more times if you are using more than one worker).

I have a solution in place using Pythonā€™s thread synchronisation tools (locks etc) however this only works for a single worker/process. What can I do to fix this when using multiple workers/processes?

import dash
import dash_html_components as html
from flask_caching import Cache
import time

app = dash.Dash(__name__)

server = app.server

cache = Cache(app.server, config={
    'CACHE_TYPE': 'filesystem',
    'CACHE_DIR': 'cache-directory'
})

app.layout = html.Div(children=[
    html.Button('Submit', id='button'),
    html.Div(id='output-container-button1', children=[]),
    html.Div(id='output-container-button2', children=[]),
])

# This should only ever be called once!
@cache.memoize()
def slow_function(argument_1):
    print("Calculating")
    time.sleep(3)
    return 1

@app.callback(
    dash.dependencies.Output('output-container-button1', 'children'),
    [dash.dependencies.Input('button', 'n_clicks')])
def update_output1(n_clicks):
    value = slow_function(3)
    return 'The button has been clicked {} times'.format(value)

@app.callback(
    dash.dependencies.Output('output-container-button2', 'children'),
    [dash.dependencies.Input('button', 'n_clicks')])
def update_output2(n_clicks):
    value = slow_function(3)
    return 'The button has been clicked {} times'.format(value)

if __name__ == '__main__':
    app.run_server(debug=True)
1 Like

I was able to solve this using a combination of:

  • multiprocessing.lock() (instead of threading.lock())
  • gunicorn preloading

If you use the preload option with gunicorn then you can share objects between processes using the multiprocessing family of proxy objects.

1 Like

Iā€™ve spun my code off into a module, which is available via pip https://github.com/sjtrny/jitcache

I hope that others get some use out of this. I have an example using Dash here https://jitcache.readthedocs.io/en/latest/dash.html, which I have copied below:

REDACTED: CODE OUTDATED. Refer to the following post for updated code https://community.plotly.com/t/preventing-wasteful-parallel-callbacks/18956/5?u=sjtrny
1 Like

Nice! Thanks for sharing!

Since the other day I have changed the design of jitcache to be more in line with LRU Cache and Flask-Caching by using a decorator instead.

from jitcache import Cache

cache = Cache()

@cache.memoize
def slow_fn(input_1, input_2, input_3=10):
    return input_1 * input_2 * input_3

print(slow_fn(10, 2))

For plot.ly you can either decorate entire callbacks (just like in Dashā€™s Performance Docs) or you can decorate a subroutine. Below I demonstrate how to decorate callbacks (you can find more documentation here):

import dash
import dash_html_components as html
from jitcache import Cache
import dash_core_components as dcc

cache = Cache()

app = dash.Dash(__name__)

server = app.server
app.layout = html.Div(
    children=[
        html.Div(id="output-container-dropdown1", children=[]),
        html.Div(id="output-container-dropdown2", children=[]),
        dcc.Dropdown(
            options=[
                {"label": "New York City", "value": "NYC"},
                {"label": "MontrƩal", "value": "MTL"},
                {"label": "San Francisco", "value": "SF"},
            ],
            value="MTL",
            id="dropdown",
        ),
    ]
)

@app.callback(
    dash.dependencies.Output("output-container-dropdown1", "children"),
    [dash.dependencies.Input("dropdown", "value")],
)
@cache.memoize
def update_output1(input_dropdown):
    print("run1")

    return input_dropdown

@app.callback(
    dash.dependencies.Output("output-container-dropdown2", "children"),
    [dash.dependencies.Input("dropdown", "value")],
)
@cache.memoize
def update_output2(input_dropdown):
    print("run2")

    return input_dropdown

if __name__ == "__main__":
    app.run_server(debug=True)
2 Likes

This looks really great. Once we investigate this a little bit more, weā€™ll consider adding it to the docs. Thanks for sharing this and keep the thread updated with your progress!

I like jitcache, Iā€™ve tried it and it works well. Cool build.

If you want an option that does this explicitly and feels like using a dictionary, I made https://github.com/russellromney/brain-plasma for this very purpose. Also available through pip.

from brain_plasma import Brain

brain = Brain()

@app.callback(...)
def create_slow_df_only_once(...):
    df = # large, slow, data
    brain['slow_df'] = df # saves to a Plasma in-memory object store
    ...

@app.callback(...)
def access_large_df(...):
    df = brain['slow_df']
    ...
    
2 Likes

@sjtrny - Iā€™m curious on how your @cache.memoize function works with .lock and concurrent requests.
Letā€™s imagine that a callback def update takes 5 seconds to run and two requests are made, the second request 2 seconds after the first one.

What happens? Does the memoize decorator put some kind of lock on the update function and does the second request ā€œwaitā€ for the first request to finish and then access the value from memory? Therefore the second request gets the value after 3 seconds instead of recomputing the same result in 5 seconds?

Yes thatā€™s correct - the second request will only take 3 seconds.

1 Like

Very nice. @russellthehippo - Does your plasma integration work the same way or does the end user need to program in the ā€œwaitsā€?

@chriddyp sadly brain-plasma has no locking control for the brain for the same objectā€¦which I suppose means brain-plasma doesnā€™t solve the key problem of doing the same thing multiple times in separate callbacks.

Some behavior notes for brain-plasma:

  • If the same client is trying to access an object that is being created, the request will fail if the object create transaction is not done (because the object doesnā€™t officially exist yet).
  • A request to any other object works fine, either with the same client or a different client
  • A request by a separate client will just return an object not found error until the object is created
  • If the object already exists, it is immutable. This means that, while an object is being changed, the previous value will be returned until the old value is deleted (ending the copy-replace-delete update transaction).

I like this implementation https://github.com/sjtrny/jitcache/blob/master/jitcache.py#L32
I may implement something similar for same-client get/put calls in brain-plasma. Thereā€™s no way to protect against it between clients though.

1 Like

Hi Stephen, jitcache seems to solve a pretty big Dash issue that many of us deal with. My question is how does jitcache handle timeouts or does it reach a maximum quantity of cache keys? I donā€™t see a default timeout or max key configuration and Iā€™m worried Iā€™d run up an infinite cache size in production.

At the moment it doesnā€™t. Happy to take contributions/pull requests to add that feature though.

Potentially the code could be changed to just become a thin wrapper of an existing cache that support such features.

1 Like

I may try to take a look at it in the coming days/weeks. If I can make something work Iā€™ll submit a pull. Iā€™m running multiple dash apps from a larger flask application and everything was harmonious until increasing the gunicorn worker count. I had to move user sessions to a redis backend to preserve state/auth status and Iā€™m now seeing problems with flask-caching filesystem cache as well. I believe your locking pattern would allow the filesystem cache to function properly with multiple gunicorn workers but itā€™s a hunch at this time. Iā€™ll experiment with jtcache on the dev version of my site to see if it solves the multiple workers issue and report back.

Is there a way to prevent the following error when running the examples from the jitcache docs? Iā€™m getting the same error on Windows and MacOS and Google seems to be lite on this particular issue. Iā€™m taking the dash example code directly from the docs with a copy/paste to get this error.

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

Looks like the issue is the default start method on MacOS changed from fork to spawn in Python 3.8

https://docs.python.org/3/whatsnew/3.8.html#multiprocessing

Setting the start method back to fork is a quick work around e.g.

from jitcache import Cache
import time
import multiprocessing as mp

mp.set_start_method('fork')

cache = Cache()

@cache.memoize
def slow_fn(input_1, input_2, input_3=10):
    print("Slow Function Called")
    time.sleep(1)
    return input_1 * input_2 * input_3

print(slow_fn(10, 2))
2 Likes

Hi All,

I was struggling with ā€˜multiple entrantsā€™ problem and found that redlock-py may help in some scenarios.

In my case, using dash Cache and Signal pattern, when running multiple workers (gunicorn), mutiple threads were entering the ā€œexpensive function that loads the cacheā€ ā€¦

I am using Redlock to enforce that only the first entrant carry on with the expensive computation, and that the others wait for the result df to be placed in cache. Obs: using flask_caching and redlock with Redis backend.

Just a sketch:


def slow_function_only_once():
   
   df = cache.get('data_key') 
   if (df is not None):
           return df # lucky
  
   my_lock = cache.set('lock_key', **expiration_millisecs**)
   #if my_lock is False, someone else already has the lock is is doing the hard work (*almost sure,* see below)
   
    while(my_lock == False):
          df = cache.get('data_key')
          if (df is not None):
               return df
          else:
               if (max_retries_exceded): 
                     my_lock = dlm.lock('lock_key', experiation_millisecs)

     #ok, if we got here... we have the lock, lets do the work
     df = expenseive computation

     cache.set('data_key', df)
     dlm.unlock('lock_key') #release the lock now that df is available for other threads
     return df

Important. It is not bullet proof. Please check out this post for details on when redlock might not be fail proofā€¦

Key points:

  1. in my scenario, the goal of the function is to compute and store a large df into cache; so if by reasons below some thread manage to snick in, it would not break the application
  2. expiration_millisecs is important to make sure the lock does not get locked forever in case a worker fails after acquiring the lock (just curious, does jitcache ensure that a failed worker wont left the lock locked?)
  3. it is key to chose expiration_millisecs wisely. It should give enough time for the thread that hold the lock to complete, place data into cache, and then release.
    3.1 if the lock expires beforehand, some other thread may start computing instead of just wait for the value (again, would not break the application necessarilyā€¦ just waste resources)

Well, it is not perfect. But good enough for some scenario, right? What do you guys think about this solution? Is there something I am missing?

Final note: Redlock uses random keys and check-and-set operations which improve robustnessā€¦ but one could even mimic this pattern using redis set-get to control the lock, etcā€¦ (worth it reading the article

@sjtrny Hi. I have the same error on windows 10/64

RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.
        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:
            if __name__ == '__main__':
                freeze_support()
                ...
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

The suggestions above didnā€™t work for me.
Š”ould you tell me how to run your code? or perhaps there are other ways to solve the problem (if the callback is executed, do not run the second one and do not create a queue)