Subscribe and Display MQTT in Dash

Hi all, I am building an application that publishes and subscribes to MQTT topics. Although I am able to publish to a topic, I cannot subscribe to a topic and display the response received on it. Below is my code:

import dash
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
import flask
from dash.dependencies import Input, Output, State
import json
import paho.mqtt.client as mqtt

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

# Pass the Flask instance to Dash; without `server=server` this object is
# created but never used by the app.
server = flask.Flask(__name__)
app = dash.Dash(__name__, server=server, external_stylesheets=external_stylesheets)
app.config.suppress_callback_exceptions = True

# Page layout: a 'Register' button followed by two output areas that the
# callbacks below fill in.
app.layout = html.Div(
    [
        html.Div(
            [html.Button('Register', id='post-val', n_clicks=0)],
            # Style keys are camelCase, matching 'marginTop'.
            style={'marginTop': 50, 'marginLeft': '600px'},
        ),
        html.Br(),
        html.Div(id='textarea-state-example-output', style={'whiteSpace': 'pre-line'}),
        html.Div(id='textarea-state-example-output1', style={'whiteSpace': 'pre-line'}),
    ],
    style={'width': '100%', 'margin': 20},
)

# MQTT broker settings and the topics used for gateway registration.
MQTT_HOST = "172.17.0.2"
#MQTT_HOST = 'localhost'
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 45
MQTT_TOPIC_Registration = "gw/register"
MQTT_TOPIC_Registration_Response = "gw/registerresponse"

# The client connects once at import time and is shared by the callbacks.
# NOTE(review): no network loop (loop_start / loop_forever) is started in this
# script, so incoming messages are presumably never dispatched to on_message.
mqttc = mqtt.Client()
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)

# Dash callback: triggered by clicks on the 'post-val' button; its return
# value becomes the children of 'textarea-state-example-output1'.
@app.callback(
    Output('textarea-state-example-output1', 'children'),
    [Input('post-val', 'n_clicks')]
    )


def display_Output(clicked):
    """Try to return the payload of an MQTT message after a click.

    As posted, this raises the TypeError quoted further down in the thread:
    ``on_message`` is defined with three parameters but is called below
    with none. The nested function is also never assigned to
    ``mqttc.on_message``, so the MQTT client does not know about it.
    """
    
    if clicked:
        def on_message(client, userdata, msg):
            global p
            # r is assigned but never read.
            r= msg.topic
            p= msg.payload
            return p
        # Called without client/userdata/msg -> TypeError.
        return on_message()


@app.callback(
    Output('textarea-state-example-output', 'children'),
    [Input('post-val', 'n_clicks')])
def update_output(clicked):
    """Send the gateway registration request when 'Register' is clicked.

    Args:
        clicked: ``n_clicks`` of the 'post-val' button; falsy (0 or None)
            until the button has been clicked.

    Returns:
        ``None`` before the first click. Afterwards a status string that
        says whether the subscribe and publish requests were accepted by
        the MQTT client. The success string only means the request was
        sent; it does not mean a registration response has arrived.
    """
    if not clicked:
        return None

    registration = {
        "gw_macid": "[32,74,2,255,255,144,255,203]",
        "zigbee_mac_id": "60A423FFFE4292F1",
        "gw_ip": "192.168.0.105",
        "gw_fw_version": "19.4.h-597",
        "status_report_interval": 1,
        "net_config": 2,
        "network_gateway_ip": 171986689,
        "device_net_mask": 4294967040,
    }

    # Subscribe before publishing so that a quick reply is not missed.
    # subscribe() returns a (result, mid) tuple, so its length is never 0;
    # the result code is what tells whether the request was accepted.
    res = mqttc.subscribe(MQTT_TOPIC_Registration_Response)
    if res[0] != mqtt.MQTT_ERR_SUCCESS:
        return "Gateway cannot be registered :("

    sent = mqttc.publish(MQTT_TOPIC_Registration, json.dumps(registration))
    if sent.rc != mqtt.MQTT_ERR_SUCCESS:
        return "Gateway cannot be registered :("

    return "Gateway with MAC id " + registration["gw_macid"] + " registered successfully"

# Start the Dash server on port 8050 only when this file is run as a script.
# NOTE(review): debug=True combined with host='0.0.0.0' presumably exposes the
# debug tooling on every network interface; confirm this is intended.
if __name__ == '__main__':
    app.run_server(host='0.0.0.0',port=8050,debug=True)

I assume I am receiving the response because the variable “res” is not equal to zero, but unfortunately I am not able to display the message received on the response topic. The error is as follows:

TypeError: on_message() missing 3 required positional arguments: 'client', 'userdata', and 'msg'

Help is appreciated. Thank you.

Hi @Anup1

You have a function:

def on_message(client, userdata, msg):

I can’t find the line that calls that function and passes the three arguments it defines. :thinking:

The error message is saying that the values for these three arguments are missing. :woozy_face:

Hi @Eduardo , You are right, basically the function should be called like this:

def display_Output(clicked):
    # Second attempt: instead of calling on_message directly, assign it to
    # the paho client as its message callback.
    
    if clicked:
        def on_message(client, userdata, msg):
            global p
            # r is assigned but never read.
            r= msg.topic
            p= msg.payload
            return p
        # This only assigns the handler. display_Output has no return
        # statement of its own, so it returns None whatever on_message
        # returns.
        mqttc.on_message = on_message

Registered this way, the callback receives all the required arguments, but when I do that nothing is displayed. I am not able to figure out how to modify this function to display the received message.

But where is the data to be used in the function?
I do not see this variables in any other part of the code.
Sorry, I don’t understand how this function works?
:thinking:

“on_message” is a callback function from paho-mqtt (Ref link). I am sorry for not explaining the problem clearly. Consider the script below, which works fine for publishing to and subscribing to MQTT topics.

import paho.mqtt.client as mqtt
import json

# Define Variables

# Flag tested once further down before the callbacks are attached.
# The name is spelled without the first "t"; the check below reads it by this
# exact spelling, so it must not be renamed here alone.
Regisration_flag = 0
# Broker address: the commented-out line is the alternative host.
#MQTT_HOST = "172.17.0.2"
MQTT_HOST = "localhost"
MQTT_PORT = 1883
# Passed as the third argument to mqttc.connect() below.
MQTT_KEEPALIVE_INTERVAL = 45
# Topic the registration request is published to.
MQTT_TOPIC_Registration = "gw/register"
# Topic subscribed to for the reply.
MQTT_TOPIC_Registration_Response = "gw/registerresponse"

# Define on_publish event function
def on_publish(client, userdata, mid):
    """Publish callback: report on stdout that a message was published.

    The three arguments are not used.
    """
    notice = "Message Published..."
    print(notice)

def on_connect(client, userdata, flags, rc):
    """Connect callback: subscribe for the reply, then send the registration.

    Args:
        client: The MQTT client; used to subscribe and publish.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the broker accepted the
            connection.
    """
    if rc != 0:
        # The connection was refused, so subscribing or publishing is
        # pointless; report it instead of failing silently.
        print("Connection failed with result code", rc)
        return

    # Subscribe first so the response cannot arrive before we are listening.
    client.subscribe(MQTT_TOPIC_Registration_Response)

    registration = {
        "gw_macid": "[32,74,2,255,255,144,255,203]",
        "zigbee_mac_id": "60A423FFFE4292F1",
        "gw_ip": "192.168.0.105",
        "gw_fw_version": "19.4.h-597",
        "status_report_interval": 1,
        "net_config": 2,
        "network_gateway_ip": 171986689,
        "device_net_mask": 4294967040,
    }
    client.publish(MQTT_TOPIC_Registration, json.dumps(registration))


def on_message(client, userdata, msg):
    """Message callback: print the topic and the payload, one per line.

    ``client`` and ``userdata`` are not used.
    """
    print(msg.topic, msg.payload, sep="\n")
    # To stop after the first message, call client.disconnect() here.

# Initiate MQTT Client
mqttc = mqtt.Client()

# Attach the callbacks once, guarded by the module-level flag.
# The flag is spelled "Regisration_flag" where it is defined, so the same
# spelling is used when setting it; assigning "Registration_flag" would create
# a second variable and leave the guard unchanged.
if Regisration_flag == 0:
    mqttc.on_publish = on_publish
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    Regisration_flag = 1

# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)

# Block here and process network traffic; the callbacks above are invoked
# from inside this loop.
mqttc.loop_forever()

Unfortunately, I am not able to use the same “on_message” function to display the response in Dash.

Hey @Anup1

In this example you are using the line:

mqttc.on_message = on_message

That line hooks up the function on_message so that it gets executed, but in the Dash callback there is no such line. So you are returning a function that has never been called. :thinking:

@Eduardo Exactly you are right, I had written the function wrong, but I have updated it now. Below is code:

def display_Output(clicked):
    # Third attempt: create a dedicated MQTT client inside the Dash callback
    # and start its network loop.
    
    # Connection settings local to this function.
    MQTT_HOST = "172.17.0.2"
    #MQTT_HOST = "localhost"
    MQTT_PORT = 1883
    MQTT_KEEPALIVE_INTERVAL = 45
    MQTT_TOPIC_Registration_Response = "gw/D4351D552DA8/registerresponse"
    
    if clicked:
        def on_connect(client, userdata, flags, rc):
            client.subscribe(MQTT_TOPIC_Registration_Response)
            
            
        def on_message(client, userdata, msg):
           
            # Decode the payload as UTF-8 (ignoring bad bytes) and parse it as JSON.
            data1 = json.loads(str(msg.payload.decode("utf-8","ignore")))
            print(data1)
            # NOTE(review): nothing in this function reads this return value.
            return data1
            
        
        # A new client is created on each call with a truthy `clicked`;
        # nothing here stops its loop or disconnects it again.
        mqttc = mqtt.Client()
        mqttc.on_connect = on_connect
        mqttc.on_message = on_message
        # rec is the callback function object itself, not a received message.
        rec = mqttc.on_message
        mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)
        mqttc.loop_start()
        # str() of a function yields its repr, which is the
        # "<function display_Output.<locals>.on_message at 0x...>" text
        # reported below in the thread.
        return str(rec)

Now the display shows this

<function display_Output.<locals>.on_message at 0x7f1c0b81faf0>

I know I am wrong somewhere but I dont know where :frowning:

Sorry, I have no experience in using MQTT :woozy_face:
Don’t you need to add a loop (do-while) that is triggered with mqttc.loop_start()? :thinking:

@Eduardo Thank you very much for the support :slight_smile: . After posting the same question on Stack Overflow, I learned that on_message does not return anything and that MQTT callback functions should be handled using websockets. So handling MQTT in Plotly Dash isn’t as straightforward as writing a script.

How fast do you need updates?

If delay of ~1s is acceptable, you could just make an on_message handler that writes the mqtt message(s) to a file on disk, and then read this file from a callback triggered by an interval component.

If you need faster updates though, I agree that a websocket would be a more appropriate solution.

@Emil Oh the accepted latency is in milliseconds so, I am trying to build the application using websockets.

Then you definitely need websockets, yes. The easiest way is probably via dash-extensions,

Could you elaborate on why you need millisecond latency in a GUI? Can a human even react that fast?

aegis1980/dash-mqtt: MQTT with Dash Plotly (github.com)

2 Likes

Thanks for the GitHub resource, it helps!!! The messages I receive from MQTT are further used to trigger certain machine-to-machine communications which cannot afford delays, and I need to display those messages on the GUI as pop-ups, like alerts. Do you recommend any other optimized way I can achieve this?

Basic requirement: I need to build dashboard in Python using plotly and process received MQTT messages.

@Sylvos Ah, here is the gold!!! For a person who’s new to building a dashboard for MQTT, this is real gold. Thanks a lot.

@Sylvos Okay, so I tried dash-mqtt but faced new problems. Currently I am running my Eclipse MQTT broker in a Docker container, which is associated with the IP 172.17.0.2 and port 1883, but I am not able to connect to the broker with these parameters. Could you please help?

# Broker address and port tried by the poster (the Docker broker's IP and
# port 1883).
# NOTE(review): dash-mqtt presumably connects from the browser over
# WebSockets, so it may need the broker's WebSocket listener rather than the
# plain MQTT port; the final script in this thread uses localhost:1884.
TEST_SERVER = '172.17.0.2'
TEST_SERVER_PORT = 1883
# presumably the URL path of the broker's WebSocket endpoint; verify
TEST_SERVER_PATH = 'mqtt'
# The same topic is used for outgoing and incoming messages.
MESSAGE_OUT_TOPIC = 'testtopic'
MESSAGE_IN_TOPIC = 'testtopic'

Could you please tell me what “TEST_SERVER” and “TEST_SERVER_PATH” are?

@Sylvos Thanks for the link. Finally, I was able to subscribe and publish to an MQTT topic with the help of the GitHub repo mentioned in your comment. Here is the final working script:

app.py

import dash
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
import flask
from dash.dependencies import Input, Output, State
import dash_mqtt


external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

# Pass the Flask instance to Dash; without `server=server` this object is
# created but never used by the app.
server = flask.Flask(__name__)
app = dash.Dash(__name__, server=server, external_stylesheets=external_stylesheets)
app.config.suppress_callback_exceptions = True


# Broker endpoint handed to the dash_mqtt component.
TEST_SERVER = 'localhost'
TEST_SERVER_PORT = 1884
MQTT_TOPIC_Registration = 'posttopic'

# '#' is the MQTT multi-level wildcard, i.e. every topic.
MQTT_TOPIC_Registration_Response = '#'

# Two tabs: 'Registration' holds the MQTT component, the 'Register' button and
# a confirmation dialog; 'TAB2' holds the Div that shows incoming payloads.
app.layout = html.Div([
    dcc.Tabs([
        dcc.Tab(id='tab1', label='Registration', children=[
            html.Div([
                dash_mqtt.DashMqtt(
                    id='mqtt',
                    broker_url=TEST_SERVER,
                    broker_port=TEST_SERVER_PORT,
                    # broker_path is left unset in this script.
                    topics=[MQTT_TOPIC_Registration, MQTT_TOPIC_Registration_Response],
                ),
                html.Div(
                    [html.Button('Register', id='post-val', n_clicks=0)],
                    # Style keys are camelCase, matching 'marginTop'.
                    style={'marginTop': 50, 'marginLeft': '600px'},
                ),
                html.Div([
                    dcc.ConfirmDialog(
                        id='confirm',
                        message='Are you sure you want to continue ?',
                    ),
                ]),
            ], style={'width': '100%', 'margin': 20}),
        ]),
        dcc.Tab(label='TAB2', children=[
            html.Div([
                html.Div(
                    id='textarea-state-example-output',
                    style={'whiteSpace': 'pre-line'},
                ),
            ]),
        ]),
    ]),
    html.Div(id='tabs-content'),
])


@app.callback(
    Output('confirm', 'displayed'),
    [Input('post-val', 'n_clicks')],
)
def display_confirm(clicked):
    """Show the confirmation dialog once 'Register' has been clicked.

    Returns True when ``clicked`` is truthy and False otherwise (0 or None).
    """
    return bool(clicked)


   
@app.callback(
    Output('textarea-state-example-output', 'children'),
    [Input('mqtt', 'incoming')],
)
def update_table(incoming_message):
    """Display the payload of the latest incoming MQTT message.

    Leaves the output untouched (``dash.no_update``) while
    ``incoming_message`` is empty or None.
    """
    if not incoming_message:
        return dash.no_update
    return incoming_message['payload']

# Start the Dash server on port 8050 only when this file is run as a script.
# NOTE(review): debug=True combined with host='0.0.0.0' presumably exposes the
# debug tooling on every network interface; confirm this is intended.
if __name__ == '__main__':
    app.run_server(host='0.0.0.0',port=8050,debug=True)

Finally, @Emil and @Eduardo, thanks for your support as well. :slight_smile:

Regards

1 Like