Flash 1.2.0 Update: Upgraded to Dash 3.2.0
Hello everyone,
I am pleased to announce the release of Flash 1.2.0, which incorporates the latest Dash version (3.2.0). Below is an overview of the key enhancements:
-
Async Support for Background Callbacks: Background callbacks can now be implemented asynchronously (from Dash update)
-
New Event Callbacks: This feature enables real-time streaming of UI updates and supports endless streams.
Understanding Event Callbacks
Event callbacks are built on Server-Sent Events (SSE), which provide unidirectional HTTP-based byte streams. Unlike WebSockets, SSE focuses solely on server-to-client data transmission, maintaining an open connection for the duration required. This approach is ideal for applications such as live dashboards, push notifications, and incremental UI updates during callback processing.
The API resembles that of standard callbacks, with the notable absence of an explicit Output; this is managed through the stream_props
function. A key requirement is that the callback function must be an async generator, utilizing yield statements rather than returns.
In addition to these benefits, event callbacks are particularly effective in environments with unstable internet connections. The SSE endpoint sustains the connection post-initialization, eliminating the need for repeated requests as seen in polling methods. Consequently, for scenarios not involving highly intensive CPU-bound tasks, background callbacks and their associated resources may be unnecessary. Furthermore, event callbacks integrate seamlessly with Celery, allowing task submission and real-time streaming of results from the worker queue to the client.
For a comprehensive explanation, please refer to the README
Examples
-
The first is adapted from the background callback documentation.
-
The second demonstrates an endless stream.
Looking forward to your feedback and suggestions to further refine everything!
I am also going to release a Dash compatible plugin - which will be sync and don’t stream endless.
Progressive UI updates
from flash import stream_props, event_callback, Input
import dash_mantine_components as dmc
@event_callback(
Input(ids.start_btn, "n_clicks"),
cancel=[(Input(ids.cancel_btn, "n_clicks"), 0)],
reset_props=[
(ids.table, {"rowData": [], "columnDefs": []}),
(ids.start_btn, {"children": "Download Data", "loading": False}),
(ids.cancel_btn, {"display": "none"}),
]
)
async def update_table(n_clicks):
yield stream_props([
(StreamTable.ids.start_btn, {"loading": True}),
(StreamTable.ids.cancel_btn, {"display": "flex"}),
])
yield NotificationsContainer.send_notification(
title="Starting Download!",
message="Notifications in Dash, Awesome!",
color="lime",
)
progress = 0
chunck_size = 500
async for data_chunk, colnames in get_data(chunck_size):
if progress == 0:
columnDefs = [{"field": col} for col in colnames]
update = {"rowData": data_chunk, "columnDefs": columnDefs}
else:
update = {"rowTransaction": {"add": data_chunk}}
yield stream_props(StreamTable.ids.table, update)
if len(data_chunk) == chunck_size:
yield NotificationsContainer.send_notification(
title="Progress",
message=f"Processed {chunck_size + (chunck_size * progress)} items",
color="violet",
)
progress += 1
yield stream_props([
(StreamTable.ids.start_btn, {"loading": False, "children": "Reload"}),
(StreamTable.ids.cancel_btn, {"display": "none"}),
])
yield NotificationsContainer.send_notification(
title="Finished Callback!",
message="Notifications in Dash, Awesome!",
icon=DashIconify(icon="akar-icons:circle-check"),
color="lime",
)
Cancel stream and reset props
cancel=[(Input(ids.cancel_btn, "n_clicks"), 0)]
reset_props=[
(ids.table, {"rowData": [], "columnDefs": []}),
(ids.start_btn, {"children": "Download Data", "loading": False}),
(ids.cancel_btn, {"display": "none"}),
]
Endless Stream
@event_callback(
Input(StreamButtons.ids.start_btn, "n_clicks"),
cancel=[
(Input(RootContainer.ids.location, "pathname"), "/streaming/live-dashboard"),
(Input(StreamButtons.ids.end_btn, "n_clicks"), 0),
],
reset_props=[
(StreamButtons.ids.start_btn, {"disabled": False, "children": "Start stream"}),
(StreamButtons.ids.end_btn, {"display": "none"}),
],
on_error=lambda e: NotificationsContainer.send_notification(
title="Error", message=str(e), color="red"
)
)
async def update_graph(n_clicks):
yield NotificationsContainer.send_notification(
title="Starting stream!",
message="Notifications in Dash, Awesome!",
color="lime",
)
yield stream_props([
(StreamButtons.ids.start_btn, {"disabled": True, "children": "Running"}),
(StreamButtons.ids.end_btn, {"display": "flex"}),
])
while True:
await asyncio.sleep(0.5)
stock_ticks = ["google", "apple", "microsoft", "amazon"]
stocks = await asyncio.gather(*[get_stocks() for _ in stock_ticks])
update = []
for tick, value in zip(stock_ticks, stocks):
update.append((SSEGraph.ids.graph(tick), {"extendData": value}))
yield stream_props(update)