✊🏿 Black Lives Matter. Please consider donating to Black Girls Code today.
🧬 Learn how to build RNA-Seq data apps with Python & Dash. Register for the May 20 Webinar!

Getting live Data from UDP socket

Hello,

I am trying to visualize live Data received by a UDP socket in Dash.
The socket runs in a different Thread, but in the same file and constantly updates the lists

time_list = []
light_list = []
temperature_list = []
label_list = []

which are used as input for the graph (id: “box-plot”)
However, the data does not arrive in the Dashboard, i.e. the plot stays blank. When I print the list after 10 seconds of sleeping, I see that the list is not empty (see the end of my code below).

With this post I want to know whether this is due to the general disability of Dash to exchange data between processes or whether it could generally work the way I specified it in the code (and I am missing something else)

I already checked out this post: Populating dash with data from udp thread, but since there is not really an answer, I want to ask again.

before trying to fit everything in one file, I also stored the UDP-received data in a file on the HD and made the callback function read it in every time it updates. This worked, but seemingly at a low performance.

So bottomline question: Is this problem Dash-specific, (i.e. I have to look in the Dash user guide: https://dash.plot.ly/sharing-data-between-callbacks, or am I generally missing something (which would probably lead me to stackexchange).

Edit: I think the socket code can mostly be ignored, the interesting part is the Dashboard and the callback function further below (as specified in code comments).

Thanks in advance!

Below my code:

# packages
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input, Event
import plotly.graph_objs as go
from collections import deque
import datetime as dt
import time
import numpy as np
import threading
import pickle
import socketserver
import pandas as pd

### UDP Socket ### ------------------------------------------------------------

# definition of Thread

# define as empty lists
time_list = []
light_list = []
temperature_list = []
label_list = []

class Socket_Thread(threading.Thread):
    def __init__(self, iD, port):
        threading.Thread.__init__(self)
        self.iD = iD
        self.port = port
        
    def run(self):
        
        # Initialize Socket
        PORTNO = self.port
        dt_classifier = pickle.load(open("sensor_dt.sav", 'rb'))

        # converter for UNIX-times
        dateconv = np.vectorize(dt.datetime.fromtimestamp)
        
        class handler(socketserver.DatagramRequestHandler):
            
            def handle(self):
                newmsg = self.rfile.readline().strip().decode()
                print(newmsg)
                
                # split the message data into sensor values
                lgt, tmp, lab = newmsg.split(',') 
                
                # append the sensor values to 
                time_list.append(dateconv(time.time()))
                light_list.append(lgt)
                temperature_list.append(tmp)
                label_list.append(lab)
                
                # convert the sensor values into a 
                d = {'light': [lgt], 'temperature': [tmp]}
                obs = pd.DataFrame(d)
                
                pr = dt_classifier.predict(obs)
                print("DT said: ", pr[0])
                
                pi_msg = str(pr[0]).encode()
                print(pi_msg)
                
                # send back the prediction
                self.wfile.write(pi_msg)

        s = socketserver.UDPServer(('',PORTNO), handler)
        self.s = s

        print("Awaiting UDP messages on port {}".format(PORTNO))
        s.oldmsg = "This is the starting socketservermessage."

        s.serve_forever()
        
    def stop(self):
        self.s.shutdown()
        print("socket closed successfully")





### Dashboard ### -------------------------------------------------------------

# for initial deque
dateconv = np.vectorize(dt.datetime.fromtimestamp)

# init deques
l = deque(maxlen=25)
l.append(0)
t = deque(maxlen=25)
t.append(0)
tm = deque(maxlen=25)
tm.append(dateconv(time.time()))

app = dash.Dash()

app.css.append_css({'external_url': 'https://codepen.io/amyoshino/pen/jzXypZ.css'})

app.layout = html.Div([
        

            # Selector row
            html.Div([
                    html.Div([
                        html.P('Show:'),
                        dcc.Checklist(id = 'variables',
                                      options=[{'label': 'Light', 'value': 'lgt_c'},
                                               {'label': 'Temperature', 'value': 'tmp_c'}
                                               ],
                                      values=['lgt_c', 'tmp_c'],
                                      labelStyle={'display': 'inline-block'}
                                      ), # End Checklist
                    ],
                    className='six columns',
                    style={'margin-top': '10'}
                    ),
                    
                    
                    
            ], className = 'row'), # End row (2)


            # Graph row
            html.Div([
                
                                   
                html.Div(dcc.Graph(id="box-plot", 
                                   animate=False), className = 'twelve columns'),
                                   
                # Event
                dcc.Interval(id="graph-update",
                             interval=1000
                             ),
                                       
                                   
            ], className = 'row'), # End row (3)
        
        ], className='ten columns offset-by-one') # End Global Div

           
# box plot + Callback
@app.callback(Output("box-plot", "figure"),
              [Input('variables', 'values')],
              events = [Event("graph-update", "interval")])
def update_box_plot(selector):
    
    data = []
    
    data_l = go.Box(
            y=light_list,
            name="Light"
            )
    
    data_t = go.Box(
            y=temperature_list,
            name="Temperature"
            )
    
    if 'lgt_c' in selector:
        data.append(data_l)
        
    if 'tmp_c' in selector:
        data.append(data_t)
    
    return{"data":data,
           "layout": go.Layout(legend=dict(orientation="h"))}

### Main:
if __name__ == '__main__':
    t1 = Socket_Thread(1, 10556) 
    t1.start()
    time.sleep(10)
    print(light_list)
    app.run_server(debug=True)

@enq,

I don’t have an answer, but I am doing something similar and it is working … for now. I’m trying to formulate a question for I too am concerned about scope issues with threading etc. before I invest too much time 1

My application that is getting data from a telnet socket (streamed binary or text from some C applications.) I have a python thread (IO limited) that babysits these connections asynchronously, checks for data, unpacks it and stores it in various data-sets for the dash callbacks. It has a method that is used by callbacks to retrieve data (sensor data). This is streaming data, so I use the interval feature to trigger callbacks to update the graphs that use this data.

I’ve tried running the dash app in default (threading) mode and with threads disabled and processes=4. In both instances it works OK.

    app.run_server(threaded=False,processes=4)

(I haven’t looked into whether I’m truly using multiple processes. And my C applets are indicating only one entity is accessing them, so I know that multiple telnet sockets are not being opened.)

I’ve opened some 10 clients (all on a local machine, this is running on a private network.) They all are showing the data as I would expect. Each client performs its own accesses to the thread that handles the interfaces (I see it being called each interval by each client) but this is accounted for in the design.

Sorry I couldn’t be of more assistance.

1 I’m working on an off-line lab equipment control and monitor. I’ve looked into various ways to be able to view my real time sensor information and Dash won among all the various options. I’m now concerned that it might not be suitable for a command and control interface…

1 Like

Hi @radiofreq,

thanks for your reply. Unfortunately, this did not do the trick for me.

First of all, I am not able to run app.run_server() without debug=True, this throws
ValueError: Your platform does not support forking.
So I tried running
app.run_server(debug=True,threaded=False,processes=4)
which caused the Dash app not to run at all (i got the *restarting with stat output in the console but couldn’t open it in the browiser).

With
app.run_server(debug=True,threaded=False,processes=1)
and
app.run_server(debug=True,threaded=False)

it was possible to run the app, but the still same problem of not seeing any data points in the plot.

Is your method to retrieve the sensor data somewhat different to mine? i.e. is there something special about the way the data is retrieved?

Thanks again very much

I fear I may have led you astray. I do not recommend using processes, just that I had them working (since my post they seem to be doing things I don’t want so I am not using them.)

Can you get your example to be self contained? I might be able to see if it runs on my setup if you can get it self contained.

I’m on ubuntu16. If I run with or without debug, no other values (i.e.threading enabled by default) it is working. I’m guessing you are on windows since there is no forking. This may be the difference.

Here is a stand-alone outline that works for me. See if it works on your system. It should produce a moving plot of samples, with pause and reset control.

It’s not the tightest code, but not bad for an RTL designer. Feel free to point out any corrections/optimizations. Anyone feel free to shout out if this is fragile and will likely break easily. On my systems I will not be running multi-process. Even then, my producer can handle multiple socket connections, and my controller can handle multiple command connections (and performs its own arbitration.) In this manner, so long as the threads are available for each of producer and controller, it should work.

"""
Producer/Controller Dash Outline
Copyright 2018 Steve Korson
"""
import dash
from dash.dependencies import Output, Event, Input, State
import dash_core_components as dcc
import dash_html_components as html
import plotly
import random
import plotly.graph_objs as go
from collections import deque
import threading as th
import time as t
import random as rnd


class producer(th.Thread):
    def __init__(self, threadID):
        th.Thread.__init__(self)
        self.threadID = threadID
        self.done = False
        self.sleep_time = 0.250
        self.data = []
        self.resetData()
        self.pause = False
        self.fakeRspTime = 0
        self.busy = False
    def stop(self):
        self.done = True
    def run(self):
        while self.done == False:
            # Main loop
            self.busy = self.__update(timeout=0.250)
            t.sleep(self.sleep_time)
        # Cleanup
    def __update(self,timeout=0.250):
        """  Here the producer would access some data inbound socket/IO
        Faking asynchronous data below.
        """
        # -------------------------
        if self.pause:
            # Emulate paused, busy
            return True
        else:
            fakeDelay = rnd.randint(1,4)
            if self.fakeRspTime < fakeDelay:
                t.sleep(timeout)
                self.fakeRspTime = self.fakeRspTime+1
                # Emulate busy
                return True
            else:
                self.fakeRspTime = 0
                # Emulate data ready (not busy)
                # Emulate updating data from producer
                self.data = self.data[1:]
                fakeData = self.data[-1] + (rnd.randint(1,10)-5)
                self.data.append(fakeData)
                if self.data[-1] < 0:
                    self.data[-1] = 0
                if self.data[-1] >30:
                    self.data[-1] = 30
                return False
        # -------------------------
    def getData(self):
        print("INFO: Data Request. Busy?", self.busy)
        return self.data
    def pauseData(self, state):
        self.pause = state
    def resetData(self):
        self.data = [0,0,0,0,0,0,0,0]

class controller(th.Thread):
    def __init__(self, threadID,target):
        # Passing in producer target here for demo
        # Typical system has controller and producer connected external
        th.Thread.__init__(self)
        self.threadID = threadID
        self.done = False
        self.target = target
        self.sleep_time = 0.250
        self.busy = False
        self.__newCmd = False
        self.fakeRspTime = 0
        self.paused = False
    def stop(self):
        self.done = True
    def run(self):
        while self.done == False:
        # Main loop
            if self.__newCmd:
                self.__newCmd = False
                self.__procCmd()
            if self.busy:
                self.busy = self.__check_for_rsp(timeout=0.250)
            t.sleep(self.sleep_time)
        # Cleanup
    def send_cmd(self, cmd):
        """ Method used by callers to send commands to the command interface """
        if self.busy == False:
            self.busy = True
            self.__newCmd = True
            self.__cmd = cmd;
            return True
        else:
            raise UserWarning("Command interpreter is busy.")
    def __procCmd(self):
        """ Emulate sending a commnd out a socket or other IPC
        """
        # -----------------------------
        self.busy = True
        if self.__cmd == 'P': # Pause
            if self.paused:
                self.paused = False
            else:
                self.paused = True
            self.target.pauseData(self.paused)
            
        if self.__cmd == 'R':
            self.target.resetData()
        # -----------------------------
    def __check_for_rsp(self,timeout):
        """ Once command is sent out the interface, await a response (or timeout) """
        # For now emulate checking for a response
        #---------------------------------
        # This can take a random amount of loops (not time really)
        # before setting a non-busy result.
        fakeDelay = rnd.randint(1,4)
        if self.fakeRspTime < fakeDelay:
            t.sleep(timeout)
            self.fakeRspTime = self.fakeRspTime+1
            return True
        else:
            self.fakeRspTime = 0
            return False
        #---------------------------------


app = dash.Dash(__name__)
app.layout = html.Div(
    [
      dcc.Graph(id='live-graph', animate=False),
      dcc.Interval(
        id='graph-update',
        interval=250
        ),
      html.Button('Pause/Resume', id='buttonPause',),
      html.Button('Reset', id='buttonReset',),
      dcc.Checklist(
          id='status',
          options=[
              {'label': 'Producer Paused', 'value': 'P'},
              {'label': 'Controller Busy', 'value': 'B'},
          ],
          values=[]
      ),
    ]
    )

@app.callback(Output('live-graph','figure'),
              events = [Event('graph-update','interval')])
def update_graph():
    x = [1,2,3,4,5,6,7,8]
    y = producerObj.getData()
    data = go.Scatter(
        x = x,
        y = y,
        name = 'myScatter',
        mode = 'lines+markers'
    )
    return {'data':[data],'layout': go.Layout(xaxis = dict(range=[0.9*min(x),1.1*max(x)]),
                                              yaxis = dict(range=[0,32]))}

@app.callback(Output('status','values'),
              events = [Event('graph-update','interval')])
def update_status():
    newValues = []
    if controllerObj.busy:
        newValues.append('B')
    if producerObj.pause:
        newValues.append('P')
    return newValues

@app.callback(Output('buttonReset', 'children'),
              [Input('buttonReset', 'n_clicks')],)
def send_reset(n_clicks):
    # Optionally to try and except, you could semaphore protect the calls (with fileIO rather than globals)
    try:
        controllerObj.send_cmd('R')
    except:
        print("Interface is busy. Handle the exception.")
    return 'Reset'

@app.callback(Output('buttonPause', 'children'),
              [Input('buttonPause', 'n_clicks')])
def send_pause(n_clicks):
    # Optionally to try and except, you could semaphore protect the calls (with fileIO rather than globals)
    try:
        controllerObj.send_cmd('P')
    except:
        print("Interface is busy. Handle the exception.")
    return 'Pause/Resume'
pass


if __name__ == "__main__":
    producerObj = producer(1)
    controllerObj = controller(2,producerObj)

    producerObj.start()
    controllerObj.start()

    app.run_server(debug=True)

    producerObj.stop()
    controllerObj.stop()
    
    print("Fin.")
2 Likes

@radiofreq,

sorry for the late reply. Thanks for the code! Indeed it works for me. I’ll check if I can combine it with my code problem.

Cheers!

For those coming in now, I updated @radiofreq 's answer to Dash 1.9:

"""
Producer/Controller Dash Outline
Copyright 2018 Steve Korson
"""
import dash
from dash.dependencies import Output, Input, State
import dash_core_components as dcc
import dash_html_components as html
import plotly
import random
import plotly.graph_objs as go
from collections import deque
import threading as th
import time as t
import random as rnd


class producer(th.Thread):
    def __init__(self, threadID):
        th.Thread.__init__(self)
        self.threadID = threadID
        self.done = False
        self.sleep_time = 0.250
        self.data = []
        self.resetData()
        self.pause = False
        self.fakeRspTime = 0
        self.busy = False

    def stop(self):
        self.done = True

    def run(self):
        while self.done == False:
            # Main loop
            self.busy = self.__update(timeout=0.250)
            t.sleep(self.sleep_time)
        # Cleanup

    def __update(self, timeout=0.250):
        """  Here the producer would access some data inbound socket/IO
        Faking asynchronous data below.
        """
        # -------------------------
        if self.pause:
            # Emulate paused, busy
            return True
        else:
            fakeDelay = rnd.randint(1, 4)
            if self.fakeRspTime < fakeDelay:
                t.sleep(timeout)
                self.fakeRspTime = self.fakeRspTime + 1
                # Emulate busy
                return True
            else:
                self.fakeRspTime = 0
                # Emulate data ready (not busy)
                # Emulate updating data from producer
                self.data = self.data[1:]
                fakeData = self.data[-1] + (rnd.randint(1, 10) - 5)
                self.data.append(fakeData)
                if self.data[-1] < 0:
                    self.data[-1] = 0
                if self.data[-1] > 30:
                    self.data[-1] = 30
                return False
        # -------------------------

    def getData(self):
        print("INFO: Data Request. Busy?", self.busy)
        return self.data

    def pauseData(self, state):
        self.pause = state

    def resetData(self):
        self.data = [0, 0, 0, 0, 0, 0, 0, 0]


class controller(th.Thread):
    def __init__(self, threadID, target):
        # Passing in producer target here for demo
        # Typical system has controller and producer connected external
        th.Thread.__init__(self)
        self.threadID = threadID
        self.done = False
        self.target = target
        self.sleep_time = 0.250
        self.busy = False
        self.__newCmd = False
        self.fakeRspTime = 0
        self.paused = False

    def stop(self):
        self.done = True

    def run(self):
        while self.done == False:
            # Main loop
            if self.__newCmd:
                self.__newCmd = False
                self.__procCmd()
            if self.busy:
                self.busy = self.__check_for_rsp(timeout=0.250)
            t.sleep(self.sleep_time)
        # Cleanup

    def send_cmd(self, cmd):
        """ Method used by callers to send commands to the command interface """
        if self.busy == False:
            self.busy = True
            self.__newCmd = True
            self.__cmd = cmd;
            return True
        else:
            raise UserWarning("Command interpreter is busy.")

    def __procCmd(self):
        """ Emulate sending a commnd out a socket or other IPC
        """
        # -----------------------------
        self.busy = True
        if self.__cmd == 'P':  # Pause
            if self.paused:
                self.paused = False
            else:
                self.paused = True
            self.target.pauseData(self.paused)

        if self.__cmd == 'R':
            self.target.resetData()
        # -----------------------------

    def __check_for_rsp(self, timeout):
        """ Once command is sent out the interface, await a response (or timeout) """
        # For now emulate checking for a response
        # ---------------------------------
        # This can take a random amount of loops (not time really)
        # before setting a non-busy result.
        fakeDelay = rnd.randint(1, 4)
        if self.fakeRspTime < fakeDelay:
            t.sleep(timeout)
            self.fakeRspTime = self.fakeRspTime + 1
            return True
        else:
            self.fakeRspTime = 0
            return False
        # ---------------------------------


app = dash.Dash(__name__)
app.layout = html.Div(
    [
        dcc.Graph(id='live-graph', animate=False),
        dcc.Interval(
            id='graph-update',
            interval=250
        ),
        html.Button('Pause/Resume', id='buttonPause', ),
        html.Button('Reset', id='buttonReset', ),
        dcc.Checklist(
            id='status',
            options=[
                {'label': 'Producer Paused', 'value': 'P'},
                {'label': 'Controller Busy', 'value': 'B'},
            ],
            value=[]
        ),
    ]
)


@app.callback(Output('live-graph', 'figure'),
              [Input('graph-update', 'n_intervals')])
def update_graph(interval):
    x = [1, 2, 3, 4, 5, 6, 7, 8]
    y = producerObj.getData()
    data = go.Scatter(
        x=x,
        y=y,
        name='myScatter',
        mode='lines+markers'
    )
    return {'data': [data], 'layout': go.Layout(xaxis=dict(range=[0.9 * min(x), 1.1 * max(x)]),
                                                yaxis=dict(range=[0, 32]))}


@app.callback(Output('status', 'value'),
              [Input('graph-update', 'n_intervals')])
def update_status(interval):
    newValues = []
    if controllerObj.busy:
        newValues.append('B')
    if producerObj.pause:
        newValues.append('P')
    return newValues


@app.callback(Output('buttonReset', 'children'),
              [Input('buttonReset', 'n_clicks')], )
def send_reset(n_clicks):
    # Optionally to try and except, you could semaphore protect the calls (with fileIO rather than globals)
    try:
        controllerObj.send_cmd('R')
    except:
        print("Interface is busy. Handle the exception.")
    return 'Reset'


@app.callback(Output('buttonPause', 'children'),
              [Input('buttonPause', 'n_clicks')])
def send_pause(n_clicks):
    # Optionally to try and except, you could semaphore protect the calls (with fileIO rather than globals)
    try:
        controllerObj.send_cmd('P')
    except:
        print("Interface is busy. Handle the exception.")
    return 'Pause/Resume'


pass

if __name__ == "__main__":
    producerObj = producer(1)
    controllerObj = controller(2, producerObj)

    producerObj.start()
    controllerObj.start()

    app.run_server(debug=True)

    producerObj.stop()
    controllerObj.stop()

    print("Fin.")

1 Like