If you have ever run into the error
set_page_config() can only be called once per app page
you will have learned to wrap your Streamlit code inside functions rather than leaving it naked at the module level. However, if you define your fragment functions ahead of time like this, the run_every argument is evaluated once, when the module is imported, and you cannot change that value later on.
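The import-time behaviour is ordinary Python rather than anything specific to Streamlit: decorator arguments are evaluated once, when the def statement runs. Here is a minimal sketch of that, using a hypothetical stand-in decorator fake_fragment (not Streamlit's API) and a hypothetical settings dict in place of session state:

```python
# Stand-in for st.fragment (an assumption for illustration only):
# it just records the run_every value it was given.
def fake_fragment(run_every=None):
    def decorator(func):
        func.run_every = run_every  # captured at decoration time
        return func
    return decorator


settings = {"interval": 5}  # hypothetical stand-in for st.session_state


def current_interval():
    return settings["interval"]


# Module level: current_interval() is called exactly once, right here.
@fake_fragment(run_every=current_interval())
def module_level_fragment():
    return "refreshed"


settings["interval"] = None  # a later change, e.g. the user pressed "Stop"

captured = module_level_fragment.run_every
print(captured)  # still 5; the decorator never saw the change
```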
One workaround I found is to convert your fragment functions into inner functions. This allows you to update the value of the fragment's run_every parameter on every rerun of your code.
The general structure looks something like this:
def _update_run_every(run_every=5):
    if <off condition>:
        # turn off run_every
        run_every = None
    if <some other condition>:
        run_every = <something else>
    return run_every

def do_something_complicated():
    ...

def main():
    init()
    layout()

    @st.fragment(run_every=_update_run_every())
    def _run_every_fragment():
        # Because this is an inner function, the decorator
        # will be processed again when Streamlit reruns
        do_something_complicated()

    _run_every_fragment()

if __name__ == "__main__":
    main()
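To see why the inner-function structure above picks up a new value, it may help to strip Streamlit out entirely. The sketch below uses a hypothetical stand-in decorator fake_fragment and a hypothetical settings dict (neither is part of Streamlit), and calls main() twice to imitate two reruns:

```python
# Stand-in for st.fragment (an assumption for illustration only).
def fake_fragment(run_every=None):
    def decorator(func):
        func.run_every = run_every
        return func
    return decorator


settings = {"interval": 5}  # hypothetical stand-in for st.session_state


def update_run_every():
    return settings["interval"]


def main():
    # This def statement executes on every call to main(),
    # so update_run_every() is evaluated again each time.
    @fake_fragment(run_every=update_run_every())
    def inner_fragment():
        return "refreshed"

    return inner_fragment.run_every


first = main()               # imitates the first run of the script
settings["interval"] = None  # the value changes between runs
second = main()              # imitates a rerun
print(first, second)  # 5 None
```

This only demonstrates the Python side of the technique. What Streamlit does with the re-decorated function on each rerun is a separate matter.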
You can try this out on the tutorial Start and stop a streaming fragment, for example. Moving everything blindly into functions will break the example, and using the above technique fixes it.
One thing I am worried about is whether this causes any trouble with the Streamlit workflow. Would Streamlit create a new fragment every time? Would this cause memory leaks if users interact quickly with the app, such that a new fragment is created while the do_something_complicated() function is still running?
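The Python half of that question can at least be checked directly. The sketch below, which does not involve Streamlit at all, shows that each call to main() produces a new function object, while the qualified name and the compiled code object stay the same. How Streamlit identifies fragments internally is not something this sketch shows, so it does not settle the question.

```python
def main():
    def _run_every_fragment():
        pass

    return _run_every_fragment


first = main()
second = main()

same_object = first is second                               # new object per call
same_qualname = first.__qualname__ == second.__qualname__   # name is stable
same_code = first.__code__ is second.__code__               # compiled once
print(same_object, same_qualname, same_code)  # False True True
```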
Could someone from the Streamlit team please comment? Thank you!
My version of the Start-Stop tutorial
import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


def get_recent_data(last_timestamp):
    """Generate and return data from last timestamp to now, at most 60 seconds."""
    now = datetime.now()
    if now - last_timestamp > timedelta(seconds=60):
        last_timestamp = now - timedelta(seconds=60)
    sample_time = timedelta(seconds=0.5)  # time between data points
    next_timestamp = last_timestamp + sample_time
    timestamps = np.arange(next_timestamp, now, sample_time)
    sample_values = np.random.randn(len(timestamps), 2)
    data = pd.DataFrame(sample_values, index=timestamps, columns=["A", "B"])
    return data


def toggle_streaming():
    st.session_state.stream = not st.session_state.stream


def init():
    if "data" not in st.session_state:
        st.session_state.data = get_recent_data(datetime.now() - timedelta(seconds=60))
    if "stream" not in st.session_state:
        st.session_state.stream = False


def layout():
    st.title("Data feed")
    st.sidebar.slider(
        "Check for updates every: (seconds)", 0.5, 5.0, value=1.0, key="run_every"
    )
    st.sidebar.button(
        "Start streaming", disabled=st.session_state.stream, on_click=toggle_streaming
    )
    st.sidebar.button(
        "Stop streaming", disabled=not st.session_state.stream, on_click=toggle_streaming
    )


def show_latest_data():
    last_timestamp = st.session_state.data.index[-1]
    st.session_state.data = pd.concat(
        [st.session_state.data, get_recent_data(last_timestamp)]
    )
    st.session_state.data = st.session_state.data[-100:]
    st.line_chart(st.session_state.data)


def _update_run_every():
    if st.session_state.stream is True:
        print("stream is true")
        run_every = st.session_state.run_every
    else:
        run_every = None
    return run_every


def main():
    st.set_page_config(
        page_title="Check Fragment Start Stop",
        page_icon=":material/check_circle:",
    )
    init()
    layout()

    # Defined inside main() so the decorator, and therefore
    # _update_run_every(), is evaluated again on every rerun.
    @st.fragment(run_every=_update_run_every())
    def _show_latest_data_fragment():
        show_latest_data()

    _show_latest_data_fragment()


if __name__ == "__main__":
    main()