[Need Help] Listening to Real Time Updates From Firestore

I followed the helpful blog post here: https://blog.streamlit.io/streamlit-firestore-continued/ by the awesome @AustinC! However, I am not sure how to store updates from Firestore snapshots in my Streamlit session state. Any tips?

import threading

import streamlit as st

# `db` is assumed to be an already-initialized firestore.Client (its setup is not shown in this post).

def initialize_session_state():
    if 'field' not in st.session_state:
        st.session_state.field = ''

initialize_session_state()
callback_done = threading.Event()

# Create a callback on_snapshot function to capture changes
def on_snapshot(doc_snapshot, changes, read_time):
    for doc in doc_snapshot:
        print(f'Received document snapshot: {doc.id}')
        print(f'Received document content: {doc.to_dict()["field"]}')

        db_dict = doc.to_dict()
        load_from_db(db_dict, True)

    callback_done.set()

def load_from_db(_doc, from_snapshot=False):
    if from_snapshot:
        db_dict = _doc
        st.session_state.field = db_dict['field']

# Watch a specific document
doc_watch = db.collection("collection_name").document('document_id').on_snapshot(on_snapshot)

When running something like this – on_snapshot throws an error saying that I did not initialize field in the session state. Do I need to pass the session state into on_snapshot? If so how do I do that?
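
One possible explanation, consistent with the later replies in this thread, is that the snapshot callback runs on a background thread that does not carry the Streamlit session context, so the session state it sees looks uninitialized. The snippet below is only a plain-Python analogy using threading.local as a stand-in for per-thread context. It is not Streamlit's actual mechanism, and the names context and callback are made up for illustration.

```python
import threading

# Stand-in for per-thread context. This is an analogy, NOT Streamlit's real mechanism.
context = threading.local()
context.session = {"field": ""}  # initialized on the main ("script") thread only

errors = []

def callback():
    # Runs on a different thread, the way a listener callback would.
    try:
        context.session["field"] = "new value"
    except AttributeError as exc:
        # The worker thread has no `session` attribute of its own.
        errors.append(str(exc))

worker = threading.Thread(target=callback)
worker.start()
worker.join()

print(errors)           # one AttributeError message from the worker thread
print(context.session)  # the main thread's value is untouched
```

The value set on the main thread is invisible from the worker thread, which resembles the "field was not initialized" symptom described above.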


Hi there @cheesy!

Did you get this Firestore listener to work? If so, how did you manage to store updates from the snapshot in the Streamlit session state?

Thanks in advance! If anyone in the community got this working, it would be amazing to know how :pray:

I, unfortunately, did not get it to work. I decided to create a ReactJS Streamlit component and moved the Firestore logic from Python into ReactJS. If anyone has any alternative solution let us know!


Hi again @cheesy ,

Thanks anyways for the reply.

See below a work-in-progress (WIP) example, but it still has 3 important problems (two of them are also highlighted with a box inside the example code). Sharing in case we can work together on figuring this one out :slight_smile:.

@andfanilo : Saw your awesome “Streamlit + Firestore AMA app tutorial” on YouTube, so wanted to ask you if you had any tips (or know any Streamlit creator that could help us). Thanks a lot in advance!!!

Problems:

  1. Unsubscribing the Firestore listener with doc_watch.unsubscribe() doesn’t work. I get this weird log, and when I check the existing threads, the thread is still alive:

    2022-10-28 10:00:51.013 RPC termination has signaled manager shutdown.
    2022-10-28 10:00:52.025 Background thread did not exit.
    2022-10-28 10:00:52.025 Thread-ConsumeBidirectionalStream exiting

  2. I don’t know how to end the whole “background thread + listener + event” combo properly if, for example, no change is detected but the user closes the browser. It seems things remain alive on the server, which is not ideal.

  3. Forcing a rerun as soon as a change is detected doesn’t work.

  4. Bonus: Haven’t put enough thought into this one yet, but what would happen if a change is detected while the app is in the middle of some calculation? There probably need to be some priority rules/guardrails (e.g., a Lock?) between the background thread and the Streamlit app session’s main thread.
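
For problems 2 and 4, one common plain-Python pattern is to pair a threading.Event used as a stop signal with a threading.Lock around the shared state. The sketch below only illustrates that pattern under stated assumptions: fake_listener is a hypothetical stand-in that polls instead of listening to Firestore, shared is a plain dict rather than st.session_state, and nothing here addresses the unsubscribe() behaviour from problem 1.

```python
import threading

state_lock = threading.Lock()     # guards every access to `shared`
stop_event = threading.Event()    # set by the main thread to request shutdown
first_update = threading.Event()  # lets this demo wait deterministically
shared = {"change_detected": False, "updates": 0}

def fake_listener(poll_seconds: float = 0.01) -> None:
    """Hypothetical stand-in for a listener thread; no Firestore involved."""
    # Event.wait() returns True once stop_event is set, which ends the loop.
    while not stop_event.wait(poll_seconds):
        with state_lock:
            shared["change_detected"] = True
            shared["updates"] += 1
        first_update.set()

worker = threading.Thread(target=fake_listener, daemon=True)
worker.start()

# Main thread: wait for at least one update, then read under the same lock.
first_update.wait(timeout=2.0)
with state_lock:
    snapshot = dict(shared)

# Ask the worker to stop and wait for it to exit.
stop_event.set()
worker.join(timeout=2.0)
print(snapshot["change_detected"], worker.is_alive())
```

Marking the thread as a daemon is a fallback so it cannot keep the process alive on its own. The explicit stop_event is what lets it exit cleanly while the process keeps running.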

Example code (WIP):

import streamlit as st

st.title('WIP: Streamlit app + Firestore real-time listener')


if 'change_detected' not in st.session_state:


    # Flag to know inside the Streamlit app session if change was detected or not yet.
    st.session_state.change_detected = False


    # Declare name of document to listen to, also name of collection it belongs to.
    col_name = 'example_collection'
    doc_name = 'example_document'


    # Create threading.Event that will make Thread wait at "callback_done.wait()" until condition is met.
    import threading
    callback_done = threading.Event()


    # Put the firestore listener + Event in a target function so they can run inside a background Thread in a Streamlit app.
    def listener_thread_target(col_name:str, doc_name:str):


        # Snapshot function that goes inside firestore listener, which listens for any change in document and does "callback_done.set()" when a change is detected.
        def on_snapshot(doc_snapshot, changes, read_time):
            if changes[0].type.name == 'MODIFIED':
                callback_done.set()


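        # Initialize firestore database connection.
        # Note: from_service_account_info expects the service-account key as a dict (the parsed JSON), not a string.
        from google.cloud import firestore
        firestore_key = {}  # YOUR FIRESTORE SERVICE-ACCOUNT KEY (AS A DICT) GOES HERE
        db = firestore.Client.from_service_account_info(firestore_key)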


        # Create the firestore listener for the given document name (doc_name). 
        doc_watch = db.collection(col_name).document(doc_name).on_snapshot(on_snapshot)

        # Background thread is forced to wait here until the condition inside listener is met and "callback_done.set()" happens.
        callback_done.wait()

        # From here until the end of listener_thread_target, everything the background Thread will do once "callback_done.set()" happens:

        # Set to True the session state variable that tracks change.
        st.session_state.change_detected = True

        ######################################################################
        #
        # NOT WORKING - unsubscribe the firestore listener so it stops running.
        doc_watch.unsubscribe()        
        #
        # NOT WORKING - force rerun to see the change without having to refresh app manually.
        # st.experimental_rerun()
        #
        ######################################################################

    # Create a background Thread that has the listener_thread_target as target.
    from streamlit.runtime.scriptrunner import add_script_run_ctx
    listener_thread = threading.Thread(target=listener_thread_target,args=(col_name,doc_name))

    # Add the session context to the background Thread.
    add_script_run_ctx(listener_thread)

    # Start the background Thread.
    listener_thread.start()


# See in the app if the change was detected or not.
st.write(st.session_state.change_detected)

# Button to refresh app manually.
st.button('Naive refresh button')

Hello @marduk! Have you been able to make it work?

Hey there @elloza ,

Forgot to reply - unfortunately not, so I moved on to other things. Did you? Curious to see if anyone found a solution.

It may be a little late, but this post shows how to accomplish real-time updates in Streamlit. It seems that using a Python queue.Queue does the trick.


I don’t know if it fits your needs, but as pseudo-code I did the following:

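from typing import Any

from streamlit.runtime.state import SessionStateProxy

# get_firestore_doc_ref() is a placeholder for your own helper that returns a Firestore DocumentReference.

class SessionStateFS(SessionStateProxy):
    def __setitem__(self, key: str, value: Any) -> None: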
        """Set the value of the given key.

        Raises
        ------
        StreamlitAPIException
            If the key is not a valid SessionState user key.
        """
        doc_ref = get_firestore_doc_ref()
        doc_ref.update({key: value})
        super().__setitem__(key, value)


session_state_fs = SessionStateFS()
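
The write-through idea in the pseudo-code above can be tried without Streamlit or Firestore. This is a minimal sketch under assumptions: WriteThroughState and the persist callable are hypothetical names, a plain dict stands in for the session state, and a list stands in for the Firestore document.

```python
from typing import Any, Callable, Dict

class WriteThroughState(dict):
    """Dict that forwards every item assignment to a persistence callable."""

    def __init__(self, persist: Callable[[Dict[str, Any]], None]) -> None:
        super().__init__()
        self._persist = persist

    def __setitem__(self, key: str, value: Any) -> None:
        # Persist first, so a failed write does not leave local state ahead of the store.
        self._persist({key: value})
        super().__setitem__(key, value)

saved = []                                # stand-in for the remote document
state = WriteThroughState(saved.append)   # `saved.append` plays the role of doc_ref.update
state["field"] = "hello"
print(saved, dict(state))
```

Note that dict.update() and similar bulk methods do not go through __setitem__ on a dict subclass, so only direct item assignment is persisted in this sketch.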


Thanks for sharing, @harisbal. I was working on a similar video, and while I’m not a fan of involving threading in Streamlit or anything using while True, we don’t really have much choice, since we have no control over the background thread of the callback to reinject it into the Streamlit context. So I came to the same Queue conclusion as this blog post. I’ll mark it as an answer :slight_smile:
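
For readers who want the general shape of the Queue approach, here is a minimal sketch rather than the blog post's actual code. It assumes the callback only enqueues data, and the script's own thread later drains the queue into the session state. on_snapshot_stub, drain, and the plain dict standing in for st.session_state are all hypothetical, and no Firestore or Streamlit calls are made.

```python
import queue
import threading

updates = queue.Queue()  # thread-safe hand-off between listener and script

def on_snapshot_stub(doc: dict) -> None:
    """Hypothetical callback: it only enqueues and never touches session state."""
    updates.put(doc)

def drain(q: queue.Queue, state: dict) -> int:
    """Apply all pending updates to `state` on the calling thread."""
    applied = 0
    while True:
        try:
            doc = q.get_nowait()
        except queue.Empty:
            return applied
        state.update(doc)
        applied += 1

def simulate_listener() -> None:
    # Pretend the listener fired twice from a background thread.
    for value in ("a", "b"):
        on_snapshot_stub({"field": value})

producer = threading.Thread(target=simulate_listener)
producer.start()
producer.join()

session_state = {"field": ""}  # plain dict standing in for st.session_state
count = drain(updates, session_state)
print(count, session_state)
```

In a real app, the drain step would run inside the Streamlit script itself (for example at the top of each rerun), so every write to the session state happens on a thread that already has the session context.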