Is there any recommended solution for using streamlit session_states while running a part of your code with multiprocessing.Pool?
Of course, the problem is that by using multiprocessing, we run different processes (instead of the main streamlit run …), and the new processes don't inherit the session_state, resulting in a KeyError. I'm curious to know whether there's a workaround or whether it's a limitation we have to live with (i.e. using either session_state or multiprocessing, but not both).
Sharing data between processes can be tricky. There are several ways of doing it documented in the multiprocessing module. The recommended solution would depend on the specifics of your use case.
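For instance, here is a minimal sketch of one such documented approach: passing results back to the parent through a multiprocessing.Queue (the names are illustrative, not from your code), so the main process can then store them in session_state:

import multiprocessing

def worker(job, queue):
    # Runs in a child process; send the result back to the parent.
    queue.put(job * job)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    jobs = [1, 2, 3]
    procs = [multiprocessing.Process(target=worker, args=(job, queue)) for job in jobs]
    for p in procs:
        p.start()
    # Drain the queue before joining to avoid blocking on a full pipe.
    results = [queue.get() for _ in jobs]
    for p in procs:
        p.join()
    # Back in the parent process, results can safely go into st.session_state.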
Thanks for your reply @Goyo, but that's not what I was looking for.
Indeed, sharing state between processes is tricky. But the question is whether streamlit supports using multiprocessing and session_state together.
You should be able to use both multiprocessing and session_state in a streamlit application. The workaround for "new processes don't inherit the session_state" is sharing the data in some other way. I can't be more specific without knowing more about your use case.
Create a session state variable to save your work. Run other processes as normal but save the result to session state when done.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import time

import streamlit as st

if 'save' not in st.session_state:
    st.session_state.save = []

def task(v):
    """session state does not work here"""
    time.sleep(1)
    return v * v

if __name__ == '__main__':
    num_workers = 2
    jobs = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    processed_jobs = []

    start = st.button('start work')

    if start:
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            for j in jobs:
                pj = executor.submit(task, j)
                processed_jobs.append(pj)

            for future in concurrent.futures.as_completed(processed_jobs):
                try:
                    res = future.result()
                    st.write(f'res: {res}')

                    # Incrementally save the completed tasks so far.
                    st.session_state.save.append(res)
                except concurrent.futures.process.BrokenProcessPool as ex:
                    raise Exception(ex)

    if len(st.session_state.save):
        st.write('#### Completed Jobs')
        st.write(f'{st.session_state.save}')
Thank you both @Goyo and @ferdy for the examples. They're indeed helpful. I think these can be marked as Solution in the general case.
However, after using them in my code, I realized my issue is actually with running multiprocessing in a multipage streamlit app. Multiprocessing needs to be run within the if __name__ == '__main__': block, otherwise it recursively spawns new processes, hence the error.
My knowledge of multipage apps is limited. What I would like to do is to streamlit run homepage.py, and from the homepage go to other pages, which trigger calls to modules containing multiprocessing code.
I don't see why anything you describe here would be an issue at all.
If that is the case, just do it.
Going to a page triggers the execution of that page's code. That code can import modules, call functions… the kind of stuff code usually does. Multiprocessing or not, it doesn't matter.
Again, I don't think we know enough about your issue to give meaningful advice.
This is a simplified version of my code structure (please note: I made up the names and functions for sharing purposes):
# homepage.py
import streamlit as st

def some_function():
    # ...
    ...

def calculate_encodings():
    st.session_state.dataset_path = st.text_input('The path to your dataset:', '')
    if st.session_state.dataset_path != '':
        st.session_state.encodings = some_function()

if __name__ == '__main__':
    calculate_encodings()
Everything is fine here. After completion of calculate_encodings(), I click on another page from the menu (clustering.py). This page triggers a call to a function that uses multiprocessing:
# clustering.py
import streamlit as st

from src.utils import copy_data

if 'encodings' in st.session_state and st.session_state.encodings is not None:
    copy_data()
    # rest of the code ...
And finally in my src/utils.py:
# src/utils.py
from concurrent.futures import ProcessPoolExecutor

def download_file(key):
    # downloading the data from the remote server
    ...

def copy_data():
    keys_to_download = ['file1.txt', 'file2.txt', 'file3.txt']
    with ProcessPoolExecutor() as executor:
        for key in keys_to_download:
            executor.submit(download_file, key)
Note that the second page (clustering.py) is not triggered from an if __name__ == '__main__': block; therefore I get the error.
As a workaround, I called copy_data() directly from homepage.py within calculate_encodings(). However, that is not a proper place to call that function, and I was wondering whether copy_data() could be called directly from the clustering.py page or not.
I can't say I totally understand the purpose of your code. I made minimal changes to make it compile and run, and I was unable to cause any errors. I got a warning, though:
WARNING streamlit.runtime.state.session_state_proxy: Session state does not function when running a script without `streamlit run`
It seems to happen at some point after calling copy_data() and submitting all tasks to the executor, but before the tasks are completed. I don't know why it happens, but it doesn't seem to cause any problems; the code involved doesn't depend on session_state or anything streamlit-related, actually.
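If the spawn behavior is what breaks in your multipage app, one thing you could try (just a sketch, untested in a multipage app; mp_context and the spawn start method are standard library features, not streamlit ones) is giving the executor an explicit spawn context and consuming the futures' results, so exceptions from the workers surface in the page:

# src/utils.py (sketch, reusing your copy_data names)
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def download_file(key):
    # downloading the data from the remote server
    ...

def copy_data():
    keys_to_download = ['file1.txt', 'file2.txt', 'file3.txt']
    # Pin the start method explicitly; because download_file lives in an
    # importable module, spawned workers can locate it by import.
    ctx = multiprocessing.get_context('spawn')
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        futures = [executor.submit(download_file, key) for key in keys_to_download]
        for future in futures:
            future.result()  # re-raises any exception from the worker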
Thanks for the quick reply. In my case I got the error "A process in the process pool was terminated abruptly while the future was running or pending" from multiprocessing after those warnings. I'm not sure whether the error is a streamlit issue or not. It could be related to the way I called boto3.
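For anyone hitting the same thing: boto3 clients are generally not picklable and should not be shared across processes, so one common pattern is to create the client inside the worker. A sketch, with an illustrative bucket name:

# src/utils.py (sketch)
import boto3

def download_file(key):
    # Create the client inside the worker process; a client created in
    # the parent generally cannot be pickled into the pool's children.
    s3 = boto3.client('s3')
    s3.download_file('my-bucket', key, key)  # 'my-bucket' is illustrative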
I will mark the answer as solution.