How to plot and read data in parallel?

Hi!
I’m working on a dashboard to plot live data from a serial port, using the dash framework.
I have two separate files, one that reads the data continuously and stores the data in a dataframe, and one that includes the setup of the dash-app.
The problem I have is being able to run these in parallel, so the data is read continuously while the dash-app updates the graphs every second or so.
I would also like to start and stop the data reading with buttons in the dash framework, so they would need to be able to communicate with each other.

Does anyone have a solution for this?

1 Like

I am currently doing something exactly like this.

I used dcc.Interval to get live updates to the screen; so basically the interval component callback is asking from a data source object (I call it a “ControlCenter”) every 500ms that is there new data. Then, I run in separate threads (subclassing threading.Thread) the serial port devices, which send data using queue.Queue to pass data from the threads to the ControlCenter.

Thank you! Do you have any code of the threading implementation?

Unfortunately I can not share the full code but the idea is to have the threads to be properties of the “ControlCenter”. Each thread looks something like this


class DeviceThread(Thread):
    def __init__(self, q_send, q_receive):

        self.device = device
        self.q_send = q_send
        self.q_receive = q_receive
        self.running = False
        super().__init__()
        self.setDaemon(True)

    def run(self):
        self.running = True
        logger.info('Starting measurement with device')
        while self.running:

            data = self.device.measure()
            self.send_data(data)
            self.check_input_queue()
            
        logger.info('Ending measurement with device')

    def send_data(self, data):
        logger.debug('Sending device measurement data, %s', data)
        self.q_send.put(data)

    def check_input_queue(self):
        while not self.q_receive.empty():
            item = self.q_receive.get()
            if not item.get('command', None):
                continue
            command = item.get('command')
            if command == 'END':
                self.running = False

You can see that

  • Each device has one input and one output queue for messaging (queue.Queue)
  • The device can be stopped from the main thread by sending dictionary {'command': 'END'} using the receiving queue.

Okay, thank you! How do I make sure that the dash app still runs and is responsive to callbacks?

You run your blocking code (for example, the data collector) in a separate thread/threads. For example:

thread = DeviceThread(q_send=q_receive,
                          q_receive=q_send,
                          device=device)
thread.start()

then, periodically, from a dcc.Interval callback, check if the q_send queue has any data

@app.callback(
    Output('timeserie-measurement-graph', 'extendData'),
    [
        Input('interval-measurement', 'n_intervals'),
    ],
)
def update_graph_live(n_intervals):
    return get_timeserie_update(n_intervals)

and the contents of get_timeserie_update would be something like…

def get_timeserie_update(self, n_intervals):

    q = self.q_receive
    x = [[]]
    y = [[]]
    while not q.empty():
        data = q.get()
        
        ydata = data.get('Ydata', 0)
        timestamp = data.get('timestamp')
        
        time_s = (timestamp -
                  self.start_datetime) / dt.timedelta(seconds=1)

        x[0].append(time_s)
        y[0].append(ydata)
            
    # ...

(obviously, lot of the code in the example has been left out but I think you can see the logic)

1 Like

Yes, thank you! I understand the logic - however, I still have one problem. How do I send an input value from the dash function to the other thread? I would like to be able to ask the user for the current port, and then start reading data in the thread based on that value. Is this possible?

@olnen Yes sure it is possible. When using python threads (threading.Thread) probably the best way for thread-to-thread communication is the (queue.Queue). You will need thread-to-thread communication (queues) only if you need to pass information between main thread and the “subthread” after the subthread has been started. So, there are two options

Option 1:

  • Ask current port from user
  • Pass the port number as an argument to the __init__() method of the thread for the data acquisition.
  • Start the thread.

Option 2:

  • Start the thread for data acquisition any time you wish.
  • Ask the port from user
  • Send the port number to the thread using a queue.

I prefer to use two queues: One is for “send” and one is for “receive” in each of the threads. You pass the queue(s) as arguments to the Thread when you initiate it.

Hey @olnen have you made any headway in your project? If so would you be willing to share some of your resulting code? I’m working on the same idea and I’ve been having some trouble putting something cohesive together, your help would be greatly appreciated.

@fohrloop in your example code, how does your callback function know what the ‘get_timeserie_update’ function is? get_timeserie_update appears to be a class method so how does the app callback access that method?