ServersideOutputTransform: having the backend data as both input and output, updated on n_intervals


# Imports assumed by this snippet (df and app are defined elsewhere):
from os import environ
from logging import info
import polars as pl
from dash_extensions.enrich import Output, Input, Serverside, Trigger

@app.callback(Output('new_data','data'),[Input('new_data','data'),Trigger('patch_data','n_intervals')])
def update_query_data(new_data):
    old_data_max_date = df.select('queued_date_service').max().item()
    connection_string = f"postgresql://tracking:{environ['SQL_PRD_PASSWORD']}@prd-avi-chatbot-tracking-replica.cybqqoxm5ee1.us-east-1.rds.amazonaws.com:5432/clean_data"
    query  = f""" 
                SELECT queued_date_service,steps,category,campaign_source,
                api_orders_hash_id,original_user_id
                FROM residential_child_2023_8
                where queued_date_service >= '{old_data_max_date}'
                and campaign_source is not null 
                and steps is not null
                and category not like 'human-handoff%'
            """
    rds_data = pl.read_database(query = query,connection = connection_string)
    print(rds_data.head())
    rds_data = rds_data.select(
            pl.col('queued_date_service').cast(pl.Datetime),
            pl.col('steps').cast(pl.Categorical),
            pl.col('category').cast(pl.Categorical),
            pl.col('campaign_source').cast(pl.Categorical),
            pl.col('original_user_id').cast(pl.Categorical),
            pl.col('api_orders_hash_id').cast(pl.Categorical))
            
    if new_data is None:
        info("Executando funcao que puxa dados pela primeira vez para formar cache do RDS")
        return Serverside(rds_data)

    info("Pegando dados novos e pegando dados feitos em cache pela primeira rodada e mergeando aos antigos")
    current_data = pl.concat([new_data,rds_data],how='vertical')
    print(current_data.head())
    return Serverside(current_data)

I have the following code. It is supposed to grab the data from my RDS and concat it to the data already loaded in the backend, but I keep getting a semantic error. Any ideas on what is wrong with my code? I would also like some feedback on a better way to write a self-updating base dataframe in Dash.

I believe you should use ‘State’ instead of ‘Input’. Have you tried that?
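
Roughly like this (just a sketch; the component ids come from your snippet and the query part is elided):

from dash_extensions.enrich import Output, State, Trigger, Serverside
import polars as pl

@app.callback(Output('new_data', 'data'),
              [State('new_data', 'data'), Trigger('patch_data', 'n_intervals')])
def update_query_data(new_data):
    # With State, the callback only fires on the interval tick; the store's
    # current contents are passed in without the store update re-triggering it.
    rds_data = ...  # query RDS and cast the columns exactly as in your snippet
    if new_data is None:
        return Serverside(rds_data)
    return Serverside(pl.concat([new_data, rds_data], how='vertical'))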

First of all I would like to say, OH MY GAWD, it's the dude that made dash-extensions. Please sign my LinkedIn, you are a chad: https://www.linkedin.com/in/francisco-nicolau-sargo-froes-0288a2195/. Second, I made a different function that only uses RAM memory for now, because I just couldn't take it anymore, but I guess now I for sure will not have any more trouble. Also, I think the error was related to pickle not being able to pickle Polars dataframes.

df = grab_data_s3_file_system()
info("Cacheando em lista dados puxados do RDS")
cache_data = []
cache_data.append(grab_rds_data(df))
info("Delivery do layout")
app.layout = make_layout()

@app.callback(Trigger('patch_data','n_intervals'))
def update_query_data():
    info("Pegando valor maximo dos dados em cache")
    old_cache_max_date = cache_data[-1].select('queued_date_service').max().item()
    connection_string = f"postgresql://tracking:{environ['SQL_PRD_PASSWORD']}@prd-avi-chatbot-tracking-replica.cybqqoxm5ee1.us-east-1.rds.amazonaws.com:5432/clean_data"
    query  = f""" 
                SELECT queued_date_service,steps,category,campaign_source,
                api_orders_hash_id,original_user_id
                FROM residential_child_2023_8
                where queued_date_service >= '{old_cache_max_date}'
                and campaign_source is not null 
                and steps is not null
                and category not like 'human-handoff%'
            """
    intermediate_data = pl.read_database(query = query,connection = connection_string)
    intermediate_data = intermediate_data.select(
            pl.col('queued_date_service').cast(pl.Datetime),
            pl.col('steps').cast(pl.Categorical),
            pl.col('category').cast(pl.Categorical),
            pl.col('campaign_source').cast(pl.Categorical),
            pl.col('original_user_id').cast(pl.Categorical),
            pl.col('api_orders_hash_id').cast(pl.Categorical))

    info("Concatenando novo dataframe RDS e somando ao seu anterior")
    current_data = pl.concat([cache_data[-1],intermediate_data],how='vertical').sort(by="queued_date_service")
    cache_data.append(current_data)
    info("Removendo elemento cacheado antigo")
    cache_data.pop(0)

Thanks! And great that you got it working. For reference, I would like to note that you are currently utilizing global variables, which may break your app depending on the environment (and most certainly will if you deploy more than a single Flask instance).
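
If you want to drop the globals, the usual pattern is to keep the dataframe in a dcc.Store behind ServersideOutputTransform and let the interval-driven callback own it. A rough sketch of the plumbing your callbacks already assume (component ids taken from your snippets; the interval period is made up):

from dash import dcc, html
from dash_extensions.enrich import DashProxy, ServersideOutputTransform, TriggerTransform

# TriggerTransform enables Trigger(...); ServersideOutputTransform lets callbacks
# return Serverside(...) so the dataframe is cached on the server, not in the browser.
app = DashProxy(__name__, transforms=[ServersideOutputTransform(), TriggerTransform()])

app.layout = html.Div([
    dcc.Store(id='server_data'),                        # holds the server-side dataframe
    dcc.Interval(id='patch_data', interval=60 * 1000),  # made-up period: tick every minute
    # ... graphs and filters go here ...
])

With the default file system backend, all workers on the same machine then read and write one shared cache instead of each holding its own copy of a Python list.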


Oh schnuzzle, I didn't know that. Fudge, you made me do it, you made me rewrite it.

It works if it is in pandas, :frowning:

@app.callback(Output('server_data','data'),[State('server_data','data'),Trigger('patch_data','n_intervals')])
def create_update_query_data(server_data):
    if server_data is None:
        info("Puxando dados do S3")
        df = grab_data_s3_file_system()
        df_s3_max_date = df.select('queued_date_service').max().item()
        rds_df = load_df_rds(df_s3_max_date)
        server_data = pl.concat([df,rds_df],how='vertical').to_pandas()
        return Serverside(server_data)

    loaded_server_data = pl.from_pandas(server_data,schema_overrides={'queued_date_service':pl.Datetime,'steps':pl.Categorical,
                                                'category':pl.Categorical,'campaign_source':pl.Categorical,
                                                'original_user_id':pl.Categorical,'api_orders_hash_id':pl.Categorical})
    old_server_data_max_date = loaded_server_data.select('queued_date_service').max().item()
    new_server_data = load_df_rds(old_server_data_max_date)
    return Serverside(new_server_data.to_pandas())
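
For completeness, downstream callbacks can then take the store as a plain Input; with ServersideOutputTransform they receive the object that was stored rather than a JSON copy. A rough sketch (the graph id and figure helper are made up, not part of the code above):

import polars as pl
from dash.exceptions import PreventUpdate
from dash_extensions.enrich import Input, Output

@app.callback(Output('main_graph', 'figure'), Input('server_data', 'data'))
def update_graph(server_data):
    # Nothing stored yet on the very first render.
    if server_data is None:
        raise PreventUpdate
    # server_data arrives as the pandas frame stored via Serverside() above;
    # convert it back to Polars before filtering/aggregating.
    df = pl.from_pandas(server_data)
    return make_figure(df)  # hypothetical figure-building helper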