from os import environ
from logging import info

import polars as pl
from dash_extensions.enrich import Input, Output, Serverside, Trigger

# Self-updating store: the callback reads its own previous value ('new_data')
# and is fired by the 'patch_data' interval (Trigger passes no argument).
@app.callback(Output('new_data', 'data'),
              [Input('new_data', 'data'), Trigger('patch_data', 'n_intervals')])
def update_query_data(new_data):
    # On the first run there is no cached data yet, so fall back to the frame
    # loaded from S3 ('df'); afterwards, take the max date from the cached data
    # so we only fetch rows newer than what we already hold.
    base = df if new_data is None else new_data
    old_data_max_date = base.select('queued_date_service').max().item()
    connection_string = f"postgresql://tracking:{environ['SQL_PRD_PASSWORD']}@prd-avi-chatbot-tracking-replica.cybqqoxm5ee1.us-east-1.rds.amazonaws.com:5432/clean_data"
    query = f"""
        SELECT queued_date_service, steps, category, campaign_source,
               api_orders_hash_id, original_user_id
        FROM residential_child_2023_8
        WHERE queued_date_service >= '{old_data_max_date}'
          AND campaign_source IS NOT NULL
          AND steps IS NOT NULL
          AND category NOT LIKE 'human-handoff%'
    """
    rds_data = pl.read_database(query=query, connection=connection_string)
    print(rds_data.head())
    rds_data = rds_data.select(
        pl.col('queued_date_service').cast(pl.Datetime),
        pl.col('steps').cast(pl.Categorical),
        pl.col('category').cast(pl.Categorical),
        pl.col('campaign_source').cast(pl.Categorical),
        pl.col('original_user_id').cast(pl.Categorical),
        pl.col('api_orders_hash_id').cast(pl.Categorical),
    )
    if new_data is None:
        info("Running the data-pull function for the first time to build the RDS cache")
        return Serverside(rds_data)
    info("Fetching new rows and merging them with the data cached on the first run")
    current_data = pl.concat([new_data, rds_data], how='vertical')
    print(current_data.head())
    return Serverside(current_data)
I have the following code. It is supposed to grab the data from my RDS and concat it to the data already loaded in the backend, but I keep getting a semantic error; any ideas on what is wrong with my code? I would also like some feedback on a better way to write a self-updating base dataframe in Dash.
First of all I would like to say, OH MY GAWD to the dude that made dash-extensions. Please sign my LinkedIn, you are a chad: https://www.linkedin.com/in/francisco-nicolau-sargo-froes-0288a2195/. Second, I made a different function that only uses RAM for now, because I just couldn't take it anymore, but I guess now I for sure will not have any more trouble. I also think the error was related to pickle not being able to pickle Polars dataframes (see the serialization sketch after the code below).
# Seed the in-memory cache with the S3 snapshot plus the latest RDS rows.
df = grab_data_s3_file_system()
info("Caching the data pulled from the RDS in a list")
cache_data = []
cache_data.append(grab_rds_data(df))

info("Delivering the layout")
app.layout = make_layout()

# No Output: the callback only mutates the module-level cache_data list.
@app.callback(Trigger('patch_data', 'n_intervals'))
def update_query_data():
    info("Getting the max date of the cached data")
    old_cache_max_date = cache_data[-1].select('queued_date_service').max().item()
    connection_string = f"postgresql://tracking:{environ['SQL_PRD_PASSWORD']}@prd-avi-chatbot-tracking-replica.cybqqoxm5ee1.us-east-1.rds.amazonaws.com:5432/clean_data"
    query = f"""
        SELECT queued_date_service, steps, category, campaign_source,
               api_orders_hash_id, original_user_id
        FROM residential_child_2023_8
        WHERE queued_date_service >= '{old_cache_max_date}'
          AND campaign_source IS NOT NULL
          AND steps IS NOT NULL
          AND category NOT LIKE 'human-handoff%'
    """
    intermediate_data = pl.read_database(query=query, connection=connection_string)
    intermediate_data = intermediate_data.select(
        pl.col('queued_date_service').cast(pl.Datetime),
        pl.col('steps').cast(pl.Categorical),
        pl.col('category').cast(pl.Categorical),
        pl.col('campaign_source').cast(pl.Categorical),
        pl.col('original_user_id').cast(pl.Categorical),
        pl.col('api_orders_hash_id').cast(pl.Categorical),
    )
    info("Concatenating the new RDS dataframe with the previous one")
    current_data = pl.concat([cache_data[-1], intermediate_data], how='vertical').sort(by='queued_date_service')
    cache_data.append(current_data)
    info("Removing the old cached element")
    cache_data.pop(0)
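P.S. in case anyone wants to keep using Serverside with Polars, the workaround I had in mind, assuming the problem really was the pickle step, is to serialize the frame to Arrow IPC bytes myself (plain bytes pickle fine) and rebuild the frame on the way back in. A minimal sketch, where to_store and from_store are just hypothetical helper names:

import polars as pl

def to_store(frame: pl.DataFrame) -> bytes:
    # Arrow IPC bytes are plain `bytes`, which any pickle-based backend can store.
    return frame.write_ipc(None).getvalue()

def from_store(blob):
    # Rebuild the Polars frame from the cached IPC bytes (None on the first run).
    return pl.read_ipc(blob) if blob is not None else None

In the callback you would then return Serverside(to_store(current_data)) and start with new_data = from_store(new_data).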
Thanks! And great that you got it working. For reference, I would like to note that you are currently using global variables, which may break your app depending on the environment (and almost certainly will if you deploy more than a single Flask instance).
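If you want the cache to survive multiple workers, the usual pattern is to keep the data in a Serverside output backed by shared storage rather than in a Python list. A rough sketch, assuming a recent dash-extensions (the backend class and argument names vary a bit between versions):

from dash_extensions.enrich import (DashProxy, FileSystemBackend,
                                    ServersideOutputTransform, TriggerTransform)

# The file-system cache lives outside the worker processes, so every
# Flask/gunicorn instance reads and writes the same serialized dataframe.
backend = FileSystemBackend(cache_dir="/tmp/dash_cache")
app = DashProxy(__name__, transforms=[
    TriggerTransform(),
    ServersideOutputTransform(backends=[backend]),
])

With that in place, your first callback (the one returning Serverside(...)) works unchanged across instances; only the backend location is shared state.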