What are the best practices for streaming high-speed data so that interactions like a button click do not disconnect the stream?
Problem: When clicking a button, the entire script re-runs (as is expected with Streamlit), but this disconnects the stream and the connection needs to be re-established. This delays other interactions with the app/data.
Options:
- Keep connection alive when button click happens?
- Connect to the same websocket when code re-runs?
- Re-run all code and start connection to stream over again?
OPTION ONE: Keep connection alive when button click happens?
- I attempted to use the @st.fragment decorator so that a button click would only re-run that part of the code. Unfortunately, the click doesn’t seem to trigger anything while the code is in the while loop receiving data from the stream.
# Attempt to interact during a streaming while loop with @st.fragment
@st.fragment()
def button_interact():
    st.button("Interact Button")
    container_a = grid[0].empty().container()
    with container_a:
        interaction_function()
# Key part of the streaming while loop
async def connect_listen(websocket):
    # When the async with block is exited, the websocket connection is automatically closed
    async with websockets.connect(ws_url) as websocket:
        # A series of STOMP connection messages are sent to the websocket
        await websocket.send(Connect_Frame)
        # This is where it listens in the loop
        i = 0
        while True:
            i = i + 1
            print(f'Attempt:{i}')
            data = await websocket.recv()
            st.write(data)
The loop continues to run, and the fragment part of the code never runs.
Possible Next Steps For Option One:
- Perhaps I can use multiprocessing and run a separate script, using a pipe/queue to send data back to Streamlit, like this: Passing data between separately running Python scripts - Stack Overflow
import streamlit as st
from multiprocessing import Process, Queue, Pipe
from child_mp import f

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    st.write(parent_conn.recv())  # prints "Hello"
- or updating a session state variable that controls the while loop
# This is where it listens in the loop
i = 0
while st.session_state['continue_loop']:
    i = i + 1
    print(f'Attempt:{i}')
    data = await websocket.recv()
    st.write(data)
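The two ideas above (a queue to hand data back, and a flag that controls the loop) can be combined without a separate process. The following is only a minimal sketch, using a background thread instead of multiprocessing, and a hypothetical `fake_stream` generator standing in for the real websocket so that it runs with the standard library alone. The names `listen`, `start_listener`, and `drain` are made up for illustration and are not part of Streamlit or websockets.

```python
import asyncio
import queue
import threading


async def fake_stream(n):
    """Hypothetical stand-in for the websocket: yields n messages."""
    for i in range(n):
        await asyncio.sleep(0.01)
        yield f"message {i}"


async def listen(out_queue, stop_event, n):
    # In the real app this loop would wrap `websockets.connect(ws_url)`
    # and `await websocket.recv()` instead of the fake stream.
    async for message in fake_stream(n):
        if stop_event.is_set():
            break  # the flag replaces st.session_state['continue_loop']
        out_queue.put(message)


def start_listener(out_queue, stop_event, n=1000):
    """Run the listener on its own event loop in a daemon thread."""
    thread = threading.Thread(
        target=lambda: asyncio.run(listen(out_queue, stop_event, n)),
        daemon=True,
    )
    thread.start()
    return thread


def drain(out_queue):
    """Return everything currently queued, without blocking."""
    items = []
    while True:
        try:
            items.append(out_queue.get_nowait())
        except queue.Empty:
            return items
```

In a Streamlit app, the queue, the event, and the thread would presumably be created once inside a function decorated with `@st.cache_resource`, so that script re-runs reuse them instead of reconnecting. Each re-run (or a fragment declared with `run_every`) could then call `drain` and display whatever has arrived. Because the listener never calls `st.write` itself, a button click no longer has to interrupt it.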
OPTION TWO: Connect to the same websocket when code re-runs?
I was able to save a websocket as a session state variable, and then attempted to use it on the next code rerun:
st.session_state['websocket']= await websockets.connect(ws_url)
which shows up as:
<websockets.legacy.client.WebSocketClientProtocol object at 0x0000017306665E80>
but I can’t seem to reconnect to it. After clicking a button it does a re-run, and I try this:
asyncio.run(mh.message_handling(st.session_state['websocket']))
It gives the following error:
RuntimeError: cannot call recv while another coroutine is already waiting for the next message
which makes me think the original loop is still running in the background:
while True:
    i = i + 1
    print(f'Attempt:{i}')
    data5 = await websocket.recv()
So if I try to save the loop
st.session_state['loop'] = asyncio.get_running_loop()
and then call it back on the next run
async def message_handling(websocket):
    # loop = asyncio.get_running_loop()
    i = 0
    while True:
        i = i + 1
        print(f'Attempt:{i}')
        data5 = await websocket.recv()
        st.write(data5)

st.session_state['loop'].create_task(message_handling(st.session_state['websocket']))
I get the same error:
RuntimeError: cannot call recv while another coroutine is already waiting for the next message
Possible Next Steps For Option Two:
- Perhaps there is another way to connect to it and start receiving data again from the stream. Maybe I need to await or cancel the task, before starting the next one.
- Maybe I can cancel the event loop and start another one with the existing websocket.
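The "cancel the task before starting the next one" idea can be sketched with plain asyncio. This is an illustration under assumptions, not a confirmed fix: an `asyncio.Queue` stands in for the websocket, and `reader`, `restart_reader`, and `demo` are hypothetical names. The point it shows is that the old reader must be cancelled and awaited before a new one takes over the same source, so that two coroutines are never waiting on it at once.

```python
import asyncio


async def reader(source, received):
    # Stand-in for the `while True: data5 = await websocket.recv()` loop.
    while True:
        received.append(await source.get())


async def restart_reader(old_task, source, received):
    """Cancel the previous reader, wait for it to finish, then start a new one."""
    if old_task is not None and not old_task.done():
        old_task.cancel()
        # gather(..., return_exceptions=True) absorbs the CancelledError
        await asyncio.gather(old_task, return_exceptions=True)
    return asyncio.create_task(reader(source, received))


async def demo():
    source = asyncio.Queue()
    first, second = [], []

    task = asyncio.create_task(reader(source, first))
    await source.put("a")
    await asyncio.sleep(0.01)  # let the first reader consume "a"

    new_task = await restart_reader(task, source, second)
    await source.put("b")
    await asyncio.sleep(0.01)  # let the second reader consume "b"

    new_task.cancel()
    await asyncio.gather(new_task, return_exceptions=True)
    return task.cancelled(), first, second


result = asyncio.run(demo())
```

Note that this only works while both tasks live on the same running event loop. `asyncio.run` creates a fresh loop on every call and closes it when it returns, so a task or loop stored in session state from an earlier run may not be usable from a later one unless the loop is kept alive somewhere else, for example in a long-lived background thread.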
OPTION THREE: Re-run all code and restart the connection to stream?
Problem: This restarts the stream, and I have to re-authenticate and resubscribe to the stream with each interaction that occurs.
Possible Next Steps For Option Three:
- Make this reconnection faster?
Answers to debugging post questions (though I am looking for a best-practice approach more than debugging):
- Are you running your app locally or is it deployed?
Locally
- If your app is deployed:
No, just running it on my machine
- Share the link to your app’s public GitHub repository (including a requirements file).
Not currently public
- Share the full text of the error message (not a screenshot).
Shared above for each potential option.
- Share the Streamlit and Python versions.
Streamlit:
Python: