
Multithreading problem with parallel tasks: streaming data and passing it into a Dash app

Hi

I have an app with two modules.

module 1: app.py
module 2: streamer.py

The streamer.py streams tweets from Twitter, and app.py takes some specific numbers from streamer.py and displays them in a Dash gauge chart. The streamer.py works fine.

The problem is that I need two sessions running in parallel which interact with each other, but when streamer.py starts streaming, app.run_server(debug=True) doesn’t start (i.e. run code after the Flask application has started). I could not figure out how to manage that in Dash.

I have read that Dash is thread based and therefore parallel streaming of content to the Dash application is not possible.

import twitter_streamer

hash_tag_list = ["trump"]
streamer = twitter_streamer.TwitterStreamer()
streamer.stream_tweets(hash_tag_list)

... SOME CODE...

if __name__ == '__main__':
    app.run_server(debug=True)

–> Result: streamer.py works and shows me some tweets, but app.py does not run

This does not work either, presumably because of threading:

import twitter_streamer

... SOME CODE...

if __name__ == '__main__':
    hash_tag_list = ["trump"]
    streamer = twitter_streamer.TwitterStreamer()
    streamer.stream_tweets(hash_tag_list)
    app.run_server(debug=True)

–> Result: streamer.py works and shows me some tweets, but app.py does not run

Is there really no way to stream data in parallel to the Dash app?


I think what is meant in that post is that starting a Dash app will result in the Flask server blocking the rest of your script, as it’s now listening for requests, not so much that Dash itself is threaded. You can run the Flask server that powers Dash with multiple worker processes and/or with multiple threads; however, using multiple worker processes is often what you want, as Python threads only give you concurrency for I/O-bound tasks.

I’d suggest that if you want to combine your Dash app into the same Python program, then you start the server in a new process using Python’s multiprocessing module, and just continue defining the rest of your logic that runs in the parent process. You can see an example of this in the Dash tests:

Thank you @nedned. Multiprocessing is definitely new to me. I am not sure what the best way to implement it is, so I first tried it with a simple program.

Should count.py (the main code) call the app.py (Dash) module, or the reverse? I would say count.py (the main code) should call the app.py (Dash) module, because the main code is in count.py, and additionally the Dash app should be started as a separate, second process.

Therefore:

count.py

import app
import time

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

app.py

import dash
import dash_table
import dash_core_components as dcc
import dash_html_components as html
import dash_daq as daq
from dash.dependencies import Input, Output

import multiprocessing
import time

app = dash.Dash(__name__)

app.layout = html.Div([

    dcc.Interval(
        id='interval-component',
        interval=1*10000,  # in milliseconds
        n_intervals=0
    ),

    html.Div([
        daq.Gauge(
            id='gauge-chart',
            value=2,
            max=100,
            min=0,
            units="MPH",
        )
    ])
])


@app.callback(
    Output('gauge-chart', 'value'),
    [Input('interval-component', 'n_intervals')]
)
def update_gauge(n_intervals):
    value = 50
    return value


def startServer(self, dash):
    def run():
        dash.scripts.config.serve_locally = True
        dash.run_server(
            port=8050,
            debug=False,
            processes=4,
            threaded=False
        )

    # Run on a separate process so that it doesn't block
    self.server_process = multiprocessing.Process(target=run)
    self.server_process.start()
    time.sleep(0.5)

    # Visit the dash page
    self.driver.get('http://localhost:8050')
    time.sleep(0.5)


startServer(app)

But how do I call the startServer function (or pass the app to it)? This startServer(app) does not work 🙂

In your startServer function, you’ve kept a lot of the code that was relevant to the testing context, so you can clean it up a bit (also don’t need the self as that is only relevant inside a class definition):

def start_server(app, **kwargs):
    def run():
        app.run_server(**kwargs)

    # Run on a separate process so that it doesn't block
    server_process = multiprocessing.Process(target=run)
    server_process.start()

One way you could organise this is to have start_server defined in a helper module say utils.py, then your main.py could look like this:

from utils import start_server
from app import app

start_server(app)

# then the rest of your twitter logic could go here
#...

You would start the app with:

$ python main.py

How do we make this work with gunicorn? Because gunicorn won’t know about our start_server function, right?

If you’re using a WSGI server, then it could be worth thinking about splitting your Dash app and the other app you want to run in parallel into two separate services and having them communicate with something like Celery, or having your other app write data to a key-value store like Redis, which your Dash app listens to. Once you start using gunicorn, squeezing everything into the one parent process could introduce more problems than it’s worth. For example, telling gunicorn to run with multiple worker processes will result in your app spawning a child process doing the data processing for each worker process.


I’ve tried the following code as you recommended. It is just a trivial counter, but running in a separate process and displayed by the Dash app.

main.py

from utils import start_server
from app import app
import time

start_server(app)

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

utils.py

import multiprocessing

def start_server(app, **kwargs):
    def run():
        app.run_server(**kwargs)

    # Run on a separate process so that it doesn't block
    server_process = multiprocessing.Process(target=run)
    server_process.start()

app.py

import dash
import dash_table
import dash_core_components as dcc
import dash_html_components as html
import dash_daq as daq
from dash.dependencies import Input, Output

import time

app = dash.Dash(__name__)

app.layout = html.Div([

    dcc.Interval(
        id='interval-component',
        interval=1*10000,  # in milliseconds
        n_intervals=0
    ),

    html.Div([
        daq.Gauge(
            id='gauge-chart',
            value=2,
            max=100,
            min=0,
            units="MPH",
        )
    ])
])


@app.callback(
    Output('gauge-chart', 'value'),
    [Input('interval-component', 'n_intervals')]
)
def update_gauge(n_intervals):
    value = 50
    return value

Problem:
AttributeError: Can't pickle local object 'start_server.<locals>.run'
–> I am on a Windows machine, and it looks like Python multiprocessing behaves differently on Windows than on Linux or Mac (see: RuntimeError on windows trying python multiprocessing)

Adding if __name__ == "__main__": to main.py does not help:

from utils import start_server
from app import app
import time

start_server(app)

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

if __name__ == "__main__":
    pass

Hi @IFE,

Have you found a solution for your problem?

@nedned In general, is there a tutorial on how to set up a program with one thread/process for Dash and another one that, for example, processes data from a sensor, which shall then be plotted by the Dash thread? How would you implement this communication?

Thanks and best regards
Jonas

For this kind of problem, I would use a setup with three Docker containers:

  1. A container running a cache, e.g. a Redis server
  2. A container running the script that collects the data from the sensor and inserts them into the cache
  3. A container running the Dash app via gunicorn, which reads the data from the cache

Using docker-compose, the container setup can be easily managed. For communication between the containers, I would set up a Docker network.
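For illustration only, such a setup might look roughly like the following docker-compose.yml; the service names, build paths, and port are made up here, and the real configuration will depend on your project layout:

```yaml
version: "3"
services:
  myapp-redis:            # 1. the cache
    image: redis
  sensor:                 # 2. the script collecting sensor data
    build: ./sensor
    depends_on:
      - myapp-redis
  dash:                   # 3. the Dash app served by gunicorn
    build: ./dash
    command: gunicorn -b 0.0.0.0:8050 app:server
    ports:
      - "8050:8050"
    depends_on:
      - myapp-redis
```

On the default network that docker-compose creates, the containers can reach each other by service name, so the Python code would connect to Redis using the host name myapp-redis.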


Hi @Emil,

Thanks for your inspiration, that sounds really nice - and I will try to set it up in the next weeks.

However, do you think there is a more basic way to approach this topic? My starting point is a thread-based application with one thread containing a Matplotlib live plot. The big issue I face is that Matplotlib gets too slow, which is why I now need to change the plotting within this given framework. Or is the mentioned approach the only one that makes sense?

@Emil I second @jweiss15’s question: is there an easier way to handle ingesting data into dash from a separately running concurrent thread? Is there a tutorial on this or at least a working example? (This one gets close: Getting live Data from UDP socket)

What is the issue that you are experiencing with the proposed approach? Docker, Redis, and docker-compose are all well-known and well-documented pieces of software. And since everything is containerized, it should be simple to set up too (no tools or configuration are required on the host system except for a standard docker-compose installation). I don’t know if a tutorial is available, but for now I have created a small example app that simply pulls data from a (dummy) sensor and draws a bar chart,

import dash
import dash_core_components as dcc
import dash_html_components as html
import redis

from dash.dependencies import Output, Input
from flask import Flask

# Create app.
server = Flask(__name__)
app = dash.Dash(server=server)
app.layout = html.Div([dcc.Graph(id="result"), dcc.Interval(id="poller", interval=1000)])


# Create callbacks.
@app.callback(Output("result", "figure"), [Input("poller", "n_intervals")])
def poll_result(n_intervals):
    # Connect to redis.
    r = redis.Redis(host='myapp-redis', port=6379, db=0)
    # Get sensor value (default to 0 if the sensor hasn't written a value yet).
    sensor_value = (r.get("mysensor") or b"0").decode("utf-8")
    # Create graph.
    return dict(data=[dict(x=[0], y=[int(sensor_value)], type="bar")],
                layout=dict(yaxis=dict(range=[0, 10]), title="Sensor value"))


if __name__ == '__main__':
    app.run_server()

The most complex part will probably be pulling the data from the sensor, which is a case-specific task. In this simple example, I emulate the sensor via a dummy script, which inserts random numbers into the Redis cache,

import random
import redis
import time

# Connect to redis.
r = redis.Redis(host='myapp-redis', port=6379, db=0)
# Sample a new value at random times between 100 ms and 2s
while True:
    time.sleep(float(random.randint(100, 2000))/1000)
    r.set("mysensor", random.randint(1, 10))

The complete code is available here,


Hi Emil,

Wow! Awesome, thank you very much for providing the example, this helped me a lot. Yeah, the issue was that I had just started with the Dash framework and hadn’t worked with Docker/Redis before, so it felt like huge overkill to me. However, with your help I could get into the topic much more easily, and I now have a first prototype running.

Maybe your post can be linked somewhere on the Plotly/Dash website, as I think it will help many people!

Best regards
Jonas


Hello, may I ask: does using Docker or Redis to achieve the parallel task of receiving data and passing it into Dash rule out creating a single exe file on a local computer?
Thanks