Best (fastest) practice to display live 2D data

Dear all!

I have a detector which produces 2D images at video rates and higher. However, for live imaging, rates in the range of 10 Hz and above would be sufficient. The data comes as numpy arrays.
I run into problems when I try to display the data at the desired rates.

The following code mocks up different ways to do this with Streamlit. I am by no means a Python expert, though, and especially not an asyncio expert.

  1. display, get the data, rerun
  2. get the data, display, rerun
  3. do displaying and data taking asynchronously

The third option does not need an st.experimental_rerun(). Part of the communication is done via Streamlit's st.session_state, which is not strictly necessary everywhere but keeps the examples as similar as possible.

Version 2 does not work well at all; I only see a grayed-out image.
Versions 1 and 3 only work down to delays of about 0.25 s.

I am doing this from my home office, using an SSH tunnel to a server at work over a 100 MBit/s connection.

I have no idea how to speed this up, but I guess there is a way, since, for example, watching YouTube videos about Streamlit is not a problem at all. :slight_smile:

Perhaps Streamlit is not the way to go for this type of application?

Thanks a lot for any thoughts on this!

Best wishes
Markus

import streamlit as st
import numpy as np
import time
import asyncio

# --------------------------------------------------------------

def ui():
    # display the most recent image, if one has been produced already
    if 'image' in st.session_state:
        st.image(st.session_state['image'])

def work(delay):
    # mock data acquisition: wait for `delay` seconds, then produce a random image
    time.sleep(delay)
    st.session_state['image'] = np.random.random((512, 512))

# --------------------------------------------------------------
# --------------------------------------------------------------

async def async_ui():
    with st.session_state['placeholder']:
        ui()
    # yield control so that other tasks get a chance to run
    await asyncio.sleep(0.01)

async def async_work(delay):
    work(delay)
    # yield control so that other tasks get a chance to run
    await asyncio.sleep(0.01)

# --------------------------------------------------------------
# --------------------------------------------------------------

def work_before_display(delay=1):
    # variant 2: get the data, then display it
    work(delay)
    ui()

# --------------------------------------------------------------

def display_before_work(delay=1):
    # variant 1: display the previous image, then get new data
    ui()
    work(delay)

# --------------------------------------------------------------

async def do_all_asynchronously(delay=1):

    # cancel tasks that may still be around from a previous run
    if 'tasks' in st.session_state:
        for t in st.session_state['tasks']:
            t.cancel()

    # schedule data acquisition and display concurrently
    coroutines = [async_work(delay), async_ui()]
    tasks = [asyncio.ensure_future(c) for c in coroutines]
    if tasks:
        st.session_state['tasks'] = tasks
        await asyncio.gather(*tasks)

if __name__ == '__main__':

    if 'placeholder' not in st.session_state:
        st.session_state['placeholder'] = st.empty()

    delay = 0.25

    # variant 1: display, get the data, rerun
    #display_before_work(delay=delay)

    # variant 2: get the data, display, rerun
    #work_before_display(delay=delay)

    # variant 3: do displaying and data taking asynchronously
    asyncio.run(do_all_asynchronously(delay=delay))

    st.experimental_rerun()


An addition:

After some time, variant 1 fails with:
RecursionError: maximum recursion depth exceeded in comparison

  You can now view your Streamlit app in your browser.

  URL: http://127.0.0.1:8501

2021-12-10 09:50:02.755 InMemoryFileManager: Missing file 4f237411c238189ad04bff875b2cbae4ea5ef8e4c979c3413999b321.jpeg
2021-12-10 09:50:52.730 Traceback (most recent call last):
  File ".../lib/python3.8/site-packages/streamlit/script_runner.py", line 354, in _run_script
    exec(code, module.__dict__)
  File ".../test.py", line 55, in <module>
    display_before_work(delay=delay)
  File ".../test.py", line 39, in display_before_work
    ui()
  File ".../test.py", line 23, in ui
    st.image(st.session_state['image'])
  File ".../lib/python3.8/site-packages/streamlit/elements/image.py", line 120, in image
    marshall_images(
  File ".../python3.8/site-packages/streamlit/elements/image.py", line 370, in marshall_images
    proto_img.url = image_to_url(
  File ".../lib/python3.8/site-packages/streamlit/elements/image.py", line 272, in image_to_url
    data = _np_array_to_bytes(data, output_format=output_format)
  File ".../lib/python3.8/site-packages/streamlit/elements/image.py", line 181, in _np_array_to_bytes
    return _PIL_to_bytes(img, format)
  File ".../lib/python3.8/site-packages/streamlit/elements/image.py", line 167, in _PIL_to_bytes
    image.save(tmp, format=format, quality=quality)
  File ".../lib/python3.8/site-packages/PIL/Image.py", line 2240, in save
    save_handler(self, fp, filename)
  File ".../lib/python3.8/site-packages/PIL/JpegImagePlugin.py", line 745, in _save
    if isinstance(exif, Image.Exif):
  File ".../lib/python3.8/abc.py", line 98, in __instancecheck__
    return _abc_instancecheck(cls, instance)
RecursionError: maximum recursion depth exceeded in comparison

Well, I followed the asyncio approach a little further and found a solution that gives me about 10 fps. Much better than above.

import streamlit as st
import numpy as np
import time
import asyncio
from datetime import datetime

st.set_page_config(
    layout="wide",                 # "centered" or "wide"
    initial_sidebar_state="auto",  # "auto", "expanded", or "collapsed"
    page_title='TEST',             # string or None; strings get appended with "• Streamlit"
    page_icon=None,                # string, anything supported by st.image, or None
)

# --------------------------------------------------------------

def ui(placeholder):
    if 'image' in st.session_state:
        with placeholder:
            st.image(st.session_state['image'], 
                caption=f'image count: {st.session_state["count"]}, {str(datetime.now())}')

def work(delay):
    time.sleep(delay)
    image = np.random.random((512, 512))

    # do some "image processing"
    n = st.session_state['count'] % (image.shape[0]-10)
    m = st.session_state['count'] % (image.shape[1]-10)
    image[n:n+10] = 0
    image[:, m:m+10] = 1

    st.session_state['image'] = image
    st.session_state['count'] += 1

# --------------------------------------------------------------
# --------------------------------------------------------------

async def async_ui(placeholder):

    while True:
        ui(placeholder)
        # give other tasks a chance to step in
        r = await asyncio.sleep(0.0001)

async def async_work(delay):
    
    while True:
        work(delay)
        # give other tasks a chance to step in
        r = await asyncio.sleep(0.0001)

async def do_all_asynchronously(coroutines, delay=1, placeholder=None):

    # empty the list of tasks (from the previous Streamlit session)
    # and make sure every one of them is cancelled
    while st.session_state['tasks']:
        t = st.session_state['tasks'].pop()
        t.cancel()
        del t  # not sure whether this is needed

    tasks = []
    # now reschedule coroutines
    for cor in coroutines:
        tasks.append(asyncio.ensure_future(cor))
    
    # store in st.session_state to access them after a rerun 
    st.session_state['tasks'] = tasks
    
    # let them run
    _ = await asyncio.gather(*tasks, return_exceptions=False)

    # From here on the script is blocked!
    # However, interaction with a widget raises a RerunException,
    # which appears to finish the async_ui coroutine.
    # Since `return_exceptions=False`, asyncio.gather will then
    # return, but it leaves the other tasks running.
    # Therefore they have to be cancelled explicitly upon rerun
    # to clean things up.


def reset():

    st.session_state['count'] = 0


if __name__ == '__main__':

    delay = 0.1

    if 'tasks' not in st.session_state:
        st.session_state['tasks'] = []

    if 'count' not in st.session_state:
        st.session_state['count'] = 0
 
    st.title('Hello random image!')
    placeholder = st.empty()
    st.button('Reset', on_click=reset)

    print('start asynchronous work')
    coroutines = [async_work(delay), async_ui(placeholder)]
    asyncio.run(do_all_asynchronously(coroutines, delay=delay, placeholder=placeholder))

    print('This text will/should never be printed!')

What I would like to have is a better structure (is that what is called pythonic?).
What I tried was to put all methods into a class and, on the first run of the script, store an instance of that class in st.session_state for reuse and for tracking the asynchronous tasks. But this slowed things down considerably. Why?

Any ideas from the streamlit experts?

Thanks a lot!
Markus


Wow @w-markus, you made an awesome example here! To be honest, I did not expect Streamlit and asyncio to play together this well.

I don't know why there is a slowdown if you store it in state; maybe Streamlit tries to do some attribute tracking on elements in a class, I did not check :confused:.

When using asyncio, I usually prefer a producer/consumer approach, with your producer pushing images into a queue (this could be downloading data from your sensor) and the consumer fetching images from the queue and displaying them. That way you can control the producer and the consumer separately and scale the number of producers if necessary. See Async IO in Python: A Complete Walkthrough – Real Python.
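
Stripped of everything Streamlit-specific, the bare pattern looks roughly like this (just a generic sketch, not code from your scripts):

import asyncio

import numpy as np


async def producer(queue, delay=0.1):
    while True:
        frame = np.random.random((512, 512))  # stands in for reading your sensor
        await queue.put(frame)                # blocks when the queue is full
        await asyncio.sleep(delay)


async def consumer(queue):
    while True:
        frame = await queue.get()             # blocks until a frame is available
        print("got frame", frame.shape)       # stands in for displaying it
        queue.task_done()


async def main():
    queue = asyncio.Queue(maxsize=100)
    await asyncio.gather(producer(queue), consumer(queue))


asyncio.run(main())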

Also, st.image on a numpy array uses Pillow to convert it to an image. Apparently OpenCV is about 1.3x faster at this, so I manually create the images from your numpy array with OpenCV.
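
If you want to check that claim on your own machine, here is a rough timing sketch (it only times the array-to-image conversion step, not the JPEG/PNG encoding that st.image still does afterwards; timings will vary):

import timeit

import cv2
import numpy as np
from PIL import Image

frame = np.random.random((512, 512)).astype(np.float32)

def via_pillow():
    # roughly the conversion Pillow ends up doing: scale to 8 bit and build a PIL image
    Image.fromarray((frame * 255).astype(np.uint8))

def via_opencv():
    # convert the grayscale float array to a 3-channel 8-bit image that st.image accepts directly
    cv2.cvtColor((frame * 255).astype(np.uint8), cv2.COLOR_GRAY2BGR)

print("Pillow:", timeit.timeit(via_pillow, number=1000))
print("OpenCV:", timeit.timeit(via_opencv, number=1000))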

I've created a... a bit of a complex example, but here it is:

import asyncio
from datetime import datetime

import cv2
import numpy as np
import streamlit as st


QUEUE_SIZE = 1000
SIZE_IMAGE = 512


def get_or_create_eventloop():
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" in str(ex):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return asyncio.get_event_loop()


async def produce_images(queue, delay):
    while True:
        _ = await asyncio.sleep(delay)
        image = np.random.random((SIZE_IMAGE, SIZE_IMAGE)).astype(np.float32)

        # Add bars depending on state count
        n = st.session_state.produced_images % SIZE_IMAGE
        m = st.session_state.produced_images % SIZE_IMAGE
        image[n : n + 10] = 0
        image[:, m : m + 10] = 1

        _ = await queue.put(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
        st.session_state.produced_images += 1


async def consume_images(image_placeholder, queue_size_placeholder, queue, delay):
    while True:
        _ = await asyncio.sleep(delay)
        image = await queue.get()
        image_placeholder.image(
            image,
            caption=f"Consumed images: {st.session_state.consumed_images}, {str(datetime.now())}",
        )
        queue_size_placeholder.metric(
            f"In queue (queue size is {QUEUE_SIZE})", st.session_state.queue.qsize()
        )
        st.session_state.consumed_images += 1
        queue.task_done()


async def run_app(
    image_placeholder, queue_size_placeholder, queue, produce_delay, consume_delay
):
    _ = await asyncio.gather(
        produce_images(queue, produce_delay),
        consume_images(image_placeholder, queue_size_placeholder, queue, consume_delay),
    )


##### ACTUAL APP

if __name__ == "__main__":
    st.set_page_config(
        layout="wide",
        initial_sidebar_state="auto",
        page_title="Asyncio test",
        page_icon=None,
    )

    if "event_loop" not in st.session_state:
        st.session_state.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(st.session_state.loop)

    # if "queue" not in st.session_state:
    #    st.session_state.queue = asyncio.Queue(QUEUE_SIZE)
    # if "produced_images" not in st.session_state:
    #    st.session_state.produced_images = 0
    # if "consumed_images" not in st.session_state:
    #    st.session_state.consumed_images = 0
    st.session_state.queue = asyncio.Queue(QUEUE_SIZE)
    st.session_state.produced_images = 0
    st.session_state.consumed_images = 0

    st.title("Hello random image!")
    produce_delay = 1 / st.sidebar.slider(
        "Produce images Frequency (img / second)", 1, 100, 10
    )
    consume_delay = 1 / st.sidebar.slider(
        "Display images Frequency (img / second)", 1, 100, 10
    )
    c1, c2 = st.columns(2)
    image_placeholder = c1.empty()
    queue_size_placeholder = c2.empty()

    asyncio.run(
        run_app(
            image_placeholder,
            queue_size_placeholder,
            st.session_state.queue,
            produce_delay,
            consume_delay,
        )
    )

(animated GIF demo of the running app)

This is not perfect yet but hopefully can help:

  • If there are no elements in the queue, or too many, the app will misbehave.
  • I'm showing 50 fps in the GIF, which is almost more than st.image can handle; at about 700 images it looks like the InMemoryFileManager is in pain, and something else goes awry. 10 fps should be fine, but I'll let you experiment and tell me; I did not dive deep into it.

Hope it helps,
Fanilo :balloon:


Thanks a lot @andfanilo !

And yes, one thing I was wondering about is that the display coroutine keeps redrawing even though there is no new image data yet. That's not very sustainable; working on demand would be much better.

Regarding the InMemoryFileManager, I have already seen complaints from it at rates of 0.5 fps when using display_before_work() from the first script I posted here.

In fact, I have tried this approach of using a queue to feed the display routine. However, I could not get it working with the multiprocessing module (so far). But that could very likely be even better, as I am using a server with 96 cores and about 512 GB of RAM. :slight_smile:

The sensor can deliver data at rates of several GB/s, which then has to be processed on the fly. So dividing the workload would also relieve Streamlit.

What do you think? Is multiprocessing an option?
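
What I have in mind is roughly the following sketch (untested in this form; produce_frames and the session_state keys are placeholder names, and it assumes the default fork start method on Linux):

import multiprocessing as mp
import time

import numpy as np
import streamlit as st


def produce_frames(frame_queue, delay=0.1):
    # runs in a separate process; the random array stands in for a real detector read
    while True:
        time.sleep(delay)
        frame = np.random.random((512, 512)).astype(np.float32)
        if not frame_queue.full():  # drop frames rather than block if the UI falls behind
            frame_queue.put(frame)


if __name__ == '__main__':
    # start the producer process only once and keep it alive across reruns
    if 'producer' not in st.session_state:
        frame_queue = mp.Queue(maxsize=100)
        producer = mp.Process(target=produce_frames, args=(frame_queue,), daemon=True)
        producer.start()
        st.session_state['frame_queue'] = frame_queue
        st.session_state['producer'] = producer

    placeholder = st.empty()
    while True:
        frame = st.session_state['frame_queue'].get()  # blocks until the next frame arrives
        placeholder.image(frame)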

Anyway, Iā€™ll try your example and let you know.

Best wishes
Markus

Yes, your solution performs much better. But this is only visible when I run the scripts locally; with the network configuration I described above it is hardly detectable. I think the network is the bottleneck: if the images were transferred uncompressed to my browser, a 512x512 8-bit frame is about 0.26 MB, so even 10 fps would already amount to roughly 20 MBit/s.

I was also thinking of converting the data into some kind of video stream that could be sent to st.video(). But so far I could not find enough information on how to accomplish this.

If the network is the bottleneck, then multiprocessing would help a bit, but not too much :confused:

You could; I think st.video can take a BytesIO buffer into which you write frames of video data, but I don't think you'll get better performance: you would be swapping multiple overlapping small videos, just like with the images, and get the same behavior.

Ah, well, if you know your way around JavaScript, you could probably build a quick component/components.html that connects to your sensor via WebRTC or some other HTML video-streaming capability and receives/displays the uncompressed video data. That would not pass data through Streamlit, so no Python latency.

And if you go down the component route, you could, for example, send batches of video frames back to Streamlit's Python side every 10 seconds or so if you need to do some post-processing over a period of time.

Don't have an example right now though :confused:

So, I tried it in the "real" office, i.e. with a direct network connection without tunneling. Performance is indeed better; however, the images (still) do not run smoothly.

@andfanilo - Nice example there! I added a queue.qsize() > 0 check in the consumer. It seems to run pretty smoothly on my laptop up to 100 fps, although it may be harder to perceive how smooth it is using random images. It judders every now and then.
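
For reference, the modified consumer loop looks roughly like this (a drop-in variant of consume_images from the example above, not the exact code I ran):

async def consume_images(image_placeholder, queue_size_placeholder, queue, delay):
    while True:
        _ = await asyncio.sleep(delay)
        # skip this cycle if there is nothing to display yet
        if queue.qsize() == 0:
            continue
        image = await queue.get()
        image_placeholder.image(
            image,
            caption=f"Consumed images: {st.session_state.consumed_images}, {str(datetime.now())}",
        )
        queue_size_placeholder.metric(
            f"In queue (queue size is {QUEUE_SIZE})", st.session_state.queue.qsize()
        )
        st.session_state.consumed_images += 1
        queue.task_done()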


Don't have much to add here, but I can say this is a fascinating use-case and example! If anyone is interested in pursuing this as a (potentially paid) blog post / demo example, please send me a DM and we can discuss :slight_smile:

Best,
Randy


Dear all, Happy New Year!

I'd like to come back to this issue. In fact, I am back to Streamlit after giving Dash/Plotly a try. :slight_smile:

While Plotly/Dash is more flexible, I was not able to get better performance, which might simply be a result of my being an occasional Python scripter.

For my purposes, I find Streamlit much easier to master. And I think the speed-up we achieved in this use case is quite nice.

This brings me to my comment/question:

The speed-up here is accomplished by using asyncio. However, the call to asyncio.run() at the end of the script prevents control from going back to Streamlit (although interaction still works).

But there is a (not so nice) side effect: when a script changes the UI, say some fields are supposed to be removed after pressing a button, those fields do not get removed completely but remain visible, grayed out (yet still active!). I reckon this may relate to Ghost Elements in UI.

Would it be possible to provide something like st.add_asyncio_run(my_func, args, kwargs), which would be run after Streamlit has finished rendering the web page?
With respect to the example above, something like

st.add_asyncio_run(run_app, 
    image_placeholder,
    queue_size_placeholder,
    st.session_state.queue,
    produce_delay,
    consume_delay)

?
Perhaps with some additional functionality to handle the event loop and one or more queues?

Best wishes
Markus

Dear all!

Another remark/question.
When running @andfanilo's example and playing with the sliders, I sometimes get the following error:

RuntimeError: Task <Task pending name='Task-324' coro=<consume_images() running at /home/cri/Software/pyelmilix/src/pyelmilix/test_fast.py:41> cb=[gather.<locals>._done_callback() at /home/cri/Software/conda/miniconda/miniconda3/envs/pyelmilix/lib/python3.8/asyncio/tasks.py:769]> got Future <Future pending> attached to a different loop

However, I cannot see where the event loop gets changed; the only place could be the Streamlit code that surrounds the execution of the above script.

Can this be avoided while keeping the nice communication feature of using a Queue()?
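
The only workaround I can think of (just a guess, not yet verified against the full example) would be to create the Queue inside the coroutine that asyncio.run() executes, so that it is bound to the loop that actually runs it, instead of keeping it in st.session_state across reruns. Something like:

async def run_app(image_placeholder, queue_size_placeholder, produce_delay, consume_delay):
    # the queue now lives on the loop that asyncio.run() created for this run
    queue = asyncio.Queue(QUEUE_SIZE)
    _ = await asyncio.gather(
        produce_images(queue, produce_delay),
        consume_images(image_placeholder, queue_size_placeholder, queue, consume_delay),
    )

The queue-size metric inside consume_images would then have to use this local queue instead of st.session_state.queue.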

What I did in my scripts is wrap my application in a class instance, which I store in st.session_state (st.session_state.my_app = MyApp()) once, during the first execution of the script. All the functionality I need is attached to this instance. Data that is to be shared between different (asynchronous) routines is then shared via common attributes, i.e. st.session_state.my_app.data_producer.data can be accessed by st.session_state.my_app.data_consumer(). This saves me from keeping track of too many session_state entries.
Other possibilities would be to use st.session_state for storing common data, or to use class attributes/variables.
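
In code, this structure looks roughly like the following sketch (simplified; MyApp, DataProducer, and DataConsumer are placeholder names, not my actual code):

import asyncio

import numpy as np
import streamlit as st


class DataProducer:
    def __init__(self):
        self.data = None

    async def run(self, delay=0.1):
        while True:
            self.data = np.random.random((512, 512))  # stands in for a detector read
            await asyncio.sleep(delay)


class DataConsumer:
    def __init__(self, producer):
        self.producer = producer

    async def run(self, placeholder, delay=0.1):
        while True:
            if self.producer.data is not None:
                placeholder.image(self.producer.data)
            await asyncio.sleep(delay)


class MyApp:
    def __init__(self):
        self.data_producer = DataProducer()
        self.data_consumer = DataConsumer(self.data_producer)

    async def run(self, placeholder):
        await asyncio.gather(self.data_producer.run(),
                             self.data_consumer.run(placeholder))


if __name__ == '__main__':
    # create the instance once and keep it across reruns
    if 'my_app' not in st.session_state:
        st.session_state.my_app = MyApp()

    placeholder = st.empty()
    asyncio.run(st.session_state.my_app.run(placeholder))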

Is there something I might have overlooked that could lead to problems with this approach?
I have seen that caching becomes difficult, as Streamlit does not know out of the box how to hash my routines/classes.

Is performance affected one way or the other?

Again, thanks a lot in advance for all your comments and thoughts!

Markus

This topic was automatically closed 365 days after the last reply. New replies are no longer allowed.