Display LLM response stream from OpenAI Assistant API

Will st.write_stream be updated soon to support streaming with the assistant API?

The current st.write_stream can already consume streamed responses from the Assistant API.

The write_stream just needs a generator such as data_streamer.

def data_streamer():
    """Yield the sample text one word at a time, pausing briefly between words."""
    for token in _LOREM_IPSUM.split(" "):
        yield f"{token} "
        time.sleep(0.02)
st.write_stream(data_streamer)

The Assistant API delivers streamed data through several different callback methods.

    def on_tool_call_created(self, tool_call):
        """Announce the tool type when the assistant starts a tool call."""
        header = f"\nassistant > {tool_call.type}\n"
        print(header, flush=True)
    
    def on_tool_call_delta(self, delta, snapshot):
        """Echo code-interpreter input fragments as they stream in."""
        if delta.type != 'code_interpreter':
            return
        fragment = delta.code_interpreter.input
        if fragment:
            print(fragment, end="", flush=True)

...

We can save those responses in a list and build our own streamer function.

# Make sure the message buffer exists before any handler appends to it.
if 'msg' not in st.session_state:
    st.session_state.msg = []

def data_streamer():
    """Replay the chunks saved in session state with a short delay each."""
    import time
    for chunk in st.session_state.msg:
        yield chunk
        time.sleep(0.1)

Save the API streams in session state.


            @override
            def on_text_delta(self, delta, snapshot):
                """Print each text fragment and archive it in session state."""
                fragment = delta.value
                print(fragment, end="", flush=True)
                st.session_state.msg.append(fragment)
                
            def on_tool_call_created(self, tool_call):
                """Echo the tool type and archive it in session state."""
                label = f'{tool_call.type}\n\n'
                print(f"\nassistant > {label}", flush=True)
                st.session_state.msg.append(label)

            # etc

Once the assistant API is done streaming, we can then stream it in streamlit.

# Replay the saved chunks inside an assistant chat bubble.
with st.chat_message("assistant"):
    st.write_stream(data_streamer)

Sample output

This output is streamed with a delay of 0.1 sec.

time.sleep(0.1)

There can be a better way than this. This is just a quick demo.

Here is another method, using the stream object returned by the OpenAI API directly.

This is the OpenAI call that produces the response stream.

        # Start a run on a fresh thread seeded with the chat history;
        # stream=True makes the call return an event iterator instead of a run.
        stream = client.beta.threads.create_and_run(
            assistant_id=assistant.id,        
            thread={
                "messages": msg_history
            },
            stream=True
        )

Create a session variable to store that stream object.

# Placeholder for the run's event stream; filled in after create_and_run.
if 'stream' not in ss:
    ss.stream = None

Build our streamer.

def data_streamer():
    """Yield text deltas from the stored run stream, throttled slightly."""
    for event in ss.stream:
        if event.event != 'thread.message.delta':
            continue
        text = event.data.delta.content[0].text.value
        yield text
        time.sleep(0.1)

Complete code

Prepare your openai api key for testing.

streamlit_app.py

"""Use assistant api in streamlit app with streaming."""


import streamlit as st
from streamlit import session_state as ss
from openai import OpenAI
import time


# variables
# Holds the event stream returned by create_and_run; None until the first run.
if 'stream' not in ss:
    ss.stream = None

# Full chat history as a list of {"role": ..., "content": ...} dicts.
if "messages" not in ss:
    ss.messages = []


# functions
def data_streamer():
    """Yield assistant text deltas from the stream saved in session state.

    The event layout follows the beta Assistants streaming API; it is still
    in beta and may change in future releases.
    """
    for event in ss.stream:
        if event.event != 'thread.message.delta':
            continue
        piece = event.data.delta.content[0].text.value
        yield piece
        time.sleep(0.1)


def init_assistant():
    """Return a (client, assistant) pair, creating them only once.

    Streamlit re-executes the whole script on every interaction, so the
    original version created a brand-new assistant object on the OpenAI
    account on each rerun. Cache the pair in session state, keyed by the
    API key, so a changed key rebuilds the client but reruns reuse it.

    Returns:
        tuple: (OpenAI client, assistant object).
    """
    if ss.get('oa_client') is None or ss.get('oa_key_used') != ss.oaik:
        client = OpenAI(api_key=ss.oaik)

        assistant = client.beta.assistants.create(
            name="Math Tutor",
            instructions="You are a personal math tutor. Write and run code to answer math questions.",
            tools=[{"type": "code_interpreter"}],
            model="gpt-4-turbo-preview",
        )

        ss.oa_client = client
        ss.oa_assistant = assistant
        ss.oa_key_used = ss.oaik

    return ss.oa_client, ss.oa_assistant


def main():
    """Drive the chat UI: collect the API key, replay history, stream replies."""
    st.text_input('enter openai api key', type='password', key='oaik')

    # Halt the script until a key is provided; st.stop() prevents the
    # rest of the page from rendering on this rerun.
    if not ss.oaik:
        st.error('Please enter your OpenAI API key!')
        st.stop()

    # initialize openai assistant
    client, assistant = init_assistant()

    # Re-render the full conversation (Streamlit reruns from the top).
    for message in ss.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # prompt user
    if prompt := st.chat_input("What is up?"):

        ss.messages.append({"role": "user", "content": prompt})

        with st.chat_message("user"):
            st.markdown(prompt)

        # Send the whole history so the run has conversational context.
        msg_history = [
            {"role": m["role"], "content": m["content"]} for m in ss.messages
        ]

        # Start a streaming run on a fresh thread; the event iterator is
        # stored in session state so data_streamer can consume it.
        ss.stream = client.beta.threads.create_and_run(
            assistant_id=assistant.id,
            thread={
                "messages": msg_history
            },
            stream=True
        )

        # write_stream consumes data_streamer and returns the joined text.
        with st.chat_message("assistant"):
            response = st.write_stream(data_streamer)
            ss.messages.append({"role": "assistant", "content": response})


# Standard script entry point.
if __name__ == '__main__':
    main()

Sample output

2 Likes

Great details, thanks!