Will st.write_stream
be updated soon to support streaming with the assistant API?
The current st.write_stream can already stream responses from the Assistant API.
The write_stream just needs a generator such as data_streamer.
# Demo: st.write_stream accepts any generator that yields text chunks.
def data_streamer():
    """Yield the sample text word by word with a short typing delay."""
    for token in _LOREM_IPSUM.split(" "):
        yield token + " "
        time.sleep(0.02)


st.write_stream(data_streamer)
The Assistant API delivers streamed data through several event-handler callbacks, for example:
def on_tool_call_created(self, tool_call):
    """Announce a newly created tool call on stdout."""
    banner = f"\nassistant > {tool_call.type}\n"
    print(banner, flush=True)
def on_tool_call_delta(self, delta, snapshot):
    """Echo incremental code-interpreter input to stdout as it arrives."""
    if delta.type != 'code_interpreter':
        return
    if delta.code_interpreter.input:
        print(delta.code_interpreter.input, end="", flush=True)
...
We can save those responses in a list and build our own streamer function.
# One-time buffer in session state for tokens collected during the API stream.
if 'msg' not in st.session_state:
    st.session_state.msg = []


def data_streamer():
    """Replay the buffered tokens with an artificial typing delay."""
    import time
    for chunk in st.session_state.msg:
        yield chunk
        time.sleep(0.1)
Save the API streams in session state.
@override
def on_text_delta(self, delta, snapshot):
    """Print each text token and buffer it in session state for later replay."""
    token = delta.value
    print(token, end="", flush=True)
    st.session_state.msg.append(token)
def on_tool_call_created(self, tool_call):
    """Announce the tool call on stdout and buffer the label for replay."""
    label = f'{tool_call.type}\n\n'
    print(f"\nassistant > {label}", flush=True)
    st.session_state.msg.append(label)
    # ...and so on for the remaining handler events.
Once the assistant API is done streaming, we can then stream it in streamlit.
# Replay the buffered reply inside an assistant chat bubble.
with st.chat_message("assistant"):
    st.write_stream(data_streamer)
Sample output
This output is streamed with a delay of 0.1 sec.
time.sleep(0.1)
There may be a better way than this; this is just a quick demo.
This is another method.
This is the openai LLM responder.
# Start an assistant run over the whole message history; stream=True makes
# the call return an event iterator instead of a finished run object.
stream = client.beta.threads.create_and_run(
    assistant_id=assistant.id,
    thread={"messages": msg_history},
    stream=True,
)
Create a session variable to store that stream object.
# Reserve a session-state slot for the live Assistant-API stream object.
if "stream" not in ss:
    ss.stream = None
Build our streamer.
def data_streamer():
    """Yield assistant text tokens from the live stream held in ss.stream."""
    for event in ss.stream:
        if event.event != 'thread.message.delta':
            continue
        # assumes the first content part is text — TODO confirm for tool output
        yield event.data.delta.content[0].text.value
        time.sleep(0.1)
Complete code
Have your OpenAI API key ready for testing.
streamlit_app.py
"""Use assistant api in streamlit app with streaming."""
import streamlit as st
from streamlit import session_state as ss
from openai import OpenAI
import time
# --- session-state variables ---
# 'stream' holds the live Assistant-API event stream between reruns;
# 'messages' is the chat transcript redrawn on every rerun.
if "stream" not in ss:
    ss.stream = None
if "messages" not in ss:
    ss.messages = []
# functions
def data_streamer():
    """Yield assistant text tokens from the stream stored in ss.stream.

    The stream object in ss.stream needs to be examined in detail to come
    up with this solution. It is still in beta stage and may change in
    future releases.
    """
    for event in ss.stream:
        if event.event != 'thread.message.delta':
            continue
        # assumes the first content part is text — TODO confirm for tool output
        yield event.data.delta.content[0].text.value
        time.sleep(0.1)
def init_assistant():
    """Return a (client, assistant) pair, creating them once per session.

    Streamlit reruns the whole script on every user interaction, so the
    original version registered a brand-new assistant on the OpenAI account
    per rerun. Cache both objects in session state instead; the signature
    and return value are unchanged.

    Returns:
        tuple: (OpenAI client, assistant object) for this session.
    """
    if 'client' not in ss or 'assistant' not in ss:
        client = OpenAI(api_key=ss.oaik)
        assistant = client.beta.assistants.create(
            name="Math Tutor",
            instructions="You are a personal math tutor. Write and run code to answer math questions.",
            tools=[{"type": "code_interpreter"}],
            model="gpt-4-turbo-preview",
        )
        ss.client = client
        ss.assistant = assistant
    return ss.client, ss.assistant
def main():
    """Render the chat UI and stream assistant replies.

    Fix: the user-facing error message misspelled "OpenAI API key" as
    "open api key".
    """
    st.text_input('enter openai api key', type='password', key='oaik')
    if not ss.oaik:
        st.error('Please enter your OpenAI API key!')
        st.stop()

    # initialize openai assistant
    client, assistant = init_assistant()

    # replay the transcript so it survives Streamlit reruns
    for message in ss.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # prompt user
    if prompt := st.chat_input("What is up?"):
        ss.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # send the whole history so the assistant has full context
        msg_history = [
            {"role": m["role"], "content": m["content"]} for m in ss.messages
        ]
        ss.stream = client.beta.threads.create_and_run(
            assistant_id=assistant.id,
            thread={
                "messages": msg_history
            },
            stream=True
        )
        with st.chat_message("assistant"):
            response = st.write_stream(data_streamer)
        ss.messages.append({"role": "assistant", "content": response})


if __name__ == '__main__':
    main()
Sample output
Great details, thanks!
This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.