Streamlit session_state with multiprocessing

Is there any recommended solution for using streamlit session_state while running part of your code with multiprocessing.Pool?

Of course, the problem is that by using multiprocessing, we run different processes (instead of the main streamlit run … process), and the new processes don't inherit the session_state, resulting in a KeyError. I'm curious to know whether there's a workaround, or it's a limitation we have to live with (i.e. use either session_state or multiprocessing, but not both).

Thanks


Sharing data between processes can be tricky. There are several ways of doing it documented in the multiprocessing module. The recommended solution would depend on the specifics of your use case.
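One of those documented approaches, as a minimal sketch: a `multiprocessing.Manager` dict is passed to the workers explicitly, instead of having them read from session_state (the `factor` key and function names here are made up for illustration):

```python
from multiprocessing import Manager, Pool


def worker(args):
    # Workers receive the shared dict explicitly instead of relying
    # on inherited state; the proxy reconnects to the manager process.
    shared, x = args
    return x * shared["factor"]


def run(factor, values):
    # A Manager-backed dict can be handed to worker processes, which
    # read it through a proxy object.
    with Manager() as manager:
        shared = manager.dict({"factor": factor})
        with Pool(2) as pool:
            return pool.map(worker, [(shared, v) for v in values])


if __name__ == "__main__":
    print(run(10, [0, 1, 2]))  # [0, 10, 20]
```

In a Streamlit app, the value you would otherwise read from session_state inside the worker is copied into the shared dict before submitting the jobs.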

Thanks for your reply @Goyo, but that's not what I was looking for.

Indeed, sharing state between processes is tricky. But the question is whether streamlit supports using multiprocessing and session_state together.

You should be able to use both multiprocessing and session_state in a streamlit application. The workaround for "new processes don't inherit the session_states" is sharing the data in some other way. I can't be more specific without knowing more about your use case.

I would appreciate it if you could share a piece of code as a minimal working example.

Of course. In this example all processes share the exponent.

from multiprocessing import Pool

import streamlit as st


def main():
    exponent = st.number_input(label="Exponent", value=0)
    start = st.number_input(label="Start", value=0)
    stop = st.number_input(label="Stop", value=3)
    data = range(start, stop)
    f = RaiseTo(exponent=exponent).compute
    with Pool() as p:
        result = p.map(f, data)
        st.write(result)


class RaiseTo:
    def __init__(self, exponent):
        self.exponent = exponent

    def compute(self, x):
        return x**self.exponent


if __name__ == "__main__":
    main()

Create a session state variable to save your work. Run other processes as normal but save the result to session state when done.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time

import streamlit as st


if 'save' not in st.session_state:
    st.session_state.save = []


def task(v):
    """session state does not work here"""
    time.sleep(1)
    return v * v


if __name__ == '__main__':
    num_workers = 2
    jobs = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    processed_jobs = []

    start = st.button('start work')

    if start:
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            for j in jobs:
                pj = executor.submit(task, j)
                processed_jobs.append(pj)

            for future in concurrent.futures.as_completed(processed_jobs):
                try:
                    res = future.result()
                    st.write(f'res: {res}')

                    # Incrementally save the completed task so far.
                    st.session_state.save.append(res)

                except concurrent.futures.process.BrokenProcessPool as ex:
                    raise Exception(ex)

    if len(st.session_state.save):
        st.write('#### Completed Jobs')
        st.write(f'{st.session_state.save}')



Thank you both @Goyo and @ferdy for the examples. They're indeed helpful. I think these can be marked as Solution in a general case.

However, after using them in my code, I realized my issue is actually running multiprocessing in a multipage streamlit app. Multiprocessing needs to be run from within an if __name__ == '__main__': block; otherwise it recursively spawns new processes, hence the error.
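The guard requirement can be sketched with a Streamlit-free toy example (the function names are made up): on start methods that launch a fresh interpreter, each child process re-imports the main module, so a pool created at module level would be created again in every child.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def run_jobs(values):
    # Pool creation lives inside a function; the guarded block below is
    # the only place it runs at import time, so children that re-import
    # this module do not spawn pools of their own.
    with Pool(2) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    print(run_jobs([1, 2, 3]))  # [1, 4, 9]
```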

My knowledge of multipage apps is limited. What I would like to do is to streamlit run homepage.py, and from the homepage go to other pages, which trigger calls to modules containing multiprocessing code.

I don't see why what you describe here would be an issue at all.

If that is the case, just do it.

Going to a page triggers the execution of that page's code. That code can import modules, call functions… the kind of stuff code usually does. Multiprocessing or not, it doesn't matter.

Again, I don't think we know enough about your issues to give meaningful advice.

You can save the example code I showed in workers.py, for example, and put it under the pages folder.

This is a simplified version of my code structure (Please note, I made up the names and functions for sharing purposes.):

# homepage.py

import streamlit as st

def some_function():
    ...  # placeholder for the actual implementation


def calculate_encodings():
    st.session_state.dataset_path = st.text_input('The path your dataset:', '')
    if st.session_state.dataset_path != '':
        st.session_state.encodings = some_function()

if __name__ == '__main__':
    calculate_encodings()

Everything is fine here. After completion of calculate_encodings(), I click on another page from the menu (clustering.py). This page triggers a call to a function that uses multiprocessing:

# clustering.py

import streamlit as st
from src.utils import copy_data

if 'encodings' in st.session_state and st.session_state.encodings is not None:
    copy_data()
    # rest of the code ...

And finally in my src/utils.py:

# src/utils.py

from concurrent.futures import ProcessPoolExecutor


def download_file(key):
    ...  # downloading the data from the remote server


def copy_data():
    keys_to_download = ['file1.txt', 'file2.txt', 'file3.txt']
    with ProcessPoolExecutor() as executor:
        for key in keys_to_download:
            executor.submit(download_file, key)

Note that the second page (clustering.py) is not triggered from an if __name__ == '__main__': block; therefore I get the error.

As a workaround, I called copy_data() directly from homepage.py within calculate_encodings(). However, this is not a proper place for calling that function, and I was wondering whether copy_data() could be called directly from the clustering.py page or not.
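For I/O-bound work like downloading files, one alternative that sidesteps the __main__ guard entirely is a thread pool: threads share the parent's memory and involve no module re-import, so the call can live in any page's top-level code. A minimal sketch, with the download body as a stand-in for the real boto3 call:

```python
from concurrent.futures import ThreadPoolExecutor


def download_file(key):
    # Stand-in for the real download logic (e.g. a boto3 get_object call).
    return f"downloaded {key}"


def copy_data(keys):
    # Threads need no __main__ guard, and for network-bound downloads
    # the GIL is not a bottleneck, since threads release it while
    # waiting on I/O.
    with ThreadPoolExecutor() as executor:
        return list(executor.map(download_file, keys))


if __name__ == "__main__":
    print(copy_data(["file1.txt", "file2.txt"]))
```

Whether this fits depends on whether download_file does real CPU work; for pure downloads it should behave the same as the process pool version.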

I can't say I totally understand the purpose of your code. I made minimal changes to make it compile and run, and I was unable to cause any errors. I got a warning, though:

WARNING streamlit.runtime.state.session_state_proxy: Session state does not function when running a script without `streamlit run`

It seems to happen at some point after calling copy_data() and submitting all tasks to the executor, but before the tasks are completed. I don't know why it happens, but it doesn't seem to cause any problems; the code involved doesn't depend on session_state, nor on anything streamlit-related actually.

Thanks for the quick reply. In my case I got the error "A process in the process pool was terminated abruptly while the future was running or pending" after those warnings. I'm not sure whether the error is a streamlit issue or not. It could be related to the way I called boto3.
I will mark the answer as the solution.