Routing Callbacks to Celery Worker on External Server

Hi,
Thank you for Dash, it’s great! I am now at the stage where some queries are far too memory-intensive for my server to handle. So, I want to offload these callbacks to an external machine.

I would like to use the background=True, Celery Worker, and Redis concept covered here. I have a celery worker/Redis server running on a separate server. Just trying to figure out how to connect the background callback to that server. Any ideas here?

Any advice would be much appreciated!

Hello @mdylan2,

My guess is that the magic would happen here:

celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])

If you route that to your other broker and backend, that should help you out.

Nevermind, I think this will work!

1 Like

I think the tricky part is making sure the server is exposed to the network traffic.

You could even use a load balancer if necessary.

Yeah I did that. All working now, thanks for your help @jinnyzor

Steps for future people:

  1. Copy paste code into a file called app.py on both server. Make sure you have celery, dash, dash[celery], etc. installed through pip
  2. Set up Redis on external server. Expose server to traffic from 6379 (ufw allow 6379). Ensure you have security/password in place for Redis. You’d likely be fine with Step 4 here but follow other stuff if you want
  3. Redis URL should be :<password>@<IP ADDRESS>:6379. Make sure IP addresses are properly configured depending on where you’re running the app. On the external server which is running Redis, the IP address should be 127.0.0.1
  4. Run celery on both servers celery -A app.celery_app worker --loglevel=INFO
  5. Start up the app on your local computer python app.py
1 Like

You did the hard stuff. :wink:

1 Like

Never mind, just realised that it works purely locally but I can’t get the jobs to run on the remote. I might need to look into the routing of the Celery app

ERROR/MainProcess] Received unregistered task of type 'long_callback_f7722f98bb338a8e66ad8bac63eaa193c9df1a4b'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[["580e5cef6ee1529cb0873964b0770104bcea8e9a", "580e5cef6ee1529cb0873964b0770104bcea8e9a-progress", [8], {"args_grouping": [{"id": "button_id", "property": "n_clicks", "value": 8, "str_id": "button_id", "triggered": true}], "using_args_grouping": false, "outputs_grouping": {"id": "paragraph_id", "property": "children"}, "using_outputs_grouping": false, "inputs_list": [{"id": "button_id", "property": "n_clicks", "value": 8}], "states_list": [], "outputs_list": {"id": "paragraph_id", "property": "children"}, "input_values": {"button_id.n_clicks": 8}, "state_values": {}, "triggered_inputs": [{"prop_id": "button_id.n_clicks", "value": 8}], "ignore_register_page": true}], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (747b)

Thw full contents of the message headers:
{'lang': 'py', 'task': 'long_callback_f7722f98bb338a8e66ad8bac63eaa193c9df1a4b', 'id': 'e5a2dfb0-d504-4cc5-9f26-4253ac2385cc', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e5a2dfb0-d504-4cc5-9f26-4253ac2385cc', 'parent_id': None, 'argsrepr': "('580e5cef6ee1529cb0873964b0770104bcea8e9a', '580e5cef6ee1529cb0873964b0770104bcea8e9a-progress', [8], {'args_grouping': [{...}], 'using_args_grouping': False, 'outputs_grouping': {'id': 'paragraph_id', 'property': 'children'}, 'using_outputs_grouping': False, 'inputs_list': [{...}], 'states_list': [], 'outputs_list': {'id': 'paragraph_id', 'property': 'children'}, 'input_values': {'button_id.n_clicks': 8}, 'state_values': {}, 'triggered_inputs': [{...}], 'ignore_register_page': True})", 'kwargsrepr': '{}', 'origin': 'XXXXX@computer, 'ignore_result': False}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/home/pai/celery_worker/venv/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'long_callback_f7722f98bb338a8e66ad8bac63eaa193c9df1a4b'

Maybe this can help you out:

I actually think it’s because when I start up Celery, each background callback is identified with a specific hash function. For some reason, the hash function for the task on my local server has a different hash to the one on my remote. Need to figure out why this is the case and how to rectify this

Are you starting two services?

I have the Dash web application running on my local machine and a celery worker running on my local and foreign machine

Ugh, never mind, I had different callback functions… really dumb bug here :man_facepalming:

Solution above stands!

1 Like

It’s the simple things. Lol.

1 Like

@mdylan2 I’m hitting this error now. I have everything running locally via docker-compose.

Can you explain what you mean with “I had different callback functions”. That would be helpful.

Thank you!

@ztawil You need to make sure that the callback function in your Celery worker and the app hosting the code is the exact same (i.e. exact same text). The code needs to be the exact same (i.e. even new lines, etc).

The text needs to be the same because dash turns that callback function into a hash ID for the Celery workers to know which callback to run (i.e. jobs get logged on the Redis database using the hash function and a Celery worker picks up the hash and figures out which callback to run)

This will not work:

Celery worker instance:

app.callback(output, input)
def callback(input):
   return input

Other server running background callbacks:

app.callback(output, input)
def callback(input):
  print(input)
   return input

Thanks for the reply @mdylan2 . I think I had a different issue that was solved here.

The tl;dr. I defined my celery_app in a celery_app.py file that only defined the app.
In a separate app.py I create the app using Dash(__name__....). I was starting my celery worker by running the celery_app.py file:
celery -A celery_app.celery_app worker --loglevel=INFO
but instead I should’ve imported into my app.py,
then run celery -A app.celery_app worker --loglevel=INFO.

2 Likes