Problem: I am getting a circular reference error.
Background: Dash is running on an AWS Lambda with a 30-second timeout.
Code explanation: I have a download button that starts an Athena query through a server-side callback. The output of that callback triggers a clientside callback, which waits 30 seconds and then triggers a second server-side callback that checks whether the Athena query has completed. If it has, the results are downloaded. If it has not, the callback triggers the clientside 30-second wait again, and this repeats until the query has completed.
This creates a circular reference error. Is there any way to achieve this without a circular reference?
@callback(
    Output(f"{self.id_prefix}-query-execution-id", 'data'),
    Input(f"{self.id_prefix}-download-non-aggregate-button", 'n_clicks'),
    State(f"{self.id_prefix}-filter-product", 'value'),
    State(f"{self.id_prefix}-filter-technology", 'value'),
    State(f"{self.id_prefix}-filter-node-name", 'value'),
    prevent_initial_call=True,
)
def download_non_aggregate(_, product: str, technology: str, seller: str) -> str:
    return self.get_execution_id(product, technology, seller)
ClientsideCallbackPopulator.populate('scatterChartCallbacks', {
    f"poll{self.id_prefix_camel_case}": [
        Output(f"{self.id_prefix}-query-execution-check", 'data'),
        Input(f"{self.id_prefix}-query-execution-id", 'data'),
        Input(f"{self.id_prefix}-query-execution-complete", 'data')
    ]
})
@callback(
    Output(f"{self.id_prefix}-query-execution-complete", 'data'),
    Output(f"{self.id_prefix}-download-non-aggregated-dataframe", 'data'),
    Input(f"{self.id_prefix}-query-execution-check", 'data'),
    State(f"{self.id_prefix}-query-execution-id", 'data'),
    prevent_initial_call=True,
)
def download_non_aggregate(_, execution_id: str) -> tuple:
    # Ask Athena for the current state of the query.
    status = wr.athena.get_query_execution(execution_id)
    if status['Status']['State'] == 'SUCCEEDED':
        # Query finished: leave the "complete" store alone and send the results.
        return no_update, wr.athena.get_query_results(execution_id)
    # Not finished: write to the "complete" store, which is meant to restart the clientside wait.
    return False, no_update
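To see where the loop comes from, it can help to write the callbacks out as a graph of stores. The following is only a rough sketch with shortened, made-up node names standing in for the ids above; it is not how Dash itself checks for cycles, just a plain depth-first search that finds the same loop.

```python
from collections import defaultdict

# Each edge runs from a callback's Input to one of its Outputs.
# Node names are shortened stand-ins for the component ids in the post.
edges = [
    ("download-button", "query-execution-id"),               # server: start query
    ("query-execution-id", "query-execution-check"),         # client: wait 30 s
    ("query-execution-complete", "query-execution-check"),   # client: wait again
    ("query-execution-check", "query-execution-complete"),   # server: check status
    ("query-execution-check", "download-dataframe"),         # server: send results
]


def find_cycle(edges):
    """Return one cycle as a list of nodes, or None if the graph is acyclic."""
    graph = defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)

    visiting, done = [], set()

    def visit(node):
        if node in done:
            return None
        if node in visiting:
            # Found a node that is already on the current path: that is a cycle.
            return visiting[visiting.index(node):] + [node]
        visiting.append(node)
        for nxt in graph[node]:
            cycle = visit(nxt)
            if cycle:
                return cycle
        visiting.pop()
        done.add(node)
        return None

    for start in list(graph):
        cycle = visit(start)
        if cycle:
            return cycle
    return None


print(find_cycle(edges))
```

The "check" store feeds the "complete" store and the "complete" store feeds the "check" store, so any fix has to move one of those two hops out of the callback graph.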
I would probably try to decouple that somehow, maybe by putting something between Athena and your Dash app. Maybe SQS or Lambda?
You could also try Dash’s long callback methods.
Thanks for the reply, tphil10. Unfortunately, I don’t think the link you provided will work in my case.
Also, I am already using a Lambda between Athena and the Dash app: the Lambda is where the server-side callbacks are executed and the query is started.
The only solution I have come up with so far is an additional button that checks whether the query has finished, rather than checking automatically every 30 seconds with a clientside callback.
Thinking through this…
You have a long running “process” of some kind that gets kicked off from a button in your dash app
That process can take many minutes/hours to complete
You want the app to register when that process has completed so the user can do something with the result?
Are those the steps?
Hi, you can use AWS Step Functions to create a better architecture. It lets you define the overarching logic and integrates with many of the AWS services.
Reg,
J.
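For anyone curious what that could look like, here is a minimal sketch of a state machine definition, built as a Python dict and printed as JSON. It assumes the Step Functions Athena integration with the `.sync` suffix, which makes the state wait for the query to finish. The state name, the `primary` workgroup, and the `$.query` / `$.output_location` input fields are placeholders I made up, and the Dash app would still need some way to learn that the execution has ended.

```python
import json

# Hypothetical state machine: start the Athena query and wait for it to end.
# The ".sync" suffix asks Step Functions to pause on this state until the
# query finishes, so the waiting happens outside the Dash app.
state_machine = {
    "Comment": "Run an Athena query and wait for completion (sketch)",
    "StartAt": "RunQuery",
    "States": {
        "RunQuery": {
            "Type": "Task",
            "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
            "Parameters": {
                # Placeholder input fields, supplied when the execution starts.
                "QueryString.$": "$.query",
                "WorkGroup": "primary",
                "ResultConfiguration": {"OutputLocation.$": "$.output_location"},
            },
            "End": True,
        }
    },
}

print(json.dumps(state_machine, indent=2))
```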
Thanks for the suggestions, tphil10 and jcuypers. I thought I would post the solution we went with, in case it helps someone else. TL;DR: to get around the circular callback, we used a server route.
We have a server-side callback that starts the query on AWS Athena (wr.athena.start_query_execution) and returns the execution ID as its output. This execution ID triggers a clientside callback, which tries to fetch a URL from a server route …/whatever/executionid. The route runs wr.athena.get_query_execution() to check whether the query has completed. If the query is incomplete, the route returns a 404 status, and the clientside callback sleeps for 30 seconds before trying again. Once the Athena query is complete, the function behind the server route returns a signed S3 URL to the clientside callback, and the file is downloaded on the client side via that signed URL. (The signed URL is only required for results of 4 MB or more; otherwise you can download directly from the Lambda server.)
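One possible refinement of the status check described above, shown as a small sketch rather than what we actually deployed: map each Athena query state to an HTTP status, so that the client can tell "not ready yet" apart from "will never be ready". The 404 for pending queries matches the description above. The 500 for failed or cancelled queries is an assumption added for illustration, and `status_for_state` is a made-up helper name.

```python
# Athena reports one of these states for a query execution.
PENDING_STATES = {"QUEUED", "RUNNING"}
FAILED_STATES = {"FAILED", "CANCELLED"}


def status_for_state(state: str) -> int:
    """Map an Athena query state to the HTTP status the route could return."""
    if state == "SUCCEEDED":
        return 200  # the response body would carry the signed URL
    if state in PENDING_STATES:
        return 404  # "not ready yet", the signal the client retries on
    if state in FAILED_STATES:
        return 500  # assumption: lets the client stop retrying early
    return 500      # unknown state, treated as a failure


for state in ("QUEUED", "RUNNING", "SUCCEEDED", "FAILED", "CANCELLED"):
    print(state, status_for_state(state))
```

With something like this, the client loop could keep retrying on 404 and give up straight away on any other error status.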
Client side callback
const sleep = ms => new Promise(r => setTimeout(r, ms));
const downloadSellerPricesNonAggregate = async (executionId) => {
    if (!executionId) return;
    // Check up to 3 times, waiting 30 seconds before each attempt.
    let retries = 3;
    while (retries-- > 0) {
        await sleep(30000);
        const response = await fetch(`/download/${executionId}`);
        // A non-OK response (404) means the query is not finished yet.
        if (!response.ok) continue;
        // The response body is the signed URL; click a temporary link to download it.
        const url = await response.text();
        const a = document.createElement('a');
        a.href = url;
        document.body.appendChild(a);
        a.click();
        a.remove();
        retries = 0;
    }
    return false;
};
Server route (place in main app)
import awswrangler as wr
import boto3

@app.server.route('/download/<execution_id>', methods=['GET'])
def download_endpoint(execution_id: str):
    """Checks whether the Athena query for execution_id is complete. If complete, returns a signed URL for the dataset.
    Args:
        execution_id (str): Athena query execution ID to check
    Returns:
        signed url (str): URL of the S3 dataset
        HTTP status code (int): 200 if the query succeeded, otherwise 404
    """
    execution = wr.athena.get_query_execution(execution_id)
    if execution['Status']['State'] != 'SUCCEEDED':
        return f"Error {execution_id}.csv not found.", 404
    # Full s3:// location of the result file as reported by Athena (unused below).
    file_location = execution['ResultConfiguration']['OutputLocation']
    s3 = boto3.client('s3')
    url = s3.generate_presigned_url(
        ClientMethod='get_object',
        Params={
            'Bucket': 'aws-athena-query-results-<accountnumber>-<region>',
            'Key': f"{execution_id}.csv"
        },
        ExpiresIn=300  # 5 minutes
    )
    return url, 200
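The route above reads the OutputLocation into file_location but then signs a hardcoded bucket and key. If the results ever land under a different bucket or prefix, the bucket and key could instead be taken from that location. A minimal sketch, assuming the location is a plain s3://bucket/key URI; `split_s3_uri` and the example bucket name are made up for illustration.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/path/to/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # netloc is the bucket; the path (minus its leading slash) is the key.
    return parsed.netloc, parsed.path.lstrip("/")


# Example with a made-up location of the kind Athena reports.
bucket, key = split_s3_uri("s3://example-results-bucket/reports/1234abcd.csv")
print(bucket, key)
```

The two values could then be passed as 'Bucket' and 'Key' in the Params of generate_presigned_url.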