Hi!
I want to delegate some expensive calculations to Celery using the background callback mechanism. Since the calculations are independent, I'd like to have three tasks running in parallel, each updating the result graph when it completes. To do so, I've created three background callbacks for the calculations:
@dash.callback(Output({'role': 'simulation_results', 'task': 'band_structure'}, 'data'),
               [Input({'role': 'context', 'store': 'nanostructure'}, 'data')],
               [State({'role': 'context', 'store': 'material_params'}, 'data')],
               background=True,
               prevent_initial_call=True)
def simulate_band_structure(nanostructure, materials_params):
    logging.info("BS called")
    # Do stuff here
    logging.info("BS done")
    return result
@dash.callback(Output({'role': 'simulation_results', 'task': 'wave_functions'}, 'data'),
               [Input({'role': 'context', 'store': 'nanostructure'}, 'data')],
               [State({'role': 'context', 'store': 'material_params'}, 'data')],
               background=True,
               prevent_initial_call=True)
def simulate_wave_functions(nanostructure, materials_params):
    logging.info("Psi called")
    # Do stuff here
    logging.info("Psi done")
    return result
@dash.callback(Output({'role': 'simulation_results', 'task': 'dos'}, 'data'),
               [Input({'role': 'context', 'store': 'nanostructure'}, 'data')],
               [State({'role': 'context', 'store': 'material_params'}, 'data')],
               background=True,
               prevent_initial_call=True)
def simulate_dos(nanostructure, materials_params):
    logging.info("DoS called")
    # Do stuff here
    logging.info("DoS done")
    return result
and a single callback for plotting:
@dash.callback(dash.dependencies.Output('nanostructure-fig', 'figure'),
               [dash.dependencies.Input({'role': 'context', 'store': 'nanostructure'}, 'data'),
                dash.dependencies.Input({'role': 'simulation_results', 'task': 'band_structure'}, 'data'),
                dash.dependencies.Input({'role': 'simulation_results', 'task': 'wave_functions'}, 'data'),
                dash.dependencies.Input({'role': 'simulation_results', 'task': 'dos'}, 'data')],
               [dash.dependencies.State({'role': 'context', 'store': 'material_params'}, 'data')],
               prevent_initial_call=True)
def _plot_results(nanostructure, band_structure, wave_functions, dos, material_params):
    caller_id = get_caller_id(dash.callback_context)
    logging.error('Plot CID %s', caller_id)
    # Plot results here
    return figure
The background callbacks are executed as expected:
[2023-05-11 15:06:53,493: INFO/MainProcess] Task long_callback_fca855c257161cf63905ddd23a1482db2532f04d[296b6f6c-9fbe-4be1-ab7d-67be3ca2ab33] received
[2023-05-11 15:06:53,497: INFO/MainProcess] Task long_callback_ee56ffa65cd0b22a78c090b3b571d90034dcf59a[269c88e1-fcd3-4bfa-a1c9-3a77aa450762] received
[2023-05-11 15:06:53,499: INFO/ForkPoolWorker-7] BS called
[2023-05-11 15:06:53,500: INFO/ForkPoolWorker-8] DoS called
[2023-05-11 15:06:53,501: INFO/MainProcess] Task long_callback_df56322e21f12f5a131c82384c84f63937321921[d9a735d0-16ee-44b3-b7da-b7319ffcc3f6] received
[2023-05-11 15:06:53,508: INFO/ForkPoolWorker-1] Psi called
[2023-05-11 15:06:53,509: INFO/ForkPoolWorker-1] Psi called
[2023-05-11 15:06:53,509: INFO/ForkPoolWorker-1] Psi done
[2023-05-11 15:06:53,520: INFO/ForkPoolWorker-1] Task long_callback_df56322e21f12f5a131c82384c84f63937321921[d9a735d0-16ee-44b3-b7da-b7319ffcc3f6] succeeded in 0.012703596999926958s: None
[2023-05-11 15:07:05,119: INFO/ForkPoolWorker-7] BS done
[2023-05-11 15:07:05,137: INFO/ForkPoolWorker-7] Task long_callback_fca855c257161cf63905ddd23a1482db2532f04d[296b6f6c-9fbe-4be1-ab7d-67be3ca2ab33] succeeded in 11.637707060002867s: None
[2023-05-11 15:09:06,406: INFO/ForkPoolWorker-8] DoS done
[2023-05-11 15:09:06,414: INFO/ForkPoolWorker-8] Task long_callback_ee56ffa65cd0b22a78c090b3b571d90034dcf59a[269c88e1-fcd3-4bfa-a1c9-3a77aa450762] succeeded in 132.91452989299796s: None
However, the plotting callback is executed only once, after all the background callbacks have finished:
[ ERROR] 2023-05-11 15:09:07 outputprocessor.py@ 27 Plot CID {"role":"context","store":"nanostructure"}
And the caller ID is not any of the simulation_results stores (i.e. the results of the Celery task executions) but "nanostructure".
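For reference, here is a minimal sketch of how the caller IDs could be extracted. My get_caller_id helper is not shown above, so triggered_ids below is a hypothetical stand-in, and it assumes its input has the same shape as dash.callback_context.triggered (a list of dicts with a 'prop_id' key such as '{"role":"context","store":"nanostructure"}.data'). Since that list can hold more than one entry when several inputs change in the same request, the sketch returns every ID rather than only the first:

```python
import json


def triggered_ids(triggered):
    """Return the component ID of every entry in a Dash-style `triggered` list.

    Hypothetical helper: `triggered` is assumed to look like
    dash.callback_context.triggered, i.e. a list of {'prop_id': ..., 'value': ...}.
    """
    ids = []
    for entry in triggered:
        # prop_id has the form '<component id>.<property>'; split on the last dot.
        component_id, _, _prop = entry["prop_id"].rpartition(".")
        if not component_id:
            # Dash reports '.' when nothing triggered the callback; skip it.
            continue
        if component_id.startswith("{"):
            # Pattern-matching (dict) IDs are serialized as JSON strings.
            component_id = json.loads(component_id)
        ids.append(component_id)
    return ids
```

Logging the whole list inside _plot_results, for example logging.error('Plot CIDs %s', triggered_ids(dash.callback_context.triggered)), would show whether the simulation_results stores arrive together with "nanostructure" in a single call.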
By the way, before introducing background callbacks, I handled the Celery interaction by:
- defining Celery tasks with @celery_app.task
- creating dcc.Interval components
- periodically polling the Celery tasks for their status
- copying the results with AsyncResult() when the task status == 'SUCCESS'
and that solution worked as expected.
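To make the polling workflow above concrete, here is a rough, broker-free sketch of the same idea. It is not my actual code: concurrent.futures stands in for Celery, Future.done() stands in for checking the AsyncResult status, and a plain loop stands in for the dcc.Interval ticks. All names (slow_task, poll_once, run_simulations) are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(name, delay):
    """Placeholder for one simulation; sleeps instead of computing."""
    time.sleep(delay)
    return f"{name} result"


def poll_once(pending, results):
    """One 'interval tick': copy the results of every task that has finished."""
    finished = [name for name, future in pending.items() if future.done()]
    for name in finished:
        results[name] = pending.pop(name).result()
    return finished


def run_simulations(delays, tick=0.01):
    """Start all tasks in parallel and poll them until none is pending."""
    results, refreshes = {}, []
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        pending = {name: pool.submit(slow_task, name, delay)
                   for name, delay in delays.items()}
        while pending:
            if poll_once(pending, results):
                # This is where the graph would be redrawn with partial results.
                refreshes.append(sorted(results))
            time.sleep(tick)
    return results, refreshes


if __name__ == "__main__":
    results, refreshes = run_simulations(
        {"wave_functions": 0.0, "band_structure": 0.1, "dos": 0.3})
    for available in refreshes:
        print("refresh with:", available)
```

The point is that each tick picks up whichever tasks have completed so far, so the graph can be refreshed with partial results instead of waiting for the slowest task.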
Any ideas on how to force a graph refresh as each Celery task finishes would be appreciated.
Thanks!