Dash with udp socket threading to update data

Hi, I'm trying to use a UDP socket (served by a background thread) to stream real-time data and visualize it with Dash.

I'm sure my UDP and threading code works on its own, but when I combine it with Dash I always get this error:

OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

I have examined the following, but unfortunately made no progress:

Is there anything else I can try? Thanks for any advice :slight_smile:

The code below is my test script for sending over UDP (with a background thread):

import time
import math

from lib_msgdef     import MSG
from lib_socketudp  import socketSendThread


# Message object shared with the sender thread; the thread transmits whatever
# it currently holds.
msg = MSG()

# Background sender: one datagram to localhost:12347 every 0.01 s.
st = socketSendThread('localhost', 12347, 0.01, msg)
st.start()

start_time = time.time()
update_period = 0.05

while True:
    print(msg.x)
    time.sleep(update_period)

    # Generate sine wave data: 0.05 Hz, oscillating between 0 and 100.
    elapsed = time.time() - start_time
    msg.x = 50 + 50 * math.sin(2*math.pi*0.05*elapsed)

    st.setNewMsg(msg)

The code below is my test script for receiving over UDP (with a background thread):

import time 
from lib_msgdef     import MSG  
from lib_socketudp  import socketRecvThread

# Buffer that the receiver thread overwrites in place with each datagram.
msg = MSG()

# Background receiver bound to localhost:12347, polling every 0.001 s.
st = socketRecvThread('localhost', 12347, 0.001, msg)
st.start()

print_period = 0.5

while True:
    # Show the most recently received value.
    print(msg.x)
    time.sleep(print_period)

Here is the code that produces the problem:


from lib_msgdef     import MSG  
from lib_socketudp  import socketRecvThread

# Receive buffer plus the background UDP receiver that fills it.
# NOTE: constructing the receiver binds the UDP port right away, so this
# happens once in every process that executes this script.
msg = MSG()
st = socketRecvThread(
    fromIP='localhost',
    fromPort=12347,
    rt_step=0.001,
    MSG=msg,
)


# # # # # # # # # # # #
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import plotly

plotly.offline.init_notebook_mode(connected=True)

app = dash.Dash()

# Page layout: a text area for the latest UDP value, plus a timer that
# fires once per second to trigger the refresh callback.
recv_display = html.Div(id='recvData')
refresh_timer = dcc.Interval(id='tc', interval=1*1000)

app.layout = html.Div([recv_display, refresh_timer])


@app.callback(
    Output('recvData', 'children'),
    Input('tc', 'n_intervals'),
)
def update_annoation(n):
    """Refresh the on-page text with the latest value from the UDP receiver.

    Triggered by the ``tc`` interval component; ``n`` is its tick counter
    and is not used.  Returns a styled ``html.Span`` showing ``msg.x`` with
    four decimals.
    """
    global msg
    global st

    # Re-read the receiver's current message and keep the module-level
    # reference pointing at it.
    msg = st.getNewMsg()

    text = 'current udp data is: {0:0.4f}'.format(msg.x)
    return html.Span(text, style={'padding': '5px', 'fontSize': '16px'})


if __name__ == '__main__':
    # Start the background UDP receiver before the web server blocks.
    st.start()
    # debug=True normally enables the Werkzeug auto-reloader, which runs this
    # whole script a second time in a child process.  The receiver created at
    # module level would then try to bind the same UDP port again and fail
    # with "OSError: [WinError 10048] Only one usage of each socket address".
    # Disabling the reloader keeps the other debug tools while ensuring the
    # port is bound exactly once (code changes then need a manual restart).
    app.run_server(debug=True, use_reloader=False)

These self-written libraries are imported by the scripts above:

  • lib_baseCStruct.py
import ctypes
class BaseStructure(ctypes.Structure):
    """ctypes.Structure whose fields start from per-class default values.

    A subclass may declare a ``_defaults_`` dict mapping field names to
    initial values; keyword arguments given to the constructor override them.
    """

    def __init__(self, **kwargs):
        """
        Ctypes.Structure with integrated default values.

        :param kwargs: values different to defaults
        :type kwargs: dict
        """
        # A subclass that declares no ``_defaults_`` behaves like a plain
        # Structure instead of raising AttributeError.
        defaults = getattr(type(self), "_defaults_", None) or {}

        # Merge into a fresh dict so the class-level defaults are never
        # mutated; explicit keyword arguments win over defaults.
        values = {**defaults, **kwargs}

        super().__init__(**values)
  • lib_msgdef.py
from ctypes import *
from lib_baseCStruct import BaseStructure

class MSG(BaseStructure):
    """Byte-packed message carrying two C floats, ``x`` and ``y``."""

    # Pack with 1-byte alignment (no padding between fields).
    _pack_ = 1

    # Field layout, in declaration order.
    _fields_ = [("x", c_float), ("y", c_float)]

    # Initial field values handed to BaseStructure's constructor.
    _defaults_ = {"x": 0.0, "y": 0.0}

  • lib_socketudp.py
import threading
import socket
import time

class socketSendThread(threading.Thread):
    """Background thread that periodically sends a message over UDP.

    Once started, the thread sleeps ``rt_step`` seconds, sends the current
    message as a single datagram to ``toAddr``, and repeats forever.
    """

    # Class-level defaults; each instance overrides them in __init__.
    toAddr = ('localhost', 12345)
    rt_step = 0.1
    msg = None
    s = None  # UDP socket, created per instance in __init__

    def __init__(self, toIP, toPort, rt_step, MSG):
        """Create the sender (the thread is not started here).

        :param toIP: destination host name or IP address
        :param toPort: destination UDP port
        :param rt_step: seconds to sleep between two sends
        :param MSG: bytes-like object to transmit (e.g. a ctypes Structure)
        """
        threading.Thread.__init__(self)

        self.toAddr = (toIP, toPort)
        self.rt_step = rt_step
        self.msg = MSG

        # One socket per instance.  A socket defined on the class would be
        # created at import time and shared by every sender.
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.s.setblocking(False)

    def run(self):
        """Send the current message every ``rt_step`` seconds, forever."""
        while True:
            time.sleep(self.rt_step)
            try:
                self.s.sendto(self.msg, self.toAddr)
            except BlockingIOError:
                # The non-blocking socket could not accept the datagram
                # right now; skip this tick rather than kill the thread.
                continue

    def setNewMsg(self, MSG):
        """Replace the message that subsequent sends will transmit."""
        self.msg = MSG




class socketRecvThread(threading.Thread):
    """Background thread that receives UDP datagrams into a message buffer.

    The socket is bound in the constructor.  Once started, the thread sleeps
    ``rt_step`` seconds, tries a non-blocking receive directly into ``msg``,
    and repeats forever.
    """

    # Class-level defaults; each instance overrides them in __init__.
    fromAddr = ('localhost', 12345)
    rt_step = 0.1
    msg = None
    s = None  # UDP socket, created and bound per instance in __init__

    def __init__(self, fromIP, fromPort, rt_step, MSG):
        """Create the receiver and bind its UDP socket (thread not started).

        :param fromIP: local host name or IP address to bind to
        :param fromPort: local UDP port to bind to
        :param rt_step: seconds to sleep between two receive attempts
        :param MSG: writable buffer (e.g. a ctypes Structure) that incoming
            datagrams are written into in place
        :raises OSError: if the address cannot be bound, e.g. because the
            port is already in use by another socket or process
        """
        threading.Thread.__init__(self)

        self.fromAddr = (fromIP, fromPort)
        self.rt_step = rt_step
        self.msg = MSG

        # One socket per instance.  A socket defined on the class would be
        # shared by every receiver, so a second instance would try to bind
        # an already-bound socket.
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.s.setblocking(False)
            self.s.bind(self.fromAddr)
        except OSError:
            # Do not leak the socket when the bind fails.
            self.s.close()
            raise

    def run(self):
        """Poll the socket every ``rt_step`` seconds, forever."""
        while True:
            time.sleep(self.rt_step)
            try:
                self.s.recvfrom_into(self.msg)
            except OSError:
                # Covers BlockingIOError (no datagram waiting) and transient
                # socket errors: keep polling.  Anything else, such as an
                # unusable buffer, is a programming error and is no longer
                # silently swallowed.
                continue

    def getNewMsg(self):
        """Return the message buffer holding the most recently received data."""
        return self.msg