Using Streamlit in an asynchronous LangGraph workflow graph

Is it possible to use things like st.chat_input effectively in asynchronous nodes in a LangGraph workflow graph?

For context: I created a chatbot on the command line with LangGraph that makes some asynchronous requests in some of the nodes. This is because it fills out some tool calls from the user’s input and uses those to make a few independent requests to a (somewhat slow) REST API. This works well on the CLI.

However, I wanted to wrap this CLI chatbot in a nice little web interface, so that’s where Streamlit comes in.

I have this simple streamlit_app.py file:

import streamlit as st
import asyncio
from main import main

st.title("Chatbot")

if "messages" not in st.session_state:
    st.session_state.messages = []

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

if st.session_state.get('run_chat', False):
    asyncio.run(main())

if st.button("Start Chat"):
    st.session_state.run_chat = True
    st.rerun()

and in my asynchronous main function, I’m doing some initialization work and building my workflow graph, then invoking it via ainvoke(initial_state). However, the problem I’m running into is that some of the nodes are getting user input. Something simple like:

    async def get_user_input(state: GraphState) -> GraphState:
        user_input = input('User: ')
        return {'chat_history': [HumanMessage(content=user_input)]}

So I attempted to replace it with something like:

    async def get_user_input(state: GraphState) -> GraphState:
        global unique_id
        user_input = st.chat_input('Prompt', key=unique_id)
        unique_id = unique_id + 1
        if user_input:
            st.session_state.messages.append({'role': 'user', 'content': user_input})
            with st.chat_message('user'):
                st.markdown(user_input)
            return {'chat_history': [HumanMessage(content=user_input)]}
        else:
            return state

However, when I run the Streamlit application via streamlit run streamlit_app.py, user_input is immediately set to None after the call to st.chat_input. Due to the structure of the rest of the graph, this causes it to loop continuously. So I’m wondering if I’m missing something here. Admittedly I’m quite new to Streamlit, so this might be a dumb question with an obvious answer (if so, my apologies).

Also, I know the functions above do not need to be async, but LangGraph reports a large number of warnings (which I don’t yet know how to disable) when they are not marked async, since I’m invoking the graph execution asynchronously, so I left them marked that way. I have also tried running the application with the async modifier removed from this node, and it makes no difference in the behavior (which makes sense): user_input is still immediately set to None.

I’m guessing this has to do with Streamlit’s execution flow: from what I’ve read, it re-runs the script from top to bottom in response to basically any change on the page. So it seems that something like st.chat_input will return None on every rerun until the user actually submits the chat input (presses Enter or the send button). If so, then maybe this is to be expected.

I was ideally trying to integrate it into my application without having to entirely restructure everything to make it work with Streamlit, but maybe that’s not possible with its execution model.

Thanks in advance for any ideas as to what I’m doing wrong here or how to fix it. :slight_smile:

Never mind. I fixed it by refactoring my workflow graph so it no longer retrieves the user’s input or displays the chatbot’s response. I removed those nodes from the graph, retrieve the input at the top level with Streamlit, pass it as part of the initial state to the workflow graph invocation, and then pass the response (the last message in the chat history in the state) back up from the workflow graph to the top-level Streamlit logic, where it is displayed.
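A minimal sketch of what that refactor looks like (the helper name run_turn and the chat_history state key are illustrative stand-ins for my real module, not its actual names):

```python
import asyncio

async def run_turn(graph, user_message):
    """One chat turn: seed the initial state with the user's message,
    invoke the graph, and return the last message in the chat history."""
    result = await graph.ainvoke({"chat_history": [user_message]})
    return result["chat_history"][-1]

# At the top level of streamlit_app.py this is then driven by, roughly:
#
#   if prompt := st.chat_input("Prompt"):
#       reply = asyncio.run(run_turn(graph, HumanMessage(content=prompt)))
#       with st.chat_message("assistant"):
#           st.markdown(reply.content)
```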

However, on a side note: I have not figured out how to maintain a web client that opens an asynchronous client session (via aiohttp’s ClientSession) and manages it with a context manager across the entire application flow. I end up having to close and reopen those sessions every time I invoke the workflow graph, and thus also rebuild the graph each time (since the nodes are kept in a closure that holds a reference to those asynchronous web clients after the session is opened). This works, but seems suboptimal.

If anyone has any ideas, please do share.

Actually, I figured out the solution to the above inquiry too. I made a service manager class decorated with @st.cache_resource that holds a reference to an asyncio event loop, uses that loop to initialize the services via their __aenter__ dunders in its __init__ method, and conversely closes them via __aexit__ in its __del__ method.
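A sketch of that pattern, with a hypothetical service standing in for my real aiohttp clients (the class name and method names here are illustrative, not my actual code):

```python
import asyncio

class ServiceManager:
    """Keeps async context managers (e.g. aiohttp.ClientSession) open
    across Streamlit reruns by pinning them to a private event loop."""

    def __init__(self, *services):
        self._loop = asyncio.new_event_loop()
        self._services = services
        # Enter each async context manager once, on the private loop.
        for service in self._services:
            self._loop.run_until_complete(service.__aenter__())

    def run(self, coro):
        """Run a coroutine (e.g. graph.ainvoke(state)) on the long-lived loop."""
        return self._loop.run_until_complete(coro)

    def __del__(self):
        # Tear the services down in reverse order, then close the loop.
        for service in reversed(self._services):
            self._loop.run_until_complete(service.__aexit__(None, None, None))
        self._loop.close()

# In the Streamlit app, caching the constructor (by decorating the class,
# or a factory like this) keeps one manager alive per process:
#
#   @st.cache_resource
#   def get_services():
#       return ServiceManager(aiohttp.ClientSession())
```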
