I’ve been following the Arrow project and have been experimenting with Apache Plasma as a shared-memory data backend. Has anyone else used it? It gives an enormous performance boost compared to reading large data from disk or caching large objects elsewhere. Previously I was reading data in for each callback with Parquet (which is already fast); with Plasma I saw write speedups of about 5x and read speedups of 10-50x, because even though there is still serialization, everything stays in memory.
Docs: https://arrow.apache.org/docs/python/plasma.html#
Some of my Dash projects involve:
- users interact with the data in resource-intensive ways
- data sizes are massive: between 10M and 200M rows and between 4 and 165 columns; even astype('category') won’t save me, since about 75% of the columns are numerical or unique strings
- user experience needs to be reasonably fast
- there are multiple data sources that change (e.g. I want all the marketing responses for company A, then a minute later I want the marketing responses for company B)
- I can only hold one or two of these large data objects in memory at once, preferably one
- *classic Dash thing* I must share data between many callbacks
- some callbacks change parts of the shared data object (see the read-modify-rewrite sketch near the end)
- I don’t have enough bandwidth to manage a lot of IT hardware/software
So, because we can’t use global variables, I need a super fast, relatively simple way to read large new objects into memory and then share them between functions.
Out with the (excellent) old - Parquet
Each callback would look something like this:
...
@app.callback(...)
def data_operation():
    # reading takes about 5-20 seconds for tens of millions of rows, slower with more columns - Parquet is FAST
    df = pd.read_parquet("path/to/file.parq", columns=['col1', 'col2'])
    # do the operations - takes 5-10 seconds with optimizations and parallel processing
    return the_output
Overall time: 10-45s
This was/is fine…but from a user experience perspective, it’s just not enough. Users want to see things change when they click buttons.
Enter the new - Plasma in-memory object store.
Read this post for more detail:
https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
Plasma is part of Apache Arrow. It’s kind of like Redis, but it can handle very large objects. For small objects I still just use JSON or Pickle files on disk or the network.
The Pandas read/write functions below are adapted from their docs (linked above). Basic Python objects are much easier: just object_id = client.put(thing) and client.get(object_id). My code is just a function for writing and a function for reading, plus a pickled dictionary index on disk that keeps track of object IDs.
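For those simpler objects, here’s a minimal sketch of the put/get path (the dictionary is just a stand-in for whatever small object you have):

import pyarrow.plasma as plasma

client = plasma.connect('/tmp/plasma')
# put() serializes the object into shared memory and returns its ObjectID
object_id = client.put({'company': 'A', 'status': 'loaded'})
# get() reads it back; any process connected to the same store can do this
thing = client.get(object_id)
client.disconnect()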
# Start the plasma_store with up to 15 GB of memory (it allocates memory as needed for new objects)
$ plasma_store -m 15000000000 -s /tmp/plasma
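If you’d rather not manage a separate terminal for the store, the same command can be launched from Python when the app starts up; a rough sketch (process lifecycle handling is up to you):

import subprocess

# launch the store next to the app: up to 15 GB, socket file at /tmp/plasma
store_proc = subprocess.Popen(
    ['plasma_store', '-m', '15000000000', '-s', '/tmp/plasma']
)
# ...and stop it on shutdown with store_proc.terminate()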
# /functions.py
import pickle

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma

# each time the data source changes, say, from company A to company B,
# I write the new data to Plasma if it's not already there
def write_to_plasma(thing, which='name_of_thing'):
    # get the client
    client = plasma.connect('/tmp/plasma')
    # convert the DataFrame to a RecordBatch and generate a random object ID
    record_batch = pa.RecordBatch.from_pandas(thing)
    object_id = plasma.ObjectID(np.random.bytes(20))
    # first pass: write to a mock sink just to measure the serialized size
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    data_size = mock_sink.size()
    # second pass: allocate a buffer of that size in the store and write for real
    buf = client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    # seal the object so other clients can read it, then disconnect
    client.seal(object_id)
    client.disconnect()
    # record the new object ID in the pickled index on disk
    plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
    plasma_state[which] = object_id
    pickle.dump(plasma_state, open('.../plasma_state.pkl', 'wb'))
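# (my addition, not from the docs) the comment above says "if it's not already there" -
# a small helper for that check, using PlasmaClient.contains(); relies on the imports above
def is_in_plasma(which):
    # True if the object recorded under `which` is still available in the store
    plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
    object_id = plasma_state[which]
    client = plasma.connect('/tmp/plasma')
    present = client.contains(object_id)
    client.disconnect()
    return present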
# reads the data each time
def read_from_plasma(which):
    # get the current plasma_state index from disk
    plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
    # get the object ID for the thing you want
    object_id = plasma_state[which]
    # connect to the store and fetch the raw buffer
    client = plasma.connect('/tmp/plasma')
    [data] = client.get_buffers([object_id])
    # deserialize the buffer back into a DataFrame
    buffer = pa.BufferReader(data)
    reader = pa.RecordBatchStreamReader(buffer)
    record_batch = reader.read_next_batch()
    results = record_batch.to_pandas()
    # close out and finish
    client.disconnect()
    return results
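On the write side, usage is one call per data-source change. Below is a sketch of switching to company B’s data; the file path and key name are placeholders, and the delete() step is my own addition (it assumes your pyarrow version exposes PlasmaClient.delete) to release the replaced object, since sealed Plasma objects can’t be modified in place:

import pickle
import pandas as pd
import pyarrow.plasma as plasma

# remember the previous object ID for this key, if any, so it can be released later
plasma_state = pickle.load(open('.../plasma_state.pkl', 'rb'))
old_id = plasma_state.get('company_B_data')

# load the new data and push it into the store under the same key
df_b = pd.read_parquet('path/to/company_B.parq')
write_to_plasma(df_b, which='company_B_data')

# optional: release the replaced object once no client still holds a reference to it
if old_id is not None:
    client = plasma.connect('/tmp/plasma')
    client.delete([old_id])
    client.disconnect()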
# /app.py
@app.callback(...)
def do_something(...):
    # reading from Plasma takes about 200-1200 ms
    df = read_from_plasma('company_A_data')  # e.g.
    # do the operations - no time savings here; 5-10 seconds with optimizations
    return the_output
Overall time: 6-12s
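For the callbacks that change parts of the shared data object (one of the requirements listed at the top), one sketch of a pattern is read-modify-rewrite: sealed Plasma objects can’t be changed in place, so the callback reads the frame, applies its changes, and writes it back under the same key, and the pickled index then points every other callback at the new object ID. The column name and threshold below are purely illustrative:

@app.callback(...)
def update_shared_data(...):
    # pull the current shared frame out of Plasma
    df = read_from_plasma('company_A_data')
    # change it in this process (hypothetical column and threshold, for illustration only)
    df['flagged'] = df['response_score'] > 0.5
    # write it back under the same key; the index now points at the new object
    write_to_plasma(df, which='company_A_data')
    return the_output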
Anyway, I hope some of you find this useful - it has certainly helped me speed up my massive-data Dash apps and increase the amount of functionality I can give to users. The only limitations I see so far are that the API is not yet stable and that use is limited to Arrow-compatible objects. But given that Pandas, NumPy, and almost all common Python objects are compatible, I’ve found it perfect for my uses.