I’ve been following the Arrow project and have been experimenting with Apache Plasma as a shared-memory data backend. Has anyone else used it? It provides incredible performance boosts compared to reading large data from disk, caching large objects, and the like. I had been reading data in for each callback with Parquet (which is already fast); switching to Plasma gave me write speedups of about 5x and read speedups of 10-50x, because even though there is still serialization, it all happens in memory.
Some of my Dash projects involve
- users interact with the data in resource-intensive ways
- data sizes are massive: between 10M and 200M rows and between 4 and 165 columns; even `astype('category')` won’t save me, as about 75% of the columns are numerical or unique strings (see the sketch after this list)
- user experience needs to be reasonably fast
- there are multiple data sources that change (e.g. I want all the marketing responses for company A, then in 1 minute I want the marketing responses for company B)
- I can only hold one or two of these large data objects in memory at once, preferably one
- *classic Dash thing* must share data between many callbacks
- some callbacks change parts of the shared data object
- I don’t have enough bandwidth to manage a lot of IT hardware/software
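To illustrate the `astype('category')` point above, here’s a minimal sketch (the sizes and values are made up for illustration): the category dtype only saves memory when values repeat, which unique strings never do.

```python
import numpy as np
import pandas as pd

n = 1_000_000
repeated = pd.Series(np.random.choice(['a', 'b', 'c'], n))  # few distinct values
unique = pd.Series([f'id_{i}' for i in range(n)])           # every value distinct

# 'category' helps a lot when values repeat...
print(repeated.memory_usage(deep=True), repeated.astype('category').memory_usage(deep=True))

# ...but does nothing when every value is unique (one category code per row)
print(unique.memory_usage(deep=True), unique.astype('category').memory_usage(deep=True))
```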
So, because we can’t use global variables, I need a super fast, relatively simple way to read large new objects into memory and then share them between functions.
Out with the (excellent) old - Parquet
Each callback would look something like this:
```python
...

@app.callback(...)
def data_operation():
    # takes about 5-20 seconds for tens of millions of rows,
    # slower with more columns - Parquet is FAST
    df = pd.read_parquet("path/to/file.parq", columns=['col1', 'col2'])
    # do the operations - takes 5-10 seconds with optimizations and parallel processing
    return the_output
```
Overall time: 10-45s
This was/is fine…but from a user experience perspective, it’s just not enough. Users want to see things change when they click buttons.
Enter the new - Plasma in-memory object store.
Read the Apache Arrow Plasma docs for detail.
Plasma is part of the Apache Arrow project. It’s kind of like Redis, but it can be used with very large objects. For small objects I still just use JSON or pickle files on the disk/network.
My code below is adapted from their docs (above) on Pandas integration. Basic Python objects are much easier: just `object_id = client.put(thing)` and `client.get(object_id)`. For DataFrames, my code is just a function for writing and a function for reading, plus a pickled dictionary index on disk that keeps track of object IDs.
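For instance, a minimal sketch of that simple path (assuming a plasma_store is already running at /tmp/plasma, started as shown below; the dictionary is just a stand-in object):

```python
import pyarrow.plasma as plasma

client = plasma.connect('/tmp/plasma')

object_id = client.put({'company': 'A', 'rows': 123})  # serialize and store
thing = client.get(object_id)                          # read it back

client.disconnect()
```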
```
# Start the plasma_store with up to 15 GB of memory
# (it allocates memory as needed for new objects)
$ plasma_store -m 15000000000 -s /tmp/plasma
```
```python
# /functions.py
import pickle

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma

# each time the data source changes, say, from company A to company B,
# I write the new data to Plasma if it's not already there
def write_to_plasma(thing, which='name_of_thing'):
    # get the client
    client = plasma.connect('/tmp/plasma')
    # save the thing to Plasma: convert to an Arrow RecordBatch
    record_batch = pa.RecordBatch.from_pandas(thing)
    # create a random ObjectID for the new object
    object_id = plasma.ObjectID(np.random.bytes(20))
    # first pass: write to a mock sink just to measure the serialized size
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    data_size = mock_sink.size()
    # second pass: allocate a buffer of exactly that size in the store and write into it
    buf = client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    # seal the object so other processes can read it
    client.seal(object_id)
    # end the client
    client.disconnect()
    # write the new object ID to the on-disk index
    plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
    plasma_state[which] = object_id
    pickle.dump(plasma_state, open('.../plasma_state.pkl', 'wb'))

# reads the data each time
def read_from_plasma(which):
    # get the current plasma_state
    plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
    # get the object ID for the thing you want
    object_id = plasma_state[which]
    # get the client
    client = plasma.connect('/tmp/plasma')
    # read the buffer back and deserialize it into a DataFrame
    [data] = client.get_buffers([object_id])
    buffer = pa.BufferReader(data)
    reader = pa.RecordBatchStreamReader(buffer)
    record_batch = reader.read_next_batch()
    results = record_batch.to_pandas()
    # close out and finish
    client.disconnect()
    return results
```
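For context, here’s a hypothetical sketch of how the two functions fit together; the file paths and the `company_B_data` key are placeholders, and the pickled index has to exist before the first write.

```python
import pickle
import pandas as pd
from functions import write_to_plasma, read_from_plasma

# one-time setup: create an empty object-ID index on disk (hypothetical path)
with open('.../plasma_state.pkl', 'wb') as f:
    pickle.dump({}, f)

# when the data source changes, pay the slow Parquet read once...
df_b = pd.read_parquet('path/to/company_B.parq')
write_to_plasma(df_b, which='company_B_data')

# ...then any callback can pull the same frame back in well under a second
df = read_from_plasma('company_B_data')
```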
```python
# /app.py
@app.callback(...)
def do_something(...):
    # takes about 200-1200 ms
    df = read_from_plasma('company_A_data')  # e.g.
    # do the operations - no time savings here; 5-10 seconds with optimizations
    return the_output
```
Overall time: 6-12s
Anyway, I hope some of you find this info useful - it’s certainly helped me speed up my massive-data Dash apps and increase the amount of functionality I can give users. The only limitations I’ve seen so far are that the API is not yet stable and that use is limited to Arrow-compatible objects. But given that Pandas, NumPy, and almost all common Python objects are compatible, I’ve found it perfect for my uses.