Dynamic Use of Global Data

I’ve searched far and wide for a solution to this and have found a few close answers, but have not been able to apply them to this situation successfully.

At a meetup I saw a presentation of a Dash app that lets users select which axes to supply to a plotly_express graph (https://www.youtube.com/watch?v=5Cw4JumJTwo) and wanted to take that idea a step further. Instead of predefining the data set being explored with this tool, what if I allowed the user to select what data to explore via SQL? (Yes, they can load in a CSV file, but for this use case I need a direct SQL pull into Dash.)

How can I allow users to pull in a data set from a database and have the dropdown options dynamically update to the column names of the newly pulled data set? As far as I can tell there are 2 issues with the code below:

  1. The dcc.Dropdown components are not being supplied the col_options from the data pull correctly, and it’s not clear why:
    dash.exceptions.InvalidCallbackReturnValue: Invalid number of output values for …x.options…y.options…color.options… Expected 3 got 47
  2. I believe the dataset needs to be available globally so that the dcc.Graph and the plotly_express call know what data to plot. Do I make the SQL data pull globally accessible? I know this is bad in a multi-user context, as shown in https://dash.plot.ly/sharing-data-between-callbacks under "Why global variables will break your app".

What are the alternatives?

import dash
import dash_html_components as html
import dash_core_components as dcc
from dash.dependencies import Input, Output
import plotly_express as px
import pandas as pd
import pyodbc

# Check whether `data` is already defined globally. Once a db query has run, I'm hoping `data` will hold the most recent pull
try:
  print(data.columns)
# It won't be defined on the first run, so fall back to pulling new data
except NameError:
  print('Pull new data')

# List of user inputs that col_options will populate
dimensions = ['x', 'y', 'color']

app = dash.Dash(__name__)
app.layout = html.Div([
  html.Div(
    # Supply any query using any tables in the db
    [dcc.Input(
      id='query',
      placeholder='SQL Query',
      value="""SELECT TOP 100 * FROM base.Visit"""
      )] +
    # Dropdowns with dynamic column options generated by most recent data pull
    [html.P([d + ":", dcc.Dropdown(id=d, options=[])]) for d in dimensions]
  ),
  dcc.Graph(id='main_graph')
  ])

@app.callback([Output(d, 'options') for d in dimensions],
              [Input('query', 'value')])
def refresh_db_data(query):
  connection = pyodbc.connect('Driver={ODBC Driver for SQL Server};'
                              'Server=my-sqlserver;'
                              'Database=my-db;'
                              'uid=username;' # Don't actually do this! Just for demonstration
                              'pwd=password;' # Don't actually do this! Just for demonstration
                              'Authentication=ActiveDirectoryPassword'
                              )
  # Pull data into a pandas DataFrame using user inputs (db credentials and query) and save
  db_data = pd.read_sql(query, connection)
  # Close connection
  connection.close()
  # Make the data accessible
  global data
  data = db_data
  # Dynamically define column options for `dcc.Dropdown` from most recent data pulled
  col_options = [dict(label=x, value=x) for x in db_data.columns]
  # Send back new col_options to `Dropdown` options
  return col_options

@app.callback(Output('main_graph', 'figure'), 
             [Input(d, 'value') for d in dimensions]) 
def update_plot(x, y, color): 
  return px.scatter(data, x=x, y=y, color=color) 

app.run_server()

Edit: Sorry for all the edits, it’s my first post and I’m continuing to try solutions to the problem above…learning as I go

You’re right, this will not work with multiple users or with multiple workers. :+1:

Maybe follow some flowchart like this:

  • If the data is updated frequently:
    • If the data is not very large:
      • Just pull the data from SQL every time.
    • If the data is large:
      • This is the hardest one, frequently updated large data: you can try using brain-plasma as mentioned below, or saving to disk on some interval and then pulling values from disk each time instead of from SQL. But it’ll just be slow unless only a small part of the data is updated; in that case, pull only the updated parts of the data and append those pieces to something on disk or in brain-plasma.
  • If the data is not updated frequently:
    • If the data is not very large:
      • Pull it from SQL each time and cache the result (see below).
    • If the data is large:
      • Put it in a shared-memory store like Apache Plasma or Apache Ray and read it from there.

Unless the data is very large, and especially if the server hosts the app and the database, pulling from SQL should be fine for every call. Just write a function and call it each time you need data, and cache the values (just for that user session).
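As a rough sketch of that per-query caching with flask-caching (app is the Dash app from the question; CONNECTION_STRING, the get_data name and the 300-second timeout are just illustrative choices, not anything from the original code):

import pandas as pd
import pyodbc
from flask_caching import Cache

# cache backed by the local filesystem; a Redis backend works the same way
cache = Cache(app.server, config={"CACHE_TYPE": "filesystem",
                                  "CACHE_DIR": "cache-directory"})

@cache.memoize(timeout=300)  # identical queries are re-pulled at most every 5 minutes
def get_data(query):
    connection = pyodbc.connect(CONNECTION_STRING)  # hypothetical connection string
    df = pd.read_sql(query, connection)
    connection.close()
    return df

# inside any callback that needs the data:
# df = get_data(query)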


Thanks for your reply. To be more clear, the problem is not yet related to data size, though your answer does help answer how I can optimize this once I get it working. The issue is more related to being able to input ANY SQL query and have that data globally available.

It sounds like the only way to use this data is through callbacks or caching which answers #2 of my two issues listed above.

To answer issue #1: it looks like I had to return 3 instances of col_options for this to work, since refresh_db_data() has 3 outputs. I thought a single col_options object would be shared across all 3 outputs, rather than having to return a list of 3 (e.g. [col_options, col_options, col_options]). Is this always the case with multiple outputs?

Sorry I didn’t address that one - you’re right, you have to return one object per output. Global variables never work; definitely try caching. Redis is great.
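In other words, a sketch of the fixed callback from the question could look like this (get_data is the hypothetical cached helper from the earlier sketch; everything else follows the original layout):

@app.callback([Output(d, 'options') for d in dimensions],
              [Input('query', 'value')])
def refresh_db_data(query):
    db_data = get_data(query)  # cached SQL pull instead of a global
    col_options = [dict(label=x, value=x) for x in db_data.columns]
    # one return value per Output, so repeat the same option list three times
    return [col_options] * len(dimensions)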

(Also, I edited my answer, I messed up on one lol)

Sorry to just jump in here, but in the scenario: Data updated frequently > Data is large, you mention that

or saving to disk, on some interval and then pulling values from disk each time instead of from SQL. But it’ll just be slow unless only a small part of the data are update. Then pull only updated parts of the data and append those pieces to something on disk or in brain-plasma

I have this scenario. A “big” data file (for example 18 million rows in an sqlite database), which gets updated with a couple of rows every minute. Clearly I don’t want to read the complete database each time, so the best would be to just pull the new rows from the database (or from a separate file), and append those to my already existing dataframe. But I find it hard to figure out a clear concise way to do this that doesn’t involve using a global dataframe or breaking functionality. Do you have any pointers that could help me in this regard? Thanks!

Oh damn, 18M rows in SQLite! I would look into using Apache Ray or Plasma for this, as in the "if the data is not updated frequently --> if the data is large" branch above.

Just pull those new rows, pull the data from shared memory, add the rows, and send it back to the shared memory.

Alternatively, you could save each new row as a new object in shared memory and just concatenate them all together (which is cheap) every time you need the whole dataset. Increases complexity but is faster and cheaper.
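A rough sketch of that chunked approach with pyarrow.plasma (the socket path and the row_chunk_ids bookkeeping list are assumptions; note that the list of ObjectIDs itself still has to live somewhere all workers can see, which is what brain-plasma's names give you):

import pandas as pd
import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma")  # a plasma_store process must already be running
row_chunk_ids = []                      # ObjectIDs of each appended batch of rows

def append_new_rows(df_new):
    # each batch of new rows becomes its own immutable object in shared memory
    row_chunk_ids.append(client.put(df_new))

def full_dataset(base_df):
    # cheap: concatenate the base data with every stored batch on demand
    chunks = [client.get(object_id) for object_id in row_chunk_ids]
    return pd.concat([base_df] + chunks, ignore_index=True)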

Thanks a lot for replying :slight_smile: Was away so didn’t get a chance to look at it before now. Ok, so something like:

# First time, reads all data from database
df = pd.read_sql(query, conn)

[...]  # App layout etc.

@app.callback(Output("hidden-div", "children"),
              [Input("interval-component", "n_intervals")])
def update_df(n):
    [...]  # query to pull latest rows, etc.
    global df
    df_new = pd.read_sql(query_new, conn)
    df = pd.concat([df, df_new])
    return "Nothing"

I know that global dataframes should be avoided, but in theory all I’m doing here is appending new data to the existing data, which in turn is used for the graphs of the app. Do I understand you correctly, that this is ok to do? Also, I haven’t used hidden divs before, but I saw in the documentation that this is a trick for callbacks with no output, so I assume the return syntax I have used isn’t correct.

The global won’t work - the app’s threads don’t share memory, so each one only sees the rows appended to its own copy of the global df. E.g. thread 3 has updated df four times with 25 rows each, so it has 18M+100 rows, whereas thread 1 has updated df 12 times, so it has 18M+300 rows with no access to the 100 rows that thread 3 has added. So a global df won’t work for you in this case.

You need to store either the entire dataset (18M + new rows) in some shared-memory place, or you need to store the new rows in a place that can be accessed by all threads - a hidden div (as a dictionary), on disk, or in some shared-memory place like Redis, Apache Plasma, etc.
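For the hidden-div variant, a minimal sketch (the component ids, query_since helper, base_end_ts, base_df, conn and make_figure are all placeholders, not anything from your app): an interval callback serialises only the rows added since the base data was loaded, and the graph callback concatenates them with the base data.

import pandas as pd
from dash.dependencies import Input, Output

@app.callback(Output("new-rows-div", "children"),
              [Input("interval-component", "n_intervals")])
def store_new_rows(n):
    # pull only rows newer than the data that was loaded at startup
    df_new = pd.read_sql(query_since(base_end_ts), conn)
    return df_new.to_json(orient="split")

@app.callback(Output("main-graph", "figure"),
              [Input("new-rows-div", "children")])
def update_graph(new_rows_json):
    df_new = pd.read_json(new_rows_json, orient="split")
    df = pd.concat([base_df, df_new], ignore_index=True)
    return make_figure(df)  # hypothetical plotting helper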


Hi again @russellthehippo!

Thanks a lot for your help, it’s greatly appreciated. I’ve spent some time trying to understand these things and caching in general etc. It definitely makes sense that it works the way you described.

So I think I’m closer to a solution now, but I would like to confirm with you that my approach is correct and if you could help illuminate another hurdle I’m struggling with.

import dash
import pandas as pd
from flask_caching import Cache
import pyarrow as pa
import time

app = dash.Dash(__name__)
cache = Cache(
    app.server,
    config={
        # only one CACHE_TYPE should be set; Redis here, with the filesystem
        # backend left as a commented-out alternative
        "CACHE_TYPE": "redis",
        # "CACHE_TYPE": "filesystem",
        # "CACHE_DIR": "cache-directory",
        "CACHE_THRESHOLD": 10,
    },
)

start_project_time = 1541179991
start_app_time = int(time.time())

# [...]

@cache.memoize()
def clean_data(start_ts, end_ts):
    # create sql query etc. `df_query` function
    sql_query = df_query(start_ts, end_ts)
    # Read from database (con = sqlite3-connection to db)
    df = pd.read_sql(sql_query, con)
    # Change columns data type etc.
    df = clean_df(df)
    return pa.Table.from_pandas(df)

# [...]

@app.callback(
    Output("mhp-graph", "figure"),
    [Input("submit-button", "n_clicks")],
    [State("id-dropdown", "value")],
)
def update_graph(n_clicks, value):
    df = clean_data(start_project_time, start_app_time)
    df = df.to_pandas()  # convert pyarrow table to pandas again
    start_ts = max(df["timestamp"])
    end_ts = int(time.time())
    df_new = clean_data(start_ts, end_ts)
    df_new = df_new.to_pandas()
    df = pd.concat([df, df_new])

    # [...]

A couple of questions:

  • The variable start_app_time - will it have the same problem as the global dataframe in this case, i.e. that not all threads necessarily share the same value?
    • I guess a workaround would be to use midnight of tomorrow’s date instead, or a preset start_app_time. It is only needed for a timestamp filter in the sql query (timestamp BETWEEN start_time AND end_time)
    • Another workaround is to not filter on end-time, which is totally ok, invalid timestamps can be removed in filtering of the dataframe itself (so the sql query would be timestamp > start_time instead).
  • If I understand it correctly, the big expensive query will be cached and therefore fast (since it will have the same input each time). The new rows will also be cached, but since I am calling the function with a new timestamp each time, it will never use the cached results?

Further, some points I wanted some clarification on:

Just pull those new rows, pull the data from shared memory, add the rows, and send it back to the shared memory.

Alternatively, you could save each new row as a new object in shared memory and just concatenate them all together (which is cheap) every time you need the whole dataset. Increases complexity but is faster and cheaper.

You need to store either the entire dataset (18M + new rows) in some shared-memory place,

or you need to store the new rows in a place that can be accessed by all threads - a hidden div (as a dictionary), on disk, or in some shared-memory place like Redis, Apache Plasma, etc.

  1. I don’t see how I can send it back to shared memory?
    • Let’s say I do it like I have done now: I get the big 18m query (fast, since it is cached), then get all new rows since the last timestamp in the 18m dataframe and concat them together - and then what? How do I overwrite the expensive cached result with the new df so that next time I get the 18m + new rows df (instead of just the 18m df)?
  2. I don’t see how I could pull this off? Sounds interesting though!
  3. This I think is great, and then I can pull it and append new rows, and send back.
    • Basically 1 again, but it is unclear to me how to only compute the new-new rows (all rows after 18m + new rows) rather than all rows since the initial store (all rows after 18m), and how to then cache it again.
  4. I think this is kind of what I am doing now, except that since I always run the clean_data function with new timestamps, I don’t really use the cached results, except for the expensive query.

Thanks a lot for your help, it is incredibly helpful for me!

I am kind of lost in the intricacies of this discussion, but an easy way I found of dealing with large data in Dash is a simple (I think) two-step process.

  1. Callback A initiates a Python script (ideally via the subprocess module), which creates a file on the hard disk and then constantly updates it according to some rule/process. Callback A also has instructions on how to terminate the process for ease of use.
  2. Callback B is triggered by a dcc.Interval component to check the created file on disk for updates at some interval (a sketch of this step follows below). Depending on how efficient this checking is, the app will work quickly.
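A minimal sketch of step 2 (the app object is assumed to exist; DATA_PATH, the one-second interval and the make_figure helper are just placeholders):

import pandas as pd
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

DATA_PATH = "live_data.csv"   # the file the background script keeps updating

app.layout = html.Div([
    dcc.Interval(id="poll", interval=1000),  # fire every second (value in milliseconds)
    dcc.Graph(id="live-graph"),
])

@app.callback(Output("live-graph", "figure"),
              [Input("poll", "n_intervals")])
def refresh_from_disk(n):
    # re-read the file on each tick; os.path.getmtime(DATA_PATH) could be used
    # to skip the read when nothing has changed
    df = pd.read_csv(DATA_PATH)
    return make_figure(df)    # hypothetical plotting helper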

How does what has been mentioned so far compare with my scheme above? I am extremely interested in figuring out how to make these processes more efficient.


@Mike3 yours is a good solution as well, kind of a mix of the three options I described. Look into using Apache Plasma! You can store things in a shared memory space and recall them at any time using ObjectIds - they stay in memory the whole time so it’s extremely fast. I made brain-plasma to make using Plasma easier, with names and such.

@coffee I should have been more clear - caching is not what you want in this situation, because your data is changing. You’re right: you’ll just have to re-cache each thing each time, so caching wouldn’t actually do anything for you. Caching relies on the same function inputs producing the same output. You need to detach the data process from the callback function inputs. It would work if the caching is based on some non-callback function like get_old_data that returns just the one big dataset each time.

@some_caching_method
def get_old_data():
    # get the initial 18M rows of data like you would normally
    # after the first time, this data will be cached and will come back quickly. However, Redis is not good for this - it is made for small objects
    return data

def get_new_rows(start_ts):
    # get the rows that are not included in the initial 18M rows
    return new_rows

# inside a callback:
@app.callback(...)
def clean_data(start_ts, end_ts):
    df = get_old_data() # this is cached - it will be fast
    new_rows = get_new_rows(start_ts)
    df = pd.concat([df, new_rows])
    # do other stuff
    return ...
    

Also, putting the data in an Arrow format like Table doesn’t automatically share it in memory - you still need to send it to a shared-memory space like Plasma or Ray.

To answer your questions:

  1. You need to use some kind of shared memory space for this - Apache Plasma, Apache Ray, etc. The documentation for those is good.
  2. See my above code example - you would save each group of new rows to disk and store the file location somewhere. Then each time you need the entire dataset, pd.concat the initial 18M rows with all the new data you’ve stored since then. This is much faster than pulling them all again.
  3. Use the last index (assuming it’s a unique, increasing number) in the data you currently have, then pull any rows with an index greater than that from SQLite (a sketch follows this list).
  4. See code example
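For point 3, one possible get_new_rows, assuming the SQLite table has a monotonically increasing timestamp (or rowid) column; the file, table and column names here are placeholders:

import sqlite3
import pandas as pd

def get_new_rows(start_ts):
    con = sqlite3.connect("data.db")
    # only rows added after the last timestamp we already have
    new_rows = pd.read_sql(
        "SELECT * FROM measurements WHERE timestamp > ?", con, params=(start_ts,)
    )
    con.close()
    return new_rows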

The alternative to all this is to just store the entire dataset in shared memory (Plasma, etc.). Use it like:

  • the first time you use the data, put the entire thing into shared memory
  • each time you need it again:
    • pull the new rows
    • pull the old data from shared memory
    • concatenate them
    • delete the old data from shared memory
    • store the new data into shared memory

If you are using Plasma, you would just need to store the ObjectId of the data somewhere to access it again. Shared memory makes all this very fast.
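A sketch of that cycle with pyarrow.plasma (the socket path is an assumption, and current_id would itself have to be stored somewhere every worker can read - a small Redis key, a file on disk, or a brain-plasma name):

import pandas as pd
import pyarrow.plasma as plasma

client = plasma.connect("/tmp/plasma")

def refresh_shared_dataset(current_id, df_new):
    old_df = client.get(current_id)                    # pull the old data from shared memory
    updated = pd.concat([old_df, df_new], ignore_index=True)
    client.delete([current_id])                        # drop the old object
    return client.put(updated)                         # store the result, return the new ObjectID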

If you’d like help with other questions, feel free to message me directly! :smiley:


@russellthehippo No worries, it’s more likely that I’m in unknown waters here, which is why I’m not fully understanding you and making mistakes. It makes sense with the get_old_data() caching, but I guess that automatically means I can’t store the 18m + new rows in a beneficial way (by just using caching, I mean). It was very fast to just cache the old data though, despite using Redis and pa.Table.from_pandas.

Yes, I realized after posting my previous post that I didn’t do it correctly. I found your absolutely excellent thread here on sharing data with Apache Plasma. Things started to click for me and I tried it out, and I think I have it more or less working now.

  1. So in get_new_rows there is a plasma_read function that returns the new rows, which are already stored in the Apache Plasma store? I.e. it gets the already-stored new rows, finds new rows in addition to these, deletes the old new rows from the store, writes the new-new rows to the store, and then returns the data so that I can concat it in the app.callback.
    • This solution makes sense when I have a big dataset that I then start an app on, while the alternative solution makes sense if the app has been running since the data was empty. This is interesting.
    • I think a dual solution would be beneficial, i.e. running the alternative method from the start of the project until it is close to the end, when no more data comes in (like now). Then I can switch over to simply always using a cached store of the data.

I implemented the alternative!

  • I started a Plasma store in a separate process, but there is another issue I stumbled upon: how do I only run the initial big query once?
    • I ended up creating a separate script.
    • I run it to store the big data in the Plasma store and the object_id in a pickle file, and then it terminates. I then start the app and read from the Plasma store (a sketch of this handoff follows below). This avoids the app running the big query multiple times. Is there a better way?
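Roughly, the handoff could look like this (one possible sketch, assuming pyarrow.plasma; the file name and socket path are arbitrary):

import pickle
import pyarrow.plasma as plasma

# in the separate loader script, after object_id = client.put(big_df):
with open("object_id.pkl", "wb") as f:
    pickle.dump(object_id.binary(), f)   # store just the raw 20 bytes of the id

# inside the Dash app, at startup:
with open("object_id.pkl", "rb") as f:
    object_id = plasma.ObjectID(pickle.load(f))
df = plasma.connect("/tmp/plasma").get(object_id)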

I then do what you say: I pull all new rows (I cannot test it at the moment, the gateway onsite is offline, so it pulls an empty dataframe), pull the old data, concat, and store it in shared memory. However, I currently don’t delete the old object (which obviously is a problem), but it seems from the Plasma documentation that it is as easy as client.delete(object_id).

Again, thanks a lot for the help! And thanks for offering to help via messages. I’ll be sure to message you if there is anything else that comes up =)


Very interesting things you are saying, russellthehippo; thanks for the info, I’ll think about all the options you have laid out!

Having looked over brain-plasma on your recommendation, as well as the Apache Arrow project’s Plasma itself, I have one main concern I would like you to alleviate. All of Plasma’s docs make it seem like the objects that are sent into Plasma are immutable, and generally cannot be changed or edited once they are there.

Obviously this means most streaming applications cannot use Plasma to carry a payload that grows over time. How did you get around this limitation of base Plasma, if I am right about that limitation in the first place?

You’re right, immutability was a key challenge to fix. For changing the value of an object, I got around it by doing:

  1. Get the new value as new_value
  2. Save the object’s ObjectID as some variable, e.g. this_id
  3. Delete the value referenced by this_id with client.delete(this_id)
  4. Set the value referenced by this_id to new_value.

In brain-plasma I do this in the Brain.learn() function by forget-ing the stored value before reassigning it.

Does this correspond to just deleting the original object stored by plasma whenever a change is made? Is this efficient for small changes in a big dataframe, let’s say ~10mil. rows? And I assume there is no appending feature being used here?

Please pardon all these questions, I just want to conceptually understand how such a big advance is being made ^^

Yeah, it’s definitely an efficiency issue if you are constantly changing the thing. However:

  • it’s fast enough that it hasn’t been a problem for me, especially compared to reading from and writing to disk;
  • I haven’t found a way to just change part of a value in Plasma - it’s all or nothing afaik (do you have any ideas for this? if so that would be awesome)
  • to track changes like that would be a lot of overhead and complexity.

If you want maximum efficiency, store the pieces of the dataframe individually and concatenate them/combine changes upon pulling from Plasma. Again, that’s more overhead, but it’s certainly faster than storing a whole new dataframe over and over again. But overall I consider it a win to have a significant upgrade over pulling from disk or pulling an entire SQL table.

All good points; thank you for replying.

I will at least look into what further possibilities there might be with Plasma in the appending realm (who knows, as it is still being actively developed?), and respond if I ever find something or have further questions.


I think I have just run into the trap of the global variable and was hoping for some advice from this thread on how I might go about resolving it.

In summary, my data is jumping around and appears to only partially update… some charts/tables have the updated data whilst others do not.

The main reason for the design pattern that follows was user experience - making a round trip to Redis in each callback was slow due to the size of the data (it can range from 15 MB to 82 MB).

I’m running a multi-page dash application with 2 workers (Procfile web: gunicorn --workers 2 server:server --timeout 120)

I have a data.py module which has a function get_df_data() that I call in all my callbacks

data.py - extract

    import json
    import os

    import pandas as pd
    import redis

    import tasks
    from tasks import update_data

    redis_instance = redis.StrictRedis.from_url(os.environ["REDIS_URL"])

    app_data = None

    def get_dataframe():
        """Retrieve the dataframe from Redis.
        This dataframe is periodically updated through the redis task.
        """
        global app_data
        jsonified_df = redis_instance.hget(
            tasks.REDIS_HASH_NAME, tasks.REDIS_KEYS["DATASET"]
        ).decode("utf-8")
        df_redis = pd.DataFrame(json.loads(jsonified_df))
        df_redis['value_date'] = pd.to_datetime(df_redis['value_date'])
        df_redis = df_redis.sort_values(by='value_date')
        df_redis['month_index'] = pd.Categorical(df_redis['value_date']).codes
        app_data = df_redis
        return app_data


    def get_df_data():
        df = app_data
        return df

callback usage

    @app.callback(Output('data-table', 'data'),
                  [Input('hidden-data-radio', "value")])
    def return_performance_table(hidden):
        data = get_df_data()
        data = data.to_dict("records")
        return data

In my application I have a button that executes a celery task to update redis - this is working perfectly fine.

refresh button

    def refresh_global_data():
        update_data()
        get_dataframe()
        return print("data refreshed")

celery task - working fine doing its job of populating redis

        import datetime
        import json
        import time
        import os
        import plotly
        import redis

        from celery import Celery

        from frontier.data.mssql_dashboard_data import MsSqlAnalyticsData

        celery_app = Celery("Celery App", broker=os.environ["REDIS_URL"])
        redis_instance = redis.StrictRedis.from_url(os.environ["REDIS_URL"])

        REDIS_HASH_NAME = os.environ.get("DASH_APP_NAME", "app-data")
        REDIS_KEYS = {"DATASET": "DATASET", "DATE_UPDATED": "DATE_UPDATED"}

        @celery_app.task
        def update_data():
            print("----> updating data")
            # Create a dataframe with data
            db_celery = MsSqlAnalyticsData()    
            df_celery = db_celery.get_champ_performance_combined()

            # Save the dataframe in redis so that the Dash app, running on a separate
            # process, can read it
            redis_instance.hset(
                REDIS_HASH_NAME,
                REDIS_KEYS["DATASET"],
                json.dumps(
                    df_celery.to_dict(),
                    # This JSON Encoder will handle things like numpy arrays
                    # and datetimes
                    cls=plotly.utils.PlotlyJSONEncoder,
                ),
            )
            # Save the timestamp that the dataframe was updated
            redis_instance.hset(
                REDIS_HASH_NAME, REDIS_KEYS["DATE_UPDATED"], str(datetime.datetime.now())
            )

Is this design pattern the reason for the partially updated data?

Are there any obvious alternative methods?

Any advice is welcome, please.