✊🏿 Black Lives Matter. Please consider donating to Black Girls Code today.
🐇 Announcing Dash VTK for 3d simulation graphics. Check out the March webinar.

Multiprocess pipe data to callback and update values

Dash V.1.12.0 | DCC V1.10.0 | HTML V1.0.3 | Python3.8.2 | Ubuntu 20.04

I am aiming to live update data to my dashboard (plotly-dash) from a datastream I receive through a multiprocessing pipe. However, the value callback does not update, receiving:

Traceback (most recent call last):
  File "/home/----/Documents/GitHub/----/CLIENT/venv/VISUALIZATION.py", line 146, in update_gauge
    packet = pipe.recv()
AttributeError: 'int' object has no attribute 'recv'

I run three different processes. P1 is the get_data, where a socket is opened to the server providing the data. P2 ought to be the dashboard. P3 is a debug process, running merely to see wether the client is receiving any data (which it is):

    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=get_data, args=(parent_conn, address,))
    p2 = multiprocessing.Process(target=dashboard, args=(child_conn, ))
    p3 = multiprocessing.Process(target=print_pipe, args=(child_conn, ))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

P1 sends, whatever the state of the connection to the server, always following data over the connection pipe: conn.send([id, an1, anVar, connected, time_since_valid]), for example ['11', '24', '6', True, 4.76837158203125e-07]

Underneath is the dashboard I wrote, where I think something is wrong in the @app.callback/update_gauge?

####DASHBOARD####
def dashboard(conn):
    app = dash.Dash()
    app.layout = html.Div([
        dcc.Interval(
            id="interval-components",
            interval=1*10000,
            n_intervals=0
        ),
        daq.Gauge(
            id='gauge-chart',
            color={"gradient": True, "ranges": {
                "green": [0, 10], "yellow": [10, 30], "red": [30, 60]}},
            value=0,
            max=60,
            min=0,
            units="M/S",
        )
    ], className='row', style={'textAlign': 'center'})
    @app.callback(
        output=[Output('gauge-chart', 'value')],
        inputs=[Input('interval-components', 'n_intervals')]
    )
    def update_gauge(pipe=conn):
        packet = pipe.recv()
        value = packet[1]
        return value

    app.run_server(port=8042,debug=True)

SEE WHOLE CODE BELOW:

#GENERAL INPUTS
import sys
import multiprocessing

#CONNECTION INPUTS
import socket
import time

#DASHBOARD INPUTS
import dash
import dash_core_components as dcc
import dash_html_components as html
#import dash_bootstrap_components as dbc
import dash_daq as daq
import plotly
import plotly.graph_objects as go
from dash.dependencies import Input, Output


#INITIALIZE VARIABLES
can_id = '0'
an1 = '0'
anVar = '0'
connection_status = '\033[91m' + 'DISCONNECTED' + '\033[0m'
time_since_valid = 0
connected = False
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

####DATA####
def parse_data(data, last_data):
    if (len(data)>0 and data[0] == 'A' and data[-1] == 'Z'):
        valid = True
        return [data[2:-2],valid] #REMOVE START AND END MARKERS WITH SPACES
    else:
        valid = False
        return [last_data,valid]

def try_connect(address):
    global connected
    global client
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.settimeout(2)
    try:
        client.connect(address)
        connected = True
    except:
        connected = False
        client.close()
    finally:
        return [client, connected]

def get_data(conn, address):

    #GLOBAL VARIABLE INIT
    global can_id
    global an1
    global anVar
    global connection_status
    global time_since_valid
    global connected
    global client

    #LOCAL VARIABLES INIT
    connection_status = '\033[91m' + 'DISCONNECTED' + '\033[0m'
    data = b''
    last_data = "START-UP"
    t0 = time.time()

    #CONNECT
    while not connected:
        conn.send([can_id, an1, anVar, connected, time_since_valid])
        try:
            [client, connected] = try_connect(address)
            t1 = time.time()
            time_since_valid = t1 - t0
        except:
            pass

    #GET DATA
    while True:
        try:
            data = client.recv(14)
        except:
            connected = False
        if not connected:
            try:
                [client, connected] = try_connect(address)
                data = client.recv(14)
            except:
                client.close()
                time_since_valid = time.time() - t0
        else:
            if data == b'':
                connected = False
                valid = False
            elif data == b'HELLO NEW SOCK':
                connected = True
                valid = True
            else:
                connected = True
                try:
                    data = data.decode('utf-8').rstrip('\x00')
                except:
                    pass

                [data,valid] = parse_data(data, last_data)
                last_data = data
                data_list = data.split(' ')
                can_id = data_list[0]
                an1 = data_list[1]
                anVar = data_list[2]

            if valid:
                t0 = time.time()
                t1 = time.time()
            else:
                t1 = time.time()

            time_since_valid = t1 - t0
        conn.send([can_id, an1, anVar, connected, time_since_valid])

####DASHBOARD####
def dashboard(conn):
    # fig = go.Figure(go.Indicator(
    #     domain={'x': [0, 1], 'y': [0, 1]},
    #     value=15,
    #     mode="gauge+number",
    #     title={'text': "Windspeed in m/s"},
    #     gauge={'axis': {'range': [None, 60]},
    #            'steps': [
    #                 {'range': [0, 20], 'color': "palegreen"},
    #                 {'range': [20, 40], 'color': "cornsilk"},
    #                 {'range': [40, 60], 'color': "coral"}],
    #            }))
    app = dash.Dash()
    app.layout = html.Div([
        dcc.Interval(
            id="interval-components",
            interval=1*10000,
            n_intervals=0
        ),

        # dcc.Graph(figure=fig)
        daq.Gauge(
            id='gauge-chart',
            color={"gradient": True, "ranges": {
                "green": [0, 10], "yellow": [10, 30], "red": [30, 60]}},
            value=0,
            max=60,
            min=0,
            units="M/S",
        )
    ], className='row', style={'textAlign': 'center'})
    @app.callback(
        output=[Output('gauge-chart', 'value')],
        inputs=[Input('interval-components', 'n_intervals')]
    )
    def update_gauge(pipe=conn):
        packet = pipe.recv()
        value = packet[1]
        return value

    app.run_server(port=8043,debug=True)


def print_pipe(conn):
    while True:
        packet = conn.recv()
        print("\r" + str(packet[1]), end="")

if __name__ == '__main__':

    # SETTINGS
    settings = open('settings.txt', "r")
    for line in settings.readlines():
        line = line.strip("\n")
        line = line.split(" = ")
        if line[0] == "IP":
            ip = socket.gethostbyname(line[1])
        elif line[0] == "PORT":
            port = int(line[1])
        elif line[0] == "VERSION":
            version = str(line[1])
        else:
            print("Unknown setting: " + str(line[0]))

    address = (ip, port)
    print(address)

    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=get_data, args=(parent_conn, address,))
    p2 = multiprocessing.Process(target=dashboard, args=(child_conn, ))
    p3 = multiprocessing.Process(target=print_pipe, args=(child_conn, ))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()