Hey folks. It appears that asyncio behaves differently when you're running your code in a Jupyter notebook vs. in Streamlit. Why is that? The backend is Tornado in both cases, no?
As far as I know, some work was done in Jupyter (and more precisely in IPython) to make asyncio work out of the box, without you having to create an asyncio event loop yourself (hence no need to call get_event_loop()).
On the other hand, Streamlit apps work a little differently from notebooks. They run synchronously, and multiple times (every time you interact with your app, the whole script reruns). In Streamlit you need to create a new asyncio event loop, because there is none available in your app's context.
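As a minimal sketch of that difference (separate from the pattern further down): asking asyncio for the currently running loop succeeds in a notebook, where IPython already runs one, but raises in a plain Streamlit script, so you fall back to creating one yourself.

import asyncio

try:
    # In Jupyter/IPython a loop is already running, so this succeeds
    loop = asyncio.get_running_loop()
except RuntimeError:
    # In a plain Streamlit script there's no running loop yet,
    # so create one and register it for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)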
The recommendation I came across was to wrap the event-loop lookup in a try/except block. See consumer_producer_runner() in this event producer/consumer/queue pattern (which I've adapted from some of my own code):
producer_consumer.py
# Asyncio-based producer/consumer/queue pattern implementation
import asyncio


async def event_consumer(queue, my_consumer):
    while True:
        event = await queue.get()
        try:
            # call the "event handler" to produce a result
            result = my_consumer.process(event)
        except Exception as ex:
            result = ['>> Exception in consumer event handler <<', str(ex)]
        queue.task_done()
        my_consumer.report(result)


async def event_producer(queue, my_producer):
    while True:
        event = await my_producer.next_event()  # your producer should expose a `next_event()` method
        if event:
            # use a fake assignment of the next statement to prevent st
            # from auto-writing the return result as 'None'
            _ = await queue.put(event)


# will terminate only when app is closed (i.e., there's no explicit producer/consumer thread termination)
async def consumer_producer_runner(my_producer, my_consumer):
    # In a Streamlit context there might not be an event loop present,
    # so we need to create one. (The loop, consumers, producers and queue
    # must all be set up in the same awaitable thread!)
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    queue = asyncio.Queue()
    consumer = asyncio.create_task(event_consumer(queue, my_consumer))
    producer = asyncio.create_task(event_producer(queue, my_producer))

    # use fake assignments in these next statements to prevent st
    # from auto-writing the return results as 'None'
    _ = await asyncio.gather(producer)
    _ = await queue.join()
    consumer.cancel()
You can run test.py to see it working, and it’ll also work in both Jupyter and Streamlit.
test.py
# Dummy producer/consumer:
import asyncio
import streamlit as st
from producer_consumer import consumer_producer_runner

STREAMLIT = True if st._get_report_ctx() else False

messageboard = None
reporter = print
if STREAMLIT:
    messageboard = st.empty()
    reporter = messageboard.write


# Dummy producer
class MyProducer:
    def __init__(self):
        self.count = 0

    async def next_event(self):
        self.count += 1
        return await asyncio.sleep(1, result=self.count)


# Dummy consumer
class MyConsumer:
    def __init__(self):
        pass

    def process(self, event):
        return {'result': event}

    def report(self, result):
        reporter(result)


# ------------------------------------------------------------------------------
# Asynchronously run producer/consumer like so:
if __name__ == '__main__':
    asyncio.run(consumer_producer_runner(MyProducer(), MyConsumer()))
I use this pattern to shuttle events from a Streamlit custom component to my Streamlit event handler. It's a bit of overkill, but I thought I was losing events from my component, so I introduced a queue.
I found a trick to make the same code run in both places: use the nest_asyncio module in your Jupyter code and it will behave the same in both environments. In particular, put nest_asyncio.apply() in one of your first cells and you're good to go.
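For example, assuming nest_asyncio is installed (pip install nest_asyncio), a first notebook cell along these lines should be enough; after that the same asyncio.run() entry point used in test.py works inside the notebook's already-running loop:

# Put this in one of the first cells of the notebook
import nest_asyncio
nest_asyncio.apply()

# Later cells can then reuse the script's entry point unchanged, e.g.:
# import asyncio
# asyncio.run(consumer_producer_runner(MyProducer(), MyConsumer()))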