🚀 Flash 1.2.0 - streaming UI updates via Server-Sent Events

Flash 1.2.0 Update: Upgraded to Dash 3.2.0

Hello everyone,

I am pleased to announce the release of Flash 1.2.0, which incorporates the latest Dash version (3.2.0). Below is an overview of the key enhancements:

  • Async Support for Background Callbacks: Background callbacks can now be implemented asynchronously (inherited from the Dash update)

  • New Event Callbacks: This feature enables real-time streaming of UI updates and supports endless streams.

Understanding Event Callbacks

Event callbacks are built on Server-Sent Events (SSE), which provide unidirectional HTTP-based byte streams. Unlike WebSockets, SSE focuses solely on server-to-client data transmission, maintaining an open connection for the duration required. This approach is ideal for applications such as live dashboards, push notifications, and incremental UI updates during callback processing.

The API resembles that of standard callbacks, with the notable absence of an explicit Output; this is managed through the stream_props function. A key requirement is that the callback function must be an async generator, utilizing yield statements rather than returns.
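To illustrate the async-generator shape with plain asyncio (no Flash involved): each `yield` is one incremental update that the server can flush to the client immediately, instead of a single return value at the end:

```python
import asyncio

async def progress_stream(steps: int):
    """Async generator: yields one incremental update per step, like an event callback."""
    for i in range(1, steps + 1):
        await asyncio.sleep(0)  # stand-in for real async work
        yield {"progress": i, "total": steps}

async def collect():
    # The server would encode and flush each yielded update as an SSE message.
    return [update async for update in progress_stream(3)]

updates = asyncio.run(collect())
print(updates)  # three incremental updates instead of one final result
```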

In addition to these benefits, event callbacks are particularly effective in environments with unstable internet connections. The SSE endpoint sustains the connection post-initialization, eliminating the need for repeated requests as seen in polling methods. Consequently, for scenarios not involving highly intensive CPU-bound tasks, background callbacks and their associated resources may be unnecessary. Furthermore, event callbacks integrate seamlessly with Celery, allowing task submission and real-time streaming of results from the worker queue to the client.
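As a rough sketch of that Celery pattern (not the actual Flash API; `FakeResult` is a stand-in for Celery's `AsyncResult` so the example runs without a broker): submit the task, poll its result handle, and stream status updates as they arrive:

```python
import asyncio

class FakeResult:
    """Stand-in for celery.result.AsyncResult, so the sketch runs without a broker."""
    def __init__(self):
        self._polls = 0

    def ready(self) -> bool:
        self._polls += 1
        return self._polls >= 3  # pretend the worker finishes on the third poll

    def get(self):
        return {"rows": [1, 2, 3]}

async def stream_task_result(result: FakeResult):
    """Poll the worker-side result handle and stream status updates to the client."""
    while not result.ready():
        yield {"status": "pending"}
        await asyncio.sleep(0)  # real code would wait a bit between polls
    yield {"status": "done", "data": result.get()}

async def collect():
    return [u async for u in stream_task_result(FakeResult())]

updates = asyncio.run(collect())
```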

For a comprehensive explanation, please refer to the README.

Examples

  • The first is adapted from the background callback documentation.

  • The second demonstrates an endless stream.

Looking forward to your feedback and suggestions to further refine everything!

I am also going to release a Dash-compatible plugin, which will be synchronous and will not support endless streams.

Progressive UI updates

from flash import stream_props, event_callback, Input
import dash_mantine_components as dmc

# ids, StreamTable, NotificationsContainer, DashIconify and get_data come from
# the app's own component modules (not shown here).

@event_callback(
    Input(ids.start_btn, "n_clicks"),
    cancel=[(Input(ids.cancel_btn, "n_clicks"), 0)],
    reset_props=[
        (ids.table, {"rowData": [], "columnDefs": []}),
        (ids.start_btn, {"children": "Download Data", "loading": False}),
        (ids.cancel_btn, {"display": "none"}),
    ],
)
async def update_table(n_clicks):
    yield stream_props([
        (StreamTable.ids.start_btn, {"loading": True}),
        (StreamTable.ids.cancel_btn, {"display": "flex"}),
    ])

    yield NotificationsContainer.send_notification(
        title="Starting Download!",
        message="Notifications in Dash, Awesome!",
        color="lime",
    )

    progress = 0
    chunk_size = 500
    async for data_chunk, colnames in get_data(chunk_size):
        if progress == 0:
            columnDefs = [{"field": col} for col in colnames]
            update = {"rowData": data_chunk, "columnDefs": columnDefs}
        else:
            update = {"rowTransaction": {"add": data_chunk}}

        yield stream_props(StreamTable.ids.table, update)

        if len(data_chunk) == chunk_size:
            yield NotificationsContainer.send_notification(
                title="Progress",
                message=f"Processed {chunk_size + (chunk_size * progress)} items",
                color="violet",
            )

        progress += 1

    yield stream_props([
        (StreamTable.ids.start_btn, {"loading": False, "children": "Reload"}),
        (StreamTable.ids.cancel_btn, {"display": "none"}),
    ])

    yield NotificationsContainer.send_notification(
        title="Finished Callback!",
        message="Notifications in Dash, Awesome!",
        icon=DashIconify(icon="akar-icons:circle-check"),
        color="lime",
    )


Cancel stream and reset props

cancel=[(Input(ids.cancel_btn, "n_clicks"), 0)]

reset_props=[
    (ids.table, {"rowData": [], "columnDefs": []}),
    (ids.start_btn, {"children": "Download Data", "loading": False}),
    (ids.cancel_btn, {"display": "none"}),
]


Endless Stream

@event_callback(
    Input(StreamButtons.ids.start_btn, "n_clicks"),
    cancel=[
        (Input(RootContainer.ids.location, "pathname"), "/streaming/live-dashboard"),
        (Input(StreamButtons.ids.end_btn, "n_clicks"), 0),
    ],
    reset_props=[
        (StreamButtons.ids.start_btn, {"disabled": False, "children": "Start stream"}),
        (StreamButtons.ids.end_btn, {"display": "none"}),
    ],
    on_error=lambda e: NotificationsContainer.send_notification(
        title="Error", message=str(e), color="red"
    ),
)
async def update_graph(n_clicks):
    yield NotificationsContainer.send_notification(
        title="Starting stream!",
        message="Notifications in Dash, Awesome!",
        color="lime",
    )

    yield stream_props([
        (StreamButtons.ids.start_btn, {"disabled": True, "children": "Running"}),
        (StreamButtons.ids.end_btn, {"display": "flex"}),
    ])

    while True:
        await asyncio.sleep(0.5)
        stock_ticks = ["google", "apple", "microsoft", "amazon"]
        stocks = await asyncio.gather(*[get_stocks() for _ in stock_ticks])
        update = [
            (SSEGraph.ids.graph(tick), {"extendData": value})
            for tick, value in zip(stock_ticks, stocks)
        ]
        yield stream_props(update)


I thought I'd provide a little background on why I created this project, plus some benchmarks that showcase when to use a sync or async backend.

The Dash apps I develop mainly talk to databases and other services on the network, and most of the heavy lifting happens in the data pipelines and ML platform. So I pretty quickly hit a wall where I would have had to scale the number of containers to an unreasonable amount given the number of users, while most of the callbacks would basically just wait 99% of the time. But I didn't want to drop Dash, because I really like the API: it is super easy to onboard analysts and data scientists onto your project, and the intersection between prototyping in notebooks and your application is just great. So I decided to swap Flask for Quart, and the results were pretty nice. A single instance was able to serve around 50x more users, and I could move to a super simple 3-container, single-core setup.

I created two synthetic scenarios to showcase when to use a sync or async backend, plus one scenario showing the impact of event_callbacks.

Deployment Setup:

Each app runs in its own Docker container

[Dash](https://github.com/chgiesse/dash-router-examples/blob/main/app/dash_app.py):

ENTRYPOINT ["python", "-m", "gunicorn", "--bind", "0.0.0.0:8051", "--workers", "2", "dash_app:server"]

[Flash](https://github.com/chgiesse/dash-router-examples/blob/main/app/app.py):

ENTRYPOINT ["python", "-m", "uvicorn", "app:server", "--host", "0.0.0.0", "--port", "8050", "--workers", "2"]

Load testing setup:

I use the hey CLI tool (GitHub: rakyll/hey, an HTTP load generator and ApacheBench (ab) replacement) to generate the requests and run the load tests. `-n` is the total number of requests that get sent and `-c` the number of concurrent requests/users.

  • CPU bound request setup:
hey -n 200 -c 20 -o csv http://localhost:8081/performance-check-1 > benchmarks/cpu_bound_dash.csv
  • Network bound:
hey -n 100 -c 10 -o csv http://localhost:8081/network-heavy-check > benchmarks/nhv_dash.csv

Now, let's look at some lovely Plotly charts!

CPU bound tasks comparison:

As we can see, there actually isn't a big difference. What difference there is is most likely due to the added randomness in the simulation; Dash/Flask could just as well have been faster in the next round.

But now, let's extend the simulation by running a single event_callback during the tests.

That's where it gets interesting: while the async version stays pretty much the same, Flask drops exactly 1 rps and has 2 s longer request times for **one** tab running the event callback! That's why the Dash plugin doesn't support endless streams: I could open six tabs with that stream and the whole container instance would be blocked. But there is a difference that should be part of a design decision. The async version may have had "better" performance in terms of raw numbers, but Flask sends responses exactly as they become ready (every second), because the worker has access to its resources at every moment in time, while Flash responses arrived around every three seconds, whenever the event loop processed the stream further.

Let's look at the network I/O scenario - that's where it gets kind of crazy. As you can see above, I had to reduce the number of concurrent requests to 10 because Flask would drop/time out requests. The mock I/O tasks are a random choice between 1.5 and 3.5 seconds.

Here you can see where the async backend shines: I/O bound tasks don't block each other, in contrast to Flask.
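The effect is easy to reproduce with plain asyncio: ten concurrent mock I/O calls take about as long as a single one, whereas a blocked sync worker would serialize them:

```python
import asyncio
import time

async def mock_io(seconds: float) -> float:
    # Stand-in for a database or service call: waits without blocking the loop.
    await asyncio.sleep(seconds)
    return seconds

async def main() -> float:
    delays = [0.1] * 10  # ten "requests" of 100 ms each
    start = time.perf_counter()
    await asyncio.gather(*(mock_io(d) for d in delays))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Concurrently: ~0.1 s total; run sequentially (as in a blocked sync worker): ~1.0 s.
print(f"{elapsed:.2f}s")
```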

So the conclusion here would be that if your app relies solely on files on the server, you don't get any benefit, but the moment you have I/O, async gets way more efficient. Let's increase the workload.

hey -n 5000 -c 600 http://localhost:8080/network-heavy-check

And I think it's pretty awesome with 105 req/s and peaks at 160.
Let me know what you think and what your Dash application looks like!

Looking forward to your feedback and happy coding!


Very interesting and definitely a useful change; however, I think it does not work together with requests_pathname_prefix.

When I create an event_callback and trigger it, I can see a request in DevTools → Network to dash_update_component_sse, which returns a 404. The reason is that it looks for dash_update_component_sse at the top-level domain, not respecting requests_pathname_prefix.

E.g., my app is hosted at http://test.app/project/subfolder.

Can this be configured, or is this scenario simply not foreseen right now? As I am working solely on a cloud dev server, I unfortunately cannot even test the new function with requests_pathname_prefix.

Edit: The issue is in flash/_event_callback.py, line 16. However, I have never looked into Dash deeply enough to understand how to get the prefix in here.

SSE_CALLBACK_ENDPOINT: _t.Final[str] = "/dash_update_component_sse"
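A plausible direction for a fix (purely illustrative; `prefixed_endpoint` is a hypothetical helper, not actual Flash code) would be to join `requests_pathname_prefix` with the endpoint wherever the stream URL is built:

```python
def prefixed_endpoint(requests_pathname_prefix: str, endpoint: str) -> str:
    """Join a Dash requests_pathname_prefix with the SSE endpoint path.

    Hypothetical helper for illustration; the actual fix in Flash may differ.
    """
    prefix = requests_pathname_prefix.rstrip("/")
    return f"{prefix}/{endpoint.lstrip('/')}"

# With an app hosted under a subfolder, the stream URL keeps the prefix:
url = prefixed_endpoint("/project/subfolder/", "/dash_update_component_sse")
```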

Hi @mgg1!

That's true, thanks for pointing it out! I'll provide a patch EOD.

@mgg1 It should be fixed, just install v1.2.1. Let me know if it works 🙂

Thanks for the very fast fix.

I had another look and I still can't get the event callback to work. I built the minimal working example below (on my infrastructure). The app starts as normal, and async-SSE.js is also loaded in the Network tab in DevTools, but when I click "Start stream" nothing happens; the callback is not triggered. DevTools also shows no reaction.

app.py
from flash import Input, event_callback, stream_props
from utils.risklab import build_root_path, getPort, setProd, setPort
from flash import Flash
import dash_mantine_components as dmc
import pandas as pd
import asyncio
import plotly
import dash_ag_grid as dag

setPort(8058)
setProd(False)

app = Flash(
    __name__,
    update_title="Updating...",
    requests_pathname_prefix=build_root_path(port=getPort()),
    routes_pathname_prefix="/",    
)

app.layout = dmc.MantineProvider(
    [
        dmc.Button("Start stream", id="start-stream-button"),
        dmc.Button("Cancel stream", id="cancel-stream-button", display="none"),
        dag.AgGrid(id="dash-ag-grid")
    ]
)


async def get_data(chunk_size: int):
    df: pd.DataFrame = plotly.data.gapminder()
    total_rows = df.shape[0]

    while total_rows > 0:
        await asyncio.sleep(2)
        end = len(df) - total_rows + chunk_size
        total_rows -= chunk_size
        update_data = df[:end].to_dict("records") # type: ignore
        df.drop(df.index[:end], inplace=True) # type: ignore
        yield update_data, df.columns



@event_callback(Input("start-stream-button", "n_clicks"))
async def update_table(_):

    yield stream_props([
        ("start-stream-button", {"loading": True}),
        ("cancel-stream-button", {"display": "flex"})
    ])

    progress = 0
    chunk_size = 500
    async for data_chunk, colnames in get_data(chunk_size):
        if progress == 0:
            columnDefs = [{"field": col} for col in colnames]
            update = {"rowData": data_chunk, "columnDefs": columnDefs}
        else:
            update = {"rowTransaction": {"add": data_chunk}}

        yield stream_props("dash-ag-grid", update)

        progress += 1

    yield stream_props("start-stream-button", {"loading": False, "children": "Reload"})
    yield stream_props("cancel-stream-button", {"display": "none"})


if __name__ == "__main__":
    app.run(port=getPort(), host="0.0.0.0", debug=True, dev_tools_hot_reload=False)

I then checked the definition of event_callback and added a debug print in the very first line of the decorator (even before `if not inspect.isasyncgenfunction(func)`).

  • When I take out all yields from async def update_table, starting my app first prints my debug statement and then the ValueError.
  • If I make the function a generator again (i.e., bring back the yields), not even the debug statement is printed when I start the app.

I have no good explanation for this, but maybe this is helpful to you.

Update: Another issue I found is that as soon as I include any event_callback in an existing app, Dash persistence stops working completely.

Hey @mgg1, sure thing! But that doesn't seem to be the fix, haha. Sorry for the circumstances, I messed up the setup hook.

Let's try again: pip install dash-flash==1.2.2 should solve it now!

The event is set up now, and yield stream_props also returns a ServerSentEvent at the time it is called in the event, e.g. when adding a debug statement just before the return:

event = ServerSentEvent(json.dumps(response))
print("event", event)  # e.g. ServerSentEvent(data='["[SINGLE]", "start-stream-button", {"loading": true}]', event=None, id=None, retry=None)
return event.encode()

However, all updates in Dash are only done after the callback finished, so behavior is identical to using set_props in a normal callback.

Am I still missing something in the code below? I ran the app with “python app.py” and “hypercorn app:server”.

# app.py
from flash import Flash, Input, event_callback, stream_props
from utils.risklab import build_root_path, getPort, setProd, setPort
import dash_mantine_components as dmc
import asyncio

setPort(8000)
setProd(False)

app = Flash(
    __name__,
    update_title="Updating...",
    requests_pathname_prefix=build_root_path(port=getPort()),
    routes_pathname_prefix="/"
)

app.layout = dmc.MantineProvider(
    [
        dmc.Button("Start stream", id="start-stream-button"),
        dmc.Button("Cancel stream", id="cancel-stream-button", display="none"),
        # dag.AgGrid(id="dash-ag-grid"),
        dmc.Text(id="response", children="here we go")
    ]
)


@event_callback(
    Input("start-stream-button", "n_clicks")
)
async def stream(n_clicks):
    yield stream_props("response", {"children": "Test1"})
    yield stream_props("start-stream-button", {"loading": True})
    yield stream_props("response", {"children": "Test2"})
    await asyncio.sleep(30)
    yield stream_props("start-stream-button", {"loading": False})


server = app.server
if __name__ == "__main__":
    app.run(port=getPort(), host="0.0.0.0", debug=True, dev_tools_hot_reload=False)

I am starting to wonder whether streaming simply isn't working in my environment, as the SSE component from dash-extensions behaves the same: it works, but updates are applied not "live" but only after the callback has finished.

Hi @mgg1 ,

nice and not so nice. I would expect that this is caused by your server sitting behind a proxy that buffers the responses. I found this post https://serverfault.com/questions/801628/for-server-sent-events-sse-what-nginx-proxy-configuration-is-appropriate which describes exactly this.

Can you try setting this

response.headers.update(
    {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Transfer-Encoding": "chunked",
        "X-Accel-Buffering": "no",  # <- this is the important header
    }
)

in the last Flash method that handles the event stream?
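For reference, the commonly recommended nginx settings for an SSE location (adjust names to your setup; `app_upstream` is a placeholder for your upstream/server address) look roughly like:

```nginx
location /dash_update_component_sse {
    proxy_pass http://app_upstream;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_buffering off;   # the critical setting: flush events immediately
    proxy_cache off;
}
```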

@mgg1 were you able to check the header? I would add a disable_proxy_buffering parameter or something like that.