MongoDB websocket and Dash plot

Greetings @snehilvj ,
I followed this tutorial to set up websockets with MongoDB ChangeStreams, and this is the output I get from the working websocket connection to my MongoDB collection FloatingEquity:
https://cdn.discordapp.com/attachments/931545568931631194/983232389763448852/unknown.png

I am now trying to build a plotly-dash app that interfaces with this MongoDB collection, to which data is written continuously at irregular time intervals. The dash app needs to draw a line plot with the equity field on the y-axis and time on the x-axis, and also show the live streaming data on dash cards. But I don’t know how to connect this MongoDB ChangeStream websocket to Dash. Should I use the quart or quart-motor package, or would the existing code in the MongoDB tutorial suffice with some minor modifications?

The Dash Websocket tutorials sometimes use JS code, which I don’t understand:
1.) Websocket
2.) Build a realtime Dash App with websockets | by Jacky Shi | Medium

It would be very helpful to the dash & MongoDB python community if any dash expert here could show a small working example of a websocket connection between a live-updating MongoDB Atlas collection and a dash app, as information on the internet about setting up a websocket connection between MongoDB and Dash is very scarce.

Best Regards,
Dilip

Hi @diliprk

The Websocket component from dash-extensions is the best way to enable realtime capabilities within a dash app.

As described in its documentation, you will have to create two programs.

  1. Data Service, which you can create by following Subscribe to MongoDB Change Streams Via WebSockets | MongoDB. Here you’ll get a web socket endpoint to which your dash app will connect.
  2. Dash App, where you’ll use the aforementioned WebSocket component to receive data from your data service.

For now, there is no way to escape writing a little bit of JavaScript, as any realtime benefits you hope to get from web sockets will vanish if you use normal callbacks instead of client side callbacks.

PS: I’m not familiar with MongoDB change streams but if you or anyone in the community can provide some code to simulate the behaviour of MongoDB change stream like in your case, I might be able to help you out better.

Hi @snehilvj ,
I am happy to assist you on the MongoDB side. Please find below the server.py script for running the MongoDB ChangeStreams WebSocket:

import tornado.httpserver
import tornado.websocket
import tornado.ioloop
import tornado.web
from motor.motor_tornado import MotorClient
from bson import json_util
from logzero import logger
import credentials


## Load MongoDB Credentials and Cluster details
credentials_mongodb = credentials.mongodb
user_id = credentials_mongodb["username"]
password = credentials_mongodb["password"]
domain = credentials_mongodb["domain"]
database = credentials_mongodb["db"]
CLIENT_URI = "mongodb+srv://{}:{}@{}/{}?retryWrites=true&w=majority".format(user_id, password, domain, database)

class WebpageHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("templates/index.html")


class ChangesHandler(tornado.websocket.WebSocketHandler):

    connected_clients = set()

    def check_origin(self, origin):
        return True

    def open(self):
        ChangesHandler.connected_clients.add(self)

    def on_close(self):
        ChangesHandler.connected_clients.remove(self)

    @classmethod
    def send_updates(cls, message):
        for connected_client in cls.connected_clients:
            connected_client.write_message(message)

    @classmethod
    def on_change(cls, change):
        logger.debug(change)
        message = f"{change['operationType']}: {change['fullDocument']}"
        ChangesHandler.send_updates(message)


change_stream = None


async def watch(collection):
    global change_stream

    async with collection.watch(full_document='updateLookup') as change_stream:
        async for change in change_stream:
            ChangesHandler.on_change(change)


def main():
    client = MotorClient(CLIENT_URI)
    collection = client[database]["FloatingEquity"]
    app = tornado.web.Application([(r"/socket", ChangesHandler), (r"/", WebpageHandler)])
    app.listen(8000)

    loop = tornado.ioloop.IOLoop.current()
    loop.add_callback(watch, collection)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
    finally:
        if change_stream is not None:
            change_stream.close()


if __name__ == "__main__":
    main()

All you have to do is run python server.py in your miniconda terminal after installing the required packages (tornado, motor, bson, logzero) mentioned in the MongoDB ChangeStreams WebSocket tutorial. The data will be ingested into the MongoDB collection by another python script running on my AWS EC2 instance. If you find the MongoDB change streams are not working, ping me and I will fix the python script on the EC2 instance. Hopefully there won’t be any issues and it will keep running for the duration; if not, I will send you another demo script that pumps random data into the MongoDB collection, which you can run locally.

I will just PM you the credentials.py file containing the credentials to my MongoDB Atlas cluster (free-tier) which you need to keep in the same folder as this server.py file, as it is used in the import credentials statement
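
For anyone recreating this without the PM, here is a minimal sketch of what credentials.py could look like. The dictionary keys are inferred from what server.py reads; every value is a hypothetical placeholder, and the build_uri helper is an illustrative addition that is not part of the original scripts. It percent-escapes the username and password, which matters when they contain characters such as @ or /.

```python
from urllib.parse import quote_plus

# credentials.py (sketch): the keys match what server.py reads,
# the values are hypothetical placeholders to replace with your own.
mongodb = {
    "username": "my_user",
    "password": "p@ss/word",
    "domain": "cluster0.example.mongodb.net",
    "db": "my_database",
}


def build_uri(creds):
    """Build the mongodb+srv URI, percent-escaping the username and password."""
    return "mongodb+srv://{}:{}@{}/{}?retryWrites=true&w=majority".format(
        quote_plus(creds["username"]),
        quote_plus(creds["password"]),
        creds["domain"],
        creds["db"],
    )


uri = build_uri(mongodb)
```

With this file next to server.py, the import credentials statement resolves and credentials.mongodb yields the dictionary above.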

Hi,
I have included below a sample python script (FakeMongoData.py) that generates some random (fake) data and inserts it into a MongoDB collection named TestCollection. Keep this script running in another command prompt/terminal alongside the server.py from the previous post. Also remember to change the text FloatingEquity in line 62 of the server.py script to TestCollection. To see the WebSocket results while the two scripts are running, open your browser and type localhost:8000 in the address bar, then right click → Inspect → Console. Please find the script FakeMongoData.py below:

import pymongo
from pymongo import MongoClient,ReturnDocument
from pymongo.errors import ConnectionFailure, AutoReconnect, BulkWriteError, OperationFailure
from datetime import datetime, timedelta
import credentials
import random
import time

## Load MongoDB Credentials and Cluster details
## Fix MongoClient DB connection errors by following this - https://www.youtube.com/watch?v=wvlJGvP18Qk
credentials_mongodb = credentials.mongodb
user_id = credentials_mongodb["username"]
password = credentials_mongodb["password"]
domain = credentials_mongodb["domain"]
database = credentials_mongodb["db"]
CLIENT_URI = "mongodb+srv://{}:{}@{}/admin?retryWrites=true&w=majority".format(user_id, password, domain)

def MongoDB_Connection(client_uri : str = CLIENT_URI) -> dict:
    '''
    Makes a connection to the MongoDB Cluster and returns success or exception handling (error) messages.
    ## Reference: https://www.programcreek.com/python/example/94224/pymongo.errors

    Parameters
    ----------
    client_url  : str, default = CLIENT_URI stored in MongoWorks.py
                  Client URL to connect to the MongoDB cluster.

    Returns
    -------
    dict        : {Success or Failure message}
    '''
    client = MongoClient(client_uri)
    try:
        client.admin.command('ismaster') # The ismaster command is cheap and does not require auth.
        return client
    except (AutoReconnect, ConnectionFailure) as e:
        error_message = f"{e}: Server not available"  # format the exception; exception + str raises TypeError
        print(error_message)
        return e

def MongoDB_Collection_Write(collection_name : str, docs_to_insert: dict, db: str = database) -> str:
    '''
    Writes documents to a MongoDB Collection.
    Parameters
    ----------
    collection_name : str, name of the collection in MongoDB
    docs_to_insert  : dict,
                      A single document to insert into the MongoDB collection
    db              : str, name of the database in MongoDB

    Returns
    -------
    messages        : dict, {doc_iD : Success/Failure message}
                      Success or Failure message with documents insertion ID

    '''
    client = MongoDB_Connection()
    db = client[db]
    collection = db[collection_name]
    try:
        result = collection.insert_one(docs_to_insert).inserted_id
        message = "{} - Document Inserted successfully into `{}` collection".format(result, collection_name)
        return {result : message}
    except Exception as e:
        error_message = "An {} Exception occurred when inserting a document into `{}` Collection. ".format(e,collection_name)
        print(error_message)
        return error_message

initial_equity = 100000.0

while True:
    new_row = {}
    timestamp = datetime.now().replace(microsecond=0)
    pnl = round(random.uniform(-1000,999), 2)
    nr_open_positions = random.randint(0,5)
    equity = round(initial_equity + pnl, 2)
    new_row = {"time": timestamp, "equity": equity, "PnL" : pnl, "OpenPositions" : int(nr_open_positions)}
    MongoDB_Collection_Write(collection_name = "TestCollection", docs_to_insert = new_row) ## Insert last equity value at the time of closing the last open position
    print(f"Timestamp: {timestamp} || OpenPositions: {nr_open_positions} || Equity: {equity} US$  || Profit: {pnl} US$")
    time.sleep(1)

I hope this makes it easy to recreate the problem at your end. The incoming stream of WebSocket data from MongoDB needs to be plotted as a streaming line plot of equity vs. time, and the other fields, such as OpenPositions and PnL, need to be shown on dash cards. Let me know if you need any further assistance from my end.

Best Regards,
Dilip

Hey @diliprk

Sorry I didn’t reply sooner. Here’s a very simple app to show you how you can use web sockets within a dash app. You may have to make some changes to how you are sending the data on the web socket connection to be able to easily parse and use them in the client side callback in the app. For now I’m just printing the data that is being sent on the web socket connection.

from dash import Dash, Output, Input, html
from dash_extensions import WebSocket

app = Dash(__name__)

app.layout = html.Div(
    [
        WebSocket(id="websocket", url="ws://127.0.0.1:8000/socket"),
        html.P(id="data"),
    ]
)

callback = """
function updateUI(msg) {
    if (msg) {
        const data = msg["data"]
        // const values = JSON.parse(data)
        return data
    }
    return window.dash_clientside.no_update
}"""

app.clientside_callback(
    callback,
    Output("data", "children"),
    Input("websocket", "message"),
)

if __name__ == "__main__":
    app.run(debug=True)

When using this in an actual app, I’d just store the received data in a dcc.Store and add a dcc.Interval callback that fires every n seconds and updates the UI from the data in the store.
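
The Store-plus-Interval idea can be sketched roughly as follows. This is a sketch under stated assumptions, not a tested production app: the component ids (history, tick, equity-graph), the 500-point cap, the 2-second interval, and the helper names figure_from_history and create_app are all hypothetical, and each websocket message is assumed to be a JSON object with a string time and a numeric equity. The Dash imports sit inside create_app so the figure helper can be tried on its own.

```python
def figure_from_history(history):
    """Build a plain-dict Plotly figure (equity vs. time) from stored messages."""
    # Skip messages that lack either of the two plotted fields.
    rows = [row for row in history if "time" in row and "equity" in row]
    return {
        "data": [
            {
                "type": "scatter",
                "mode": "lines",
                "x": [row["time"] for row in rows],
                "y": [row["equity"] for row in rows],
            }
        ],
        "layout": {
            "xaxis": {"title": {"text": "time"}},
            "yaxis": {"title": {"text": "equity"}},
        },
    }


# Clientside callback: append each parsed message to the store and keep
# only the newest 500 points (the cap is an arbitrary assumption).
APPEND_JS = """
function (msg, history) {
    if (!msg) {
        return window.dash_clientside.no_update;
    }
    const next = (history || []).concat([JSON.parse(msg.data)]);
    return next.slice(-500);
}
"""


def create_app(ws_url="ws://127.0.0.1:8000/socket"):
    """Wire WebSocket -> dcc.Store (clientside) -> dcc.Graph (on an interval)."""
    from dash import Dash, Input, Output, State, dcc, html
    from dash_extensions import WebSocket

    app = Dash(__name__)
    app.layout = html.Div(
        [
            WebSocket(id="ws", url=ws_url),
            dcc.Store(id="history", data=[]),
            dcc.Interval(id="tick", interval=2000),  # milliseconds
            dcc.Graph(id="equity-graph"),
        ]
    )

    app.clientside_callback(
        APPEND_JS,
        Output("history", "data"),
        Input("ws", "message"),
        State("history", "data"),
    )

    @app.callback(
        Output("equity-graph", "figure"),
        Input("tick", "n_intervals"),
        State("history", "data"),
    )
    def redraw(_n_intervals, history):
        # Redraw the whole line from whatever the store holds right now.
        return figure_from_history(history or [])

    return app
```

To try it, call create_app().run(debug=True) while the data service is running. Messages are buffered in the browser as they arrive, and the graph is only redrawn on each interval tick, so the update rate of the plot is decoupled from the rate of incoming messages.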

Hi @snehilvj,
Thank you for your feedback. I fixed the JS callback, and the dash app now works with the client side callback.
I modified the on_change function in server.py to send a JSON message as output instead of a str.

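@classmethod
def on_change(cls, change):
    logger.debug(change)
    change_doc = change['fullDocument']
    # Forward only the fields the dash app displays
    result = {k: change_doc[k] for k in ('time', 'equity', 'balance', 'PnL', 'OpenPositions') if k in change_doc}
    # Needs `import json` at the top of server.py; default=str turns datetime values into strings
    ChangesHandler.send_updates(json.dumps(result, default=str))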
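
The serialization step on its own can be sketched as below. It assumes that time is stored as a datetime (as FakeMongoData.py does), which plain json.dumps cannot encode, and that a change event may lack fullDocument (for example on deletes). The names FIELDS, to_jsonable and change_to_message are hypothetical helpers, not part of server.py.

```python
import json
from datetime import datetime

# Fields forwarded to the dash app; anything else (e.g. _id) is dropped.
FIELDS = ("time", "equity", "balance", "PnL", "OpenPositions")


def to_jsonable(value):
    """Fallback for json.dumps: render datetimes as ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def change_to_message(change):
    """Pick the displayed fields from a change event and return a JSON string."""
    doc = change.get("fullDocument") or {}
    payload = {key: doc[key] for key in FIELDS if key in doc}
    return json.dumps(payload, default=to_jsonable)


# A hand-written sample event shaped like an insert from FakeMongoData.py.
sample_change = {
    "operationType": "insert",
    "fullDocument": {
        "time": datetime(2022, 6, 5, 12, 0, 0),
        "equity": 100250.5,
        "PnL": 250.5,
        "OpenPositions": 3,
    },
}
message = change_to_message(sample_change)
```

On the browser side, JSON.parse(msg.data) then yields an object whose time field is a plain string.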

I don’t think this approach can produce LIVE streaming line plots, because each message carries only a single value, whereas a go.Scatter plot requires arrays of values for the x-axis and y-axis.

Nevertheless, this is my final Dash App:

from dash import Dash, Output, Input, html,dcc
from dash_extensions import WebSocket
import json

app = Dash(__name__)

app.layout = html.Div(
    [
        WebSocket(id="ws", url="ws://127.0.0.1:8000/socket"),
        html.P(id="pd1"),
        html.P(id="pd2"),
        html.P(id="pd3"),
        html.P(id="pd4")
    ]
)

## To display values on dash cards
callback_js = """
function updateUI(msg) {
    if (msg) {
        const data = msg["data"]
        const x = JSON.parse(data)
        return [x.equity, x.balance, x.PnL, x.OpenPositions]
    }
    return window.dash_clientside.no_update
}
"""

app.clientside_callback(
    callback_js,
    [Output("pd1", "children"),Output("pd2", "children"),
     Output("pd3", "children"),Output("pd4", "children") ],
    Input("ws", "message"),
)


if __name__ == "__main__":
    app.run(debug=True)

Thanks and Best Regards,
Dilip

Hi all:

Do any of you have any screenshots, animated gifs, or publicly accessible Dash apps that showcase the websockets approach (vs. dcc.Interval)?

I’m thinking of something akin to the following, which I’ve stumbled across and which (I think) both use Plotly charts:

Jannis Lübbe: Web based live visualisation of sensor data - YouTube (at time 10:40)
GitHub - strath-sdr/rfsoc_sam: RFSoC Spectrum Analyser Module on PYNQ.

I have these references (including this community post), so I know it CAN be done … I just haven’t seen any good storylines (other than the spectrum analyzer linked above, but I think it uses Voila for the UI?)

Hi Dave,
My experience with dcc.Interval was not good for visualizing live streaming data in plotly-dash. That is why I resorted to WebSockets / ChangeStreams. Anyway, here are some references I collected that you could try. I imagine it would involve using a local data store like Redis for faster local access.

  1. MongoDB Atlas and Live Chart Update on plotly-dash - #2 by Tobias_Gardhus - MongoDB Atlas - MongoDB Developer Community Forums
  2. Python - how to 'stream' data from my MongoDB collection? - Stack Overflow

If you arrive at a viable solution to make faster streaming plots via WebSockets, then please do share.

Best Regards,
Dilip

I put a small example in the docs.

I haven’t included a gif though :upside_down_face:

Hi Emil,
The websocket documentation example is not sufficient for our requirement: a streaming line plot of data streamed from a live-updated time series collection stored in a cloud database such as MongoDB, Timescale DB, or Dynamo DB.
Your example just generates 10 random points locally and sends them to the local websocket. We have trouble understanding how to set up the websocket connection between the cloud database (a MongoDB Atlas collection) and a local websocket via Quart, or directly to the Dash WebSocket.

If you can show a demo of a dash streaming line plot for streaming data received from a time series MongoDB collection, it will be very helpful.

Best Regards,
Dilip