Displaying a tqdm bar with multiprocessing

I have a program that processes multiple files using Python’s multiprocessing library, and I want to show the user a progress bar on the frontend side.

The general idea of my multiprocessing section is as follows:

import multiprocessing as mp

def worker(file_path, queue_obj):
    result = func(target=file_path)  # `func` does the actual per-file processing
    queue_obj.put(result)
    return result

def listener(queue_obj):
    model = load_model() # tf model
    while True:
        message = queue_obj.get()
        if message == 'done':
            break
        else:
            score = make_inference()
            print(score)

def multi_process_directory(file_paths):
    manager = mp.Manager()
    queue = manager.Queue()    
    pool = mp.Pool(mp.cpu_count())
    
    for i in range(mp.cpu_count() // 3):
        pool.apply_async(listener, (queue, ))

    jobs = []

    for i in range(len(file_paths)):
        job = pool.apply_async(worker, (file_paths[i], queue))
        jobs.append(job)

    for job in jobs: 
        job.get()

    queue.put('done')
    pool.close()
    pool.join()
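One subtlety in the snippet above, worth flagging as a side note: each listener consumes one 'done' sentinel, so with cpu_count() // 3 listeners the single queue.put('done') stops only one of them and the rest block forever on queue_obj.get(). A minimal runnable sketch of the one-sentinel-per-listener pattern:

import multiprocessing as mp

def listener(queue_obj):
    # drain messages until this listener's own sentinel arrives
    while True:
        if queue_obj.get() == 'done':
            break

if __name__ == '__main__':
    manager = mp.Manager()
    queue = manager.Queue()
    n_listeners = max(1, mp.cpu_count() // 3)
    pool = mp.Pool(mp.cpu_count())
    for _ in range(n_listeners):
        pool.apply_async(listener, (queue,))
    # one sentinel per listener, so every listener can exit
    for _ in range(n_listeners):
        queue.put('done')
    pool.close()
    pool.join()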

Now I want to add a progress bar around the loop over the file_paths list in the multi_process_directory function.

I tried incorporating this stqdm snippet into my program, but I couldn’t figure out how to use the worker’s output.
Using stqdm is not a must.

Hi @ImSo3K

Thanks for your question! Streamlit has a built-in progress bar method you can use called st.progress().

Essentially, you can embed the method inside a for loop in order to track progress (example code from the above-mentioned link):

import streamlit as st
import time

progress_text = "Operation in progress. Please wait."
my_bar = st.progress(0, text=progress_text)

for percent_complete in range(100):
    time.sleep(0.1)
    my_bar.progress(percent_complete + 1, text=progress_text)

Hope this helps!

Best regards,
Chanin

It doesn’t seem to work with multiprocessing. I added the progress bar update after
job = pool.apply_async(worker, (file_paths[i], queue))
and it just goes to 100% without waiting for any job to finish.
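For context: apply_async returns as soon as the job is queued, so a bar updated in the submission loop races straight to 100%. A minimal sketch of driving st.progress from the result-collection loop instead (worker's body and multi_process_with_bar are placeholder names):

import multiprocessing as mp
import streamlit as st

def worker(file_path):
    return len(file_path)  # placeholder for the real per-file processing

def multi_process_with_bar(file_paths):
    bar = st.progress(0)
    with mp.Pool(mp.cpu_count()) as pool:
        jobs = [pool.apply_async(worker, (fp,)) for fp in file_paths]
        for i, job in enumerate(jobs, start=1):
            job.get()  # blocks until this job has actually finished
            bar.progress(i / len(file_paths))  # advance only on completion

Results are collected in submission order, so the bar may briefly lag a fast job, but it no longer finishes before the work does.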

bump

Hi @ImSo3K

It seems @asehmi has implemented a solution for a multiprocessing task with a progress bar
here; there’s also accompanying code and a demo.

Hopefully this is what you’re looking for.

Best regards,
Chanin


FYI - I have updated my little test app based on a question asked in the gist comments.

Hi,
thank you for the code snippet.

Since I’m passing arguments besides my iterable to the worker function, I used functools.partial to achieve my purpose.

The worker fire-off section looks like this now:

func = partial(worker, session_target_path, queue)
for _ in stqdm(pool.imap(func, file_paths), total=len(file_paths)):
    pass
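A caveat worth flagging here: functools.partial binds the leading positional arguments, so pool.imap supplies each file path as the last argument. The worker therefore has to accept the iterated argument last; a minimal runnable sketch with placeholder values:

from functools import partial
from multiprocessing import Manager

def worker(session_target_path, queue_obj, file_path):
    # file_path must come last: partial pre-binds the first two arguments
    queue_obj.put((session_target_path, file_path))

if __name__ == '__main__':
    queue = Manager().Queue()
    func = partial(worker, '/tmp/target', queue)  # '/tmp/target' is a placeholder
    func('a.wav')  # equivalent to worker('/tmp/target', queue, 'a.wav')
    print(queue.get())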

Now I’ve got two questions on the subject:

  1. Since I don’t need the index i, is there a way to make the for _ in ... / pass construct look less clunky?
  2. Do you know if it’s possible to run two stqdm bars in parallel? I tried implementing another one in my listener’s while loop to track inference progress using this guide, but it didn’t even show.
     Mine is even simpler (all I do is initialize progress outside the loop and update it after make_inference).
  1. You need some sort of iteration in proportion to total to drive stqdm progress reporting, so probably no way around that.
  2. I’ve used the context form with stqdm() as progress, and set progress.description in an iteration within that context. I have also done this with nested stqdm contexts (each assigned to a separate variable), and can update the progress of each context independently.
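For reference, a minimal sketch of the nested-context pattern described in point 2, assuming stqdm mirrors tqdm’s API, where the method is set_description() rather than a description attribute (files, n_steps, and do_step are placeholders; run it inside a Streamlit app):

from stqdm import stqdm

def do_step(f, step):
    pass  # placeholder per-step work

files, n_steps = ['a.wav', 'b.wav'], 3
with stqdm(total=len(files), desc='files') as outer:
    for f in files:
        with stqdm(total=n_steps, desc='steps') as inner:
            for step in range(n_steps):
                do_step(f, step)
                inner.update()  # advances only the inner bar
        outer.set_description(f'finished {f}')
        outer.update()  # advances only the outer bar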

progress.description doesn’t seem to be possible, since that method doesn’t exist on that instance.

So the code snippet I posted initially is inside the context of an st.sidebar.expander.

My updated code looks like this now:

import multiprocessing as mp
from functools import partial
from queue import Empty  # manager queues raise queue.Empty on get(timeout=...)
from stqdm import stqdm

def worker(session_target_path, queue_obj, file_path):
    # file_path comes last because partial pre-binds the first two arguments
    result = func(target=file_path)
    queue_obj.put(result, timeout=60)

def listener(queue_obj, n_files):
    model = load_model() # tf model
    with stqdm(desc='text', total=n_files) as progress:
        while True:
            try:
                message = queue_obj.get(timeout=20)
            except Empty:
                # no message for 20 seconds: assume the workers are done
                break

            score = make_inference()  # presumably consumes `message` with `model`
            progress.update()
            print(score)

def multi_process_directory(file_paths, session_target_path):
    manager = mp.Manager()
    queue = manager.Queue()
    pool = mp.Pool(mp.cpu_count())
    
    # Now using a single listener
    pool.apply_async(listener, (queue, len(file_paths)))

    func = partial(worker, session_target_path, queue)
    for _ in stqdm(pool.imap(func, file_paths), total=len(file_paths)):
        pass

    pool.close()
    pool.join()

I debugged the listener section and the progress bar does update, but it is never shown. I tried disabling the one the worker uses (the stqdm wrapping the imap call) and it didn’t help.
And one last thing: after the worker bar completes, it vanishes. Is that intended?
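A hedged note on the invisible listener bar: the listener runs in a pool child process, and Streamlit elements created outside the main script process generally don’t render, which would explain a bar that updates under the debugger but never appears. One workaround consistent with the queue-based design above is to have child processes only put progress messages on the queue and to drive a single bar from the main Streamlit process; a minimal sketch with placeholder names:

import multiprocessing as mp
from stqdm import stqdm

def worker(queue_obj, file_path):
    # placeholder for the real processing + inference
    queue_obj.put(('progress', file_path))

def run(file_paths):
    manager = mp.Manager()
    queue = manager.Queue()
    with mp.Pool(mp.cpu_count()) as pool:
        for fp in file_paths:
            pool.apply_async(worker, (queue, fp))
        # the bar lives in the main (Streamlit) process, fed by the queue
        with stqdm(total=len(file_paths), desc='inference') as progress:
            for _ in range(len(file_paths)):
                queue.get()  # one message per finished file
                progress.update()
        pool.close()
        pool.join()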

bump

I’m not the stqdm guy 🙂 It would help me and others to help you if you provided a minimal, fully working version of your code that can be run from the command line.

Cheers!
