Start multiple blocking requests at once in one page

I’m trying to create a page with several data-driven outputs, each provided by a blocking request that takes several seconds to return.

I see the same behavior both locally and when deployed.

I have a working solution (see below the intro), but I would like to improve it, in particular to avoid the blocking wait at the end of the script.

Problem

Here is a simple example (with only one blocking request, getInfo(), for simplicity).
You’ll need a Google Earth Engine (GEE) account to run this code, though:

import ee
ee.Initialize()
import streamlit as st

naip2024 = ee.ImageCollection("USDA/NAIP/DOQQ").filterDate("2022-01-01", "2023-01-01")

st.write("## NAIP imagery from 2022")

st.write(naip2024.size().getInfo())

st.write("end of page")

This script shows end of page only after .getInfo() completes. I would like the page to render completely first and then update each output as soon as its result is ready.

I am not considering caching, as I don’t know when to invalidate the cache, and GEE has its own caching layer.

Work-in-Progress Solution

I chose the usual approach, described in several topics in this forum: put the blocking code into threads and update a placeholder from each thread.

However, the changes (delta messages) propagate to the front-end ONLY when I explicitly join (i.e. wait for completion of) all the threads at the end of the script.

Please find my working code below; I hope it is already useful to others.
The code is set up as a one-file app, but is prepared to be split out into a reusable module.

The script does not block until the very end, at st_wait_for_gee_info(), and each output shows a temporary spinner with In progress as a placeholder while waiting for the response.

I wonder …

Are there any examples that avoid blocking the script runner while waiting for the callbacks? E.g. creating an async awaitable in Streamlit’s main event loop, or in a thread outside the script runner’s loop?

Looking at some existing examples (esp. GitHub - FloWide/streamlit_callbacks), they seem to have become obsolete through an internal refactor of the runtime architecture (fair enough, as it is internal). So I hope to find a way that doesn’t break that easily, which might be just the way I do it…

############ st_gee.py file ##########

import threading
import typing

import ee
import streamlit as st
from streamlit.runtime.scriptrunner import add_script_run_ctx


CB_THREAD_ATTR_NAME = "gee_callback_threads"


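def get_info_thread_target(placeholder, gee_object, formatter):
    # Runs in a worker thread: show a spinner in the placeholder while the
    # blocking getInfo() call is in flight, then replace it with the result.
    with placeholder:
        with st.spinner():
            # todo: handle remote exceptions
            gee_info = gee_object.getInfo()
        st.write(formatter(gee_info))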


def default_formatter(x):
    return x


def st_gee_info(
    gee_object: ee.ComputedObject,
    # The formatter receives the plain getInfo() result, not the ComputedObject.
    formatter: typing.Optional[typing.Callable[[typing.Any], typing.Any]] = None,
) -> st.empty:
    """
    Render streamlit objects based on `getInfo` results.
    """
    if formatter is None:
        formatter = default_formatter

    placeholder = st.empty()
    # put this in a thread with a link to the empty cell
    getInfoThread = threading.Thread(
        target=get_info_thread_target,
        args=(placeholder, gee_object, formatter),
        name="gee_getInfo",
        daemon=True,
    )
    add_script_run_ctx(getInfoThread)
    getInfoThread.start()
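    # Remember the worker on the script thread so that
    # st_wait_for_gee_info() can find and join it later.
    scriptThread = threading.current_thread()
    if not hasattr(scriptThread, CB_THREAD_ATTR_NAME):
        setattr(scriptThread, CB_THREAD_ATTR_NAME, [])
    getattr(scriptThread, CB_THREAD_ATTR_NAME).append(getInfoThread)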
    return placeholder


def st_wait_for_gee_info() -> None:
    """
    Wait for the getInfo callbacks to complete.
    """
    all_threads = getattr(threading.current_thread(), CB_THREAD_ATTR_NAME, None)
    if all_threads:
        for t in all_threads:
            t.join()


######## gee_app.py ############

import ee
# from st_gee import st_gee_info, st_wait_for_gee_info
ee.Initialize()

import streamlit as st

naip2024 = ee.ImageCollection("USDA/NAIP/DOQQ").filterDate("2022-01-01", "2023-01-01")

st.write("## NAIP imagery from 2022")

# st.write(naip2024.size().getInfo())
st_gee_info(naip2024.size())

st.write("end of page")

# end of script
st_wait_for_gee_info()
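
Stripped of Streamlit and GEE, the start-everything-then-wait pattern above can be sketched with the standard library's concurrent.futures. This is only an illustration under assumptions: slow_request and run_all are hypothetical names, slow_request is a stand-in for a blocking getInfo() call, and the sketch does not show how (or whether) results reach the Streamlit front-end. It also shows one possible way to deal with the "handle remote exceptions" todo, by catching whatever the worker raised when its result is collected.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_request(name, delay, fail=False):
    """Hypothetical stand-in for a blocking call such as getInfo()."""
    time.sleep(delay)
    if fail:
        raise RuntimeError(f"remote error in {name}")
    return f"{name} done"


def run_all(requests):
    """Start every request at once and collect outcomes as they complete."""
    outcomes = []
    with ThreadPoolExecutor(max_workers=len(requests)) as pool:
        # Map each future back to the name of the request it belongs to.
        futures = {pool.submit(slow_request, *args): args[0] for args in requests}
        for future in as_completed(futures):
            name = futures[future]
            try:
                outcomes.append((name, future.result()))
            except Exception as exc:
                # An exception raised in the worker resurfaces in result().
                outcomes.append((name, f"failed: {exc}"))
    return outcomes


# Three made-up requests; the third one simulates a remote failure.
REQUESTS = [("a", 0.3), ("b", 0.1), ("c", 0.2, True)]
OUTCOMES = run_all(REQUESTS)
```

Because all three requests run concurrently, the total wait is roughly that of the slowest one rather than the sum. The loop over as_completed() still blocks the calling thread until everything has finished, so it has the same limitation as st_wait_for_gee_info().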

Hi @AchimGaedkeLynker

Instead of running the request from the app, which requires the long initial wait you mentioned, could you save the completed result by serializing it as a pickled object via pickle.dump() and then load it into the app via pickle.load()?
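
A minimal sketch of what I mean, assuming the getInfo() result is a plain Python object. The helper names, the file name, and the example value are all made up for illustration; nothing here calls GEE.

```python
import pickle
import tempfile
from pathlib import Path


def save_result(result, path):
    """Serialize an already-fetched result to disk."""
    with open(path, "wb") as fh:
        pickle.dump(result, fh)


def load_result(path):
    """Load a previously saved result (only unpickle files you wrote yourself)."""
    with open(path, "rb") as fh:
        return pickle.load(fh)


# Made-up placeholder for the value returned by a completed getInfo() call.
result = {"collection": "USDA/NAIP/DOQQ", "size": 12345}

# Hypothetical cache location; any writable path would do.
cache_file = Path(tempfile.gettempdir()) / "gee_result.pkl"

save_result(result, cache_file)      # done once, outside the app
restored = load_result(cache_file)   # done in the app, returns quickly
```

The saved file is a snapshot, so it will not reflect later changes on the GEE side until it is regenerated.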

Thanks for the suggestion. I am actually trying to avoid extra state and to rely only on what is in my Google Earth Engine account.

The next step in my project is to make those long requests configurable with several options (select boxes), which will make precomputing the results before runtime more difficult.

I believe I need to learn how the DeltaGenerators actually work. I am considering switching off the “magic” for a while so I can see how to trigger the layout changes explicitly.
