Storing Python generator objects in Dash

I’m creating a chatbot with Dash using the GPT-3.5 API, but I ran into a problem with streaming: `dcc.Store` does not support Python generator objects, since they are not JSON-serializable.

> The value in question is either the only value returned, or is in the top level of the returned list, and has string representation
> `[['hi', <generator object EngineAPIResource.create.<locals>.<genexpr> at 0x7f4239cb6340>]]`
>
> In general, Dash properties can only be dash components, strings, dictionaries, numbers, None, or lists of those.

Is there any way to implement the streaming inside the session?
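The error boils down to JSON serialization: `dcc.Store` sends its data to the browser as JSON, and the standard library reproduces the failure directly (the generator below is just a stand-in for the OpenAI streaming response):

```python
import json

# Stand-in for the streaming response returned by the OpenAI client
tokens = (word for word in ["hi", "there"])

try:
    # dcc.Store data must survive this call; generators cannot
    json.dumps([["hi", tokens]])
except TypeError as err:
    print(err)  # Object of type generator is not JSON serializable
```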

Hi @Ananthzeke, that’s correct. `dcc.Store()` components are usually client-side, so the content has to be JSON-serializable.

You could use the `ServersideOutput` of dash-extensions. Here, the objects to store are pickled under the hood, as far as I know.


Thanks, @AIMPED, but if it stores on the server, then does it increase the latency for retrieving the data?

Yes, you are correct. I guess converting the generator into a list would not make sense, right?
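For what it’s worth, converting the generator to a list would block until the entire completion has been produced, which defeats the point of streaming — a quick sketch with a stand-in generator (the delay simulates per-chunk network latency):

```python
import time

def fake_stream(tokens, delay=0.01):
    """Stand-in for an OpenAI stream=True response: yields one chunk at a time."""
    for token in tokens:
        time.sleep(delay)  # simulate waiting for the next chunk
        yield token

stream = fake_stream(["Hello", ",", " world"])

# Lazy consumption: the first token is available almost immediately
first = next(stream)
print(repr(first))  # 'Hello'

# list() waits for *every* remaining chunk before returning anything,
# which is exactly what a chatbot UI wants to avoid
rest = list(stream)
print(rest)  # [',', ' world']
```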

Yep, I thought streaming the output would save some time, but implementing streaming in Dash would complicate things. @AIMPED, is there any other way to do it?

As far as storing information in `dcc.Store()` components goes, there is no other solution I know of.

Other than that, I’m not sure what you are trying to do. I could imagine a Dash app communicating with a FastAPI or even Flask service that hosts the OpenAI-related code. But I’m not sure this is the best way to set it up.

No, it would actually decrease latency as the amount of data transferred is reduced (callbacks are executed serverside).

If you need streaming, you can use a Quart server to stream the data to a WebSocket component, combined with a client-side callback that updates the relevant UI components.

Haha, you are right. Lately I’ve been converting my callbacks to client-side, so I was thinking purely in those terms. :see_no_evil:

But this way I can’t maintain multiple sessions, right?

You can. But if you have multiple instances, you’ll need to store the server-side cache in a common location (e.g. a Redis instance).


Hey @AIMPED and @Emil, I achieved streaming with Dash background callbacks, specifically with the progress parameter.

import time
from dash import Dash, DiskcacheManager, CeleryManager, Input, Output, html, callback
import os
import openai

openai.api_type = "azure"
openai.api_version = "version"
openai.api_key = "your_token"
openai.api_base = "model_url"

def process_history_gpt(history):
    messages = []
    for turn in history:
        user_side = {"role": "user", "content": turn[0]}
        messages.append(user_side)
        system_side = {"role": "system", "content": turn[1]}
        messages.append(system_side)
    return messages

def get_gpt3_stream_output(prompt, history):
    engine = "prompt-chat-gpt35"

    messages = process_history_gpt(history)
    messages.append({"role": "user", "content": prompt})

    response = openai.ChatCompletion.create(
        engine=engine,
        messages=messages,
        temperature=0.7,
        max_tokens=700,
        top_p=0.95,
        frequency_penalty=0,
        presence_penalty=0,
        stop=None,
        stream=True
    )

    return response


if 'REDIS_URL' in os.environ:
    # Use Redis & Celery if REDIS_URL set as an env variable
    from celery import Celery
    celery_app = Celery(__name__, broker=os.environ['REDIS_URL'], backend=os.environ['REDIS_URL'])
    background_callback_manager = CeleryManager(celery_app)

else:
    # Diskcache for non-production apps when developing locally
    import diskcache
    cache = diskcache.Cache("./cache")
    background_callback_manager = DiskcacheManager(cache)

app = Dash(__name__, background_callback_manager=background_callback_manager)


app.layout = html.Div(
    [
        html.Div(
            [
                html.P(id="paragraph_id", children=["Button not clicked"]),
                html.P(id='stream')
            ]
        ),
        html.Button(id="button_id", children="Run Job!"),
        html.Button(id="cancel_button_id", children="Cancel Running Job!"),
    ]
)

@callback(
    output=Output("paragraph_id", "children"),
    inputs=Input("button_id", "n_clicks"),
    background=True,
    cancel=Input("cancel_button_id", "n_clicks"),
    progress=[Output('stream','children')],
    prevent_initial_call=True
)
def update_progress(set_progress, n_clicks):
    words = ""
    model_output = get_gpt3_stream_output("Write an essay about Actor Rajini Kanth", [])
    for chunk in model_output:
        # the final chunk has empty choices, and the first delta carries only the role
        if chunk.choices and "content" in chunk.choices[0].delta:
            words += chunk.choices[0].delta.content
            set_progress(words)
            time.sleep(0.1)

    return f"Clicked {n_clicks} times"


if __name__ == "__main__":
    app.run(debug=True)
