Hey @jinnyzor, indeed this works, thanks! I’ve never heard of this option before. I’m quite new to Dash so sorry if some of the questions seem a bit dumb.
If you still would like to help me, then I’ve got one final problem before I’ve all the building blocks to make the final app. As eventually, I want to create a feedback where data is being checked before I write a new value in the function update_progress(). To do this, I need to write data to a queue in the function tester() and read it in the function update_progress(). At first I thought that it would be very similar to the reverse problem that you already helped me solve (store in update_progress() and read in tester()). However, I tried and I don’t get it to work. It only appends one value and keeps updating this, for instance: [0] → [0, 1] → [0, 2] → [0, 3] instead of: [0] → [0, 1] → [0, 1, 2] → [0, 1, 2, 3]. (Even using the newly learned ctx.triggered returns the same result)
I’ve also tried to again include a progress according to the code sniplet below, but that continuously returns the error that an argument is missing although all arguments are defined and fed to the function.
I’ve included the code below that returns the values as described earlier. Is this impossible due to the n_interval over this function? This is what I thought after trying and therefore decided to put it into an independent callback:
import dash
from dash import dcc, html, dash_table, DiskcacheManager, CeleryManager, Output, Input, State, ctx
import dash_bootstrap_components as dbc
import plotly
import plotly.graph_objs as go
from collections import deque
import time
import os
import random
from json import JSONEncoder
import json
from collections import deque
class DequeEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, deque):
return list(obj)
return JSONEncoder.default(self, obj)
#import propar
xxx = deque(maxlen=21)
xxx.append(0)
fb = deque(maxlen=5)
fb.append(0)
### Create Necessities for Background Proces
if 'REDIS_URL' in os.environ:
# Use Redis & Celery if REDIS_URL set as an env variable
from celery import Celery
celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
background_callback_manager = CeleryManager(celery_app)
else:
# Diskcache for non-production apps when developing locally
import diskcache
cache = diskcache.Cache("./cache")
background_callback_manager = DiskcacheManager(cache)
### Assign and Call App
app = dash.Dash(__name__, background_callback_manager=background_callback_manager)
app.layout = html.Div(
[
dbc.Button(id="button_id", children="Start Pulsing"),
dbc.Button(id="cancel_button_id", children="STOP", style={'backgroundColor': 'red', 'color':'white'}),
dcc.Store(id='myData', data=json.dumps(xxx, cls=DequeEncoder)),
dcc.Store(id='runData', data=''),
dcc.Store(id='stopData', data=''),
html.Div(id="progress"),
dcc.Graph(id='test_graph', animate=True),
dcc.Interval(
id='graph-update',
interval=1000,
n_intervals=0
),
dcc.Store(id='test_value'),
html.Div(id='show_test_value'),
dcc.Store(id='fbData', data=json.dumps(fb, cls=DequeEncoder)),
]
)
### Tester
@app.callback(
output=[Output("button_id", "id"), Output("progress", "children")],
inputs=[Input('button_id', 'n_clicks'), State('myData', 'data')],
background=True,
running=[
(Output("button_id", "disabled"), True, False),
(Output("cancel_button_id", "disabled"), False, True),
],
progress=[Output("progress", "children"), Output('runData', 'data')],
cancel=[Input("cancel_button_id", "n_clicks")],
prevent_initial_call=True
)
def update_progress(set_progress, n_clicks, xxx):
if n_clicks > 0:
n = 0
xxx = json.loads(xxx)
while n <= 100:
progress = n / 100 * 100
xxx.append(20)
set_progress([f"{progress} %", json.dumps(xxx[-1], cls=DequeEncoder)])
n += 10
time.sleep(1)
return dash.no_update, 'completed'
@app.callback(
Output('stopData', 'data'),
Input("cancel_button_id", 'n_clicks'),
prevent_initial_call=True
)
def stopData(n1):
if n1 > 0:
return 0
return dash.no_update
## Show Last Value
@app.callback(
Output('show_test_value', 'children'),
Input('test_value', 'data'),
State('fbData', 'data')
)
def test_shower(value, fb):
fb = json.loads(fb)
fb.append(value['Value'])
json.dumps(fb, cls=DequeEncoder)
print(fb)
return value['Value']
@app.callback(
Output('myData', 'data'),
Input('stopData','data'),
Input('runData','data'),
State('myData', 'data'),
prevent_initial_call=True
)
def updateData(d1, d2, xxx):
if d1 or d2:
xxx = json.loads(xxx)
xxx.append(ctx.triggered[0]['value'])
return json.dumps(xxx, cls=DequeEncoder)
### Figure
X = deque(maxlen=86400) # Max Length for X Component Array (Currently a day)
X.append(0) # Append how many values
Y = deque(maxlen=86400) # Max Length for X Component Array (Currently a day)
Y.append(0) # Append how many values
@app.callback(
[Output('test_graph', 'figure'),
Output('test_value', 'data')],
Input('graph-update', 'n_intervals'),
State('myData', 'data')
)
def tester(n, xxx):
q = json.loads(xxx)
X.append(X[-1] + 1)
Y.append(float(q[-1]))
test_value = {"Value" : X[-1]}
data = plotly.graph_objs.Scatter(
x=list(X),
y=list(Y),
name='Scatter',
mode='lines+markers'
)
return {'data': [data], 'layout': go.Layout(xaxis=dict(range=[min(X), max(X)]),
yaxis=dict(range=[float(min(Y)), float(max(Y))]), )}, test_value
if __name__ == '__main__':
app.run_server(debug=True)