Are you running your app locally or is it deployed? locally
Share the Streamlit and Python versions. 1.28.1, 3.11.0
Hello, firstly I am quite new to python / streamlit so apologies if I’m missing something obvious. I have a background thread running a tcp server and when i recieve a message I want to print it to a text area within streamlit page. However, I cannot get the text_area to auto update, I have to manually rerun the page.
A minimium working example of my issue is below:
import streamlit as st
import time
from streamlit.runtime.scriptrunner import add_script_run_ctx
import threading
class foo(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._data = list()
self._data_lock = threading.Lock()
@property
def data(self):
with self._data_lock:
return self._data
def run(self):
i = 1
while i<10:
time.sleep(1)
self.data.append(f'line {i}')
st.rerun()
i = i+1
def getvalue(self):
return '\n'.join(self.data)
@st.cache_resource
def get_foo():
bar = foo()
bar.daemon = True
add_script_run_ctx(bar)
bar.start()
return bar
bar = get_foo()
st.text_area('foo',value=bar.getvalue())
st.write(len(bar.data))
I expect the text_area to populate with a new line every second, however it will only populate if I manually rerun the page.
import streamlit as st
from notifier import notify # code from @exrhizo put in notifier file without the last line
import time
from streamlit.runtime.scriptrunner import add_script_run_ctx
import threading
class foo(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self._data = list()
self._data_lock = threading.Lock()
@property
def data(self):
with self._data_lock:
return self._data
def run(self):
i = 1
while i<10:
time.sleep(1)
self.data.append(f'line {i}')
notify()
i = i+1
def getvalue(self):
return '\n'.join(self.data)
@st.cache_resource
def get_foo():
bar = foo()
bar.daemon = True
add_script_run_ctx(bar)
bar.start()
return bar
bar = get_foo()
st.text_area('foo',value=bar.getvalue())
st.write(len(bar.data))
and the notify module looks like
import gc
from streamlit.runtime.scriptrunner import get_script_run_ctx
import threading
import asyncio
from streamlit.runtime import Runtime
from streamlit.runtime.app_session import AppSession
def get_browser_session_id() -> str:
# Get the session_id for the current running script
try:
ctx = get_script_run_ctx()
return ctx.session_id
except Exception as e:
raise Exception("Could not get browser session id") from e
def find_streamlit_main_loop() -> asyncio.BaseEventLoop:
loops = []
for obj in gc.get_objects():
try:
if isinstance(obj, asyncio.BaseEventLoop):
loops.append(obj)
except ReferenceError:
...
main_thread = next((t for t in threading.enumerate() if t.name == 'MainThread'), None)
if main_thread is None:
raise Exception("No main thread")
main_loop = next((lp for lp in loops if lp._thread_id == main_thread.ident), None) # type: ignore
if main_loop is None:
raise Exception("No event loop on 'MainThread'")
return main_loop
def get_streamlit_session(session_id: str) -> AppSession:
runtime: Runtime = Runtime.instance()
session = next((
s.session
for s in runtime._session_mgr.list_sessions()
if s.session.id == session_id
), None)
if session is None:
raise Exception(f"Streamlit session not found for {session_id}")
return session
def get_streamlit_sessions() -> list[AppSession]:
runtime: Runtime = Runtime.instance()
return [s.session for s in runtime._session_mgr.list_sessions()]
streamlit_loop = find_streamlit_main_loop()
streamlit_session = get_streamlit_session(get_browser_session_id())
def notify() -> None:
for session in get_streamlit_sessions():
session._handle_rerun_script_request()
Thanks for stopping by! We use cookies to help us understand how you interact with our website.
By clicking “Accept all”, you consent to our use of cookies. For more information, please see our privacy policy.
Cookie settings
Strictly necessary cookies
These cookies are necessary for the website to function and cannot be switched off. They are usually only set in response to actions made by you which amount to a request for services, such as setting your privacy preferences, logging in or filling in forms.
Performance cookies
These cookies allow us to count visits and traffic sources so we can measure and improve the performance of our site. They help us understand how visitors move around the site and which pages are most frequently visited.
Functional cookies
These cookies are used to record your choices and settings, maintain your preferences over time and recognize you when you return to our website. These cookies help us to personalize our content for you and remember your preferences.
Targeting cookies
These cookies may be deployed to our site by our advertising partners to build a profile of your interest and provide you with content that is relevant to you, including showing you relevant ads on other websites.