Update st.progress with joblib (multiprocessing tasks)

Summary

I’m using joblib to run thousands of jobs (HTTP requests plus data processing) in parallel, and I want a progress bar that shows how many of them have completed.

Calling st.progress inside the task raises BrokenProcessPool: A task has failed to un-serialize.

Steps to reproduce

app.py (run it with streamlit run app.py):

import time
import streamlit as st
from joblib import Parallel, delayed

def fetch(i: int) -> int:
    time.sleep(0.2)
    return i

counts = range(10)
with st.spinner("Running..."):
    i = 0
    length = len(counts)
    bar = st.progress(i, "Start")
    def _task(x):
        results = fetch(x)
        print(f"{x} => {results}")
        # update progress bar
        global i
        i += 1
        #! Adding this line raises `BrokenProcessPool: A task has failed to un-serialize.`
        bar.progress(i/length, f"progress: {i}/{length}")
        return results

    results = Parallel(n_jobs=-1)(
        delayed(_task)(count)
        for count in counts)

    print(results)

The code runs fine if I remove the line bar.progress(i/length, f"progress: {i}/{length}").

Is there a workaround for using st.progress with joblib?

I also found that using Python's ProcessPoolExecutor with st.progress is slow when running thousands of tasks.

Expected behavior:

st.progress updates correctly while joblib runs the tasks.

Debug info

  • Streamlit version: 1.23.1
  • Python version: 3.11.3
  • Using Poetry
  • OS version: macOS 13.4
  • Browser version: Brave 1.52.117 (Chromium: 114.0.5735.90) arm64

Hi @xareelee

I’ve tweaked the code a bit to use multi-threading instead of joblib.

Here’s the code:

import time
import streamlit as st
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(i: int) -> int:
    time.sleep(0.1)
    return i

def _task(x):
    result = fetch(x)
    return x, result

counts = range(1000)

def calculate():
    with st.spinner("Running..."):
        with ThreadPoolExecutor() as executor:
            bar = st.progress(0)
            placeholder = st.empty()
            # Submit every task up front; the dict maps each future to its input.
            futures = {executor.submit(_task, count): count for count in counts}
            results = []
            # as_completed yields futures as they finish. This loop runs in the
            # script thread, not in a worker, so the Streamlit elements can be
            # updated here.
            for idx, future in enumerate(as_completed(futures), start=1):
                count, result = future.result()
                results.append((count, result))
                progress = idx / len(counts)
                placeholder.text(f"{int(progress * 100)}%")
                # update progress bar
                bar.progress(progress)
    return results

if st.button('Calculate!'):
    calculate()

And the screencast of the app:

[Screen recording of the app]
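
If updating the bar after every single task turns out to be a bottleneck with thousands of tasks (I haven't measured this), one option is to refresh it only when the progress has moved by a whole step. Here is a minimal sketch of that idea using only the standard library. The names `run_with_progress`, `on_progress`, and `steps` are hypothetical helpers for illustration, not part of Streamlit or joblib:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_with_progress(
    func: Callable[[T], R],
    items: Iterable[T],
    on_progress: Callable[[int, int], None],
    steps: int = 100,
    max_workers: Optional[int] = None,
) -> List[Tuple[T, R]]:
    """Run func over items in a thread pool (hypothetical helper).

    on_progress(done, total) is called from the calling thread, and only
    when the progress crosses one of `steps` evenly spaced marks, so it
    fires at most `steps` times no matter how many items there are.
    """
    items = list(items)
    total = len(items)
    results: List[Tuple[T, R]] = []
    last_step = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the input item that produced it.
        futures = {executor.submit(func, item): item for item in items}
        for done, future in enumerate(as_completed(futures), start=1):
            results.append((futures[future], future.result()))
            # Integer step index in 0..steps; report only when it increases.
            step = done * steps // total
            if step > last_step:
                last_step = step
                on_progress(done, total)
    return results
```

In the app above this could be wired up as `bar = st.progress(0.0)` followed by `run_with_progress(fetch, counts, lambda done, total: bar.progress(done / total))`. With 1000 tasks and the default `steps=100`, the bar is then redrawn 100 times instead of 1000. Results come back in completion order, so sort them if the original order matters.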

Hope this helps!

Best regards,
Chanin