Difference in asyncio behavior in Streamlit vs. Jupyter

Hey folks. It appears that asyncio behaves differently when you’re running your code in a Jupyter notebook vs. in Streamlit. Why is that? The backend is Tornado in both cases, no?

For Streamlit, I need to run something like:

import asyncio

async def coro(*args):
    ...
    return await some_dope_bzns

def somethingDope(dopecoro):
    ...

    # asyncio.run() creates and closes its own event loop, so a separate
    # new_event_loop()/set_event_loop() pair is redundant alongside it
    return asyncio.run(dopecoro)

whereas, in a Jupyter notebook, the last few lines are changed to:

    ...
    loop = asyncio.get_event_loop()
    return loop.create_task(dopecoro)

What’s up with that?

Hello @billthelol,

As far as I know, some work was done in Jupyter (more precisely, in IPython) to make asyncio work out of the box, so you don’t need to create an event loop yourself: one is already running, hence the call to get_event_loop().

On the other hand, Streamlit apps work a bit differently from notebooks. They run synchronously, and multiple times (each time you interact with your app, the whole script reruns). In Streamlit you need to create a new asyncio event loop, because there’s none you can use in your app’s context.
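To make the difference concrete, here is a minimal sketch (not an official Streamlit or Jupyter recipe) of a helper that behaves sensibly in both situations. The names `add_later` and `run_or_schedule` are made up for illustration. It assumes the Streamlit script runs in a thread with no running loop, while a notebook cell runs inside one.

```python
import asyncio

async def add_later(a, b):
    # Hypothetical stand-in for real async work.
    await asyncio.sleep(0.01)
    return a + b

def run_or_schedule(coro):
    """Run `coro` to completion if no loop is running; otherwise schedule it."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (assumed to be the Streamlit case): asyncio.run()
        # creates a fresh loop, runs the coroutine, then closes the loop.
        return asyncio.run(coro)
    # A loop is already running (assumed to be the Jupyter case):
    # schedule the coroutine and hand back a Task the caller can await.
    return loop.create_task(coro)
```

From plain synchronous code, `run_or_schedule(add_later(1, 2))` returns the result directly. From inside a running loop it returns a Task, so the caller still has to `await` it.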


Hi @okld , @billthelol

The recommendation I came across was to wrap the binding to loop in a try/except block. See consumer_producer_runner() in this event producer/consumer/queue pattern (which I’ve adapted from some of my own code):

producer_consumer.py

# Asyncio-based producer/consumer/queue pattern implementation

import asyncio

async def event_consumer(queue, my_consumer):
    while True:
        event = await queue.get()
        try:
            # call the "event handler" to produce a result
            result = my_consumer.process(event)
        except Exception as ex:
            result = ['>> Exception in consumer event handler <<', str(ex)]
        queue.task_done()
        my_consumer.report(result)

async def event_producer(queue, my_producer):
    while True:
        event = await my_producer.next_event() # your producer should expose a `next_event()` method
        if event:
            # use a fake assignment of the next statement to prevent st
            # from auto-writing the return result as 'None'
            _ = await queue.put(event)

# Runs until the app is closed (there is no explicit producer/consumer task termination)
async def consumer_producer_runner(my_producer, my_consumer):
    # In a Streamlit context there might not be an event loop
    # present, so one may need to be created. (The loop, consumers,
    # producers, and queue must all be set up in the same thread!)
    # Note: once this coroutine is executing, a loop is already running,
    # so the except branch below is only a fallback.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    queue = asyncio.Queue()

    consumer = asyncio.create_task(event_consumer(queue, my_consumer))
    producer = asyncio.create_task(event_producer(queue, my_producer))
    # use fake assignments in these next statements to prevent st
    # from auto-writing the return results as 'None'
    _ = await asyncio.gather(producer)
    _ = await queue.join()
    consumer.cancel()

You can run test.py to see it working, and it’ll also work in both Jupyter and Streamlit.

test.py

# Dummy producer/consumer:

import asyncio
import streamlit as st
from producer_consumer import consumer_producer_runner

# _get_report_ctx() is a private Streamlit API and may differ between versions
STREAMLIT = bool(st._get_report_ctx())
messageboard = None
reporter = print
if STREAMLIT:
    messageboard = st.empty()
    reporter = messageboard.write

# Dummy producer
class MyProducer:
    def __init__(self):
        self.count = 0
    async def next_event(self):
        self.count += 1
        return await asyncio.sleep(1, result=self.count)

# Dummy consumer
class MyConsumer:
    def __init__(self):
        pass
    def process(self, event):
        return {'result': event}
    def report(self, result):
        reporter(result)

# ------------------------------------------------------------------------------
# Asynchronously run producer/consumer like so: 

if __name__ == '__main__':
    asyncio.run(consumer_producer_runner(MyProducer(), MyConsumer()))

From the console you should see:

$ python test.py
{'result': 1}
{'result': 2}
{'result': 3}
{'result': 4}
:
etc.
:

You can also run it in Streamlit:

$ streamlit run test.py

I use this pattern to shuttle events from a Streamlit custom component to my Streamlit event handler. It’s a bit overkill, but I thought I was losing events from my component, so I introduced a queue. 🙂
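Since the runner above only stops when the app is closed, a variant that shuts down cleanly may be useful for testing. The following is a minimal sketch under my own assumptions, not the code used in the component. The names `STOP`, `produce`, `consume` and `run_pipeline` are hypothetical. The producer emits a fixed number of events and then a sentinel that tells the consumer to exit.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking the end of the event stream

async def produce(queue, n_events):
    for i in range(1, n_events + 1):
        await asyncio.sleep(0)   # stand-in for waiting on a real event source
        await queue.put(i)
    await queue.put(STOP)        # tell the consumer there is nothing more to come

async def consume(queue, results):
    while True:
        event = await queue.get()
        try:
            if event is STOP:
                return
            results.append({'result': event})
        finally:
            # Mark every item (including the sentinel) as handled so join() can finish.
            queue.task_done()

async def run_pipeline(n_events):
    queue = asyncio.Queue()
    results = []
    consumer = asyncio.create_task(consume(queue, results))
    await produce(queue, n_events)
    await queue.join()   # wait until every queued item has been processed
    await consumer       # the consumer has already returned after seeing STOP
    return results
```

Calling `asyncio.run(run_pipeline(3))` collects one result dict per event and then returns, so nothing needs to be cancelled.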


@asehmi and other future peoples of Earth,

I found the trick to have the same code run in both places. Use the nest_asyncio module, which patches asyncio to allow nested use of the event loop, in your Jupyter code and it will run the same in the two venues. In particular, place nest_asyncio.apply() in one of your first cells and you’re good to go.
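A rough sketch of what that looks like, assuming the third-party nest_asyncio package is installed. The coroutine `notebook_cell` only simulates "a loop is already running", which is the situation inside a Jupyter cell, and `fetch_answer` is a made-up placeholder.

```python
import asyncio

async def fetch_answer():
    # Hypothetical stand-in for real async work.
    await asyncio.sleep(0)
    return 42

async def notebook_cell():
    # Stand-in for a Jupyter cell: a loop is already running at this point.
    import nest_asyncio   # third-party package: pip install nest_asyncio
    nest_asyncio.apply()  # patch asyncio so the running loop can be re-entered
    # Without the patch, calling asyncio.run() here raises RuntimeError.
    return asyncio.run(fetch_answer())
```

In a real notebook you would call `nest_asyncio.apply()` once in an early cell and then use `asyncio.run(...)` the same way as in the Streamlit script.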


@billthelol Thanks for the pointer.