Live plot from a thread

Hi team
I have a message broker (in this case for MQTT messages) and wish to plot (for now just write) the latest value arriving on a particular topic. The actual subscription happens on a background thread, and I think this is causing an issue for Streamlit: when I run the app I get the error "Thread 'Thread-3': missing ReportContext".

Also I am not sure if using a while True to maintain the app is best practice?
Cheers

import paho.mqtt.client as mqtt

import streamlit as st
import time

MQTT_BROKER = 'localhost' 

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    st.write(f"Connected with result code {str(rc)} to MQTT broker on {MQTT_BROKER}")

def on_disconnect(client, userdata,rc=0):
    print("DisConnected result code "+str(rc))
    client.loop_stop()

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    st.write(str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER)
client.loop_start()
client.subscribe("streamlit")

while True:
    time.sleep(0.1)

An alternative approach would be to use another process to write the incoming data to a database (SQLite, for example) and then have a plot in Streamlit that is periodically updated.
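
A minimal sketch of that database alternative, using only the standard library. The table name readings, the file path, and the helpers open_db, store_reading and latest_reading are all hypothetical names chosen for illustration; nothing here comes from the thread itself. The idea is that a separate subscriber process calls store_reading from its message callback, while the Streamlit script only ever reads.

```python
import sqlite3
import time

DB_PATH = "readings.db"  # hypothetical path shared by both processes


def open_db(path=DB_PATH):
    """Open the database and create the (hypothetical) readings table if needed."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS readings ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, ts REAL, topic TEXT, value TEXT)"
    )
    return conn


def store_reading(conn, topic, value, ts=None):
    """Writer side: append one incoming message."""
    conn.execute(
        "INSERT INTO readings (ts, topic, value) VALUES (?, ?, ?)",
        (time.time() if ts is None else ts, topic, value),
    )
    conn.commit()


def latest_reading(conn, topic):
    """Reader side: return the newest value for a topic, or None if there is none."""
    row = conn.execute(
        "SELECT value FROM readings WHERE topic = ? ORDER BY id DESC LIMIT 1",
        (topic,),
    ).fetchone()
    return row[0] if row else None
```

In the Streamlit script, something like st.write(latest_reading(open_db(), "streamlit")) would then show the most recent value each time the script reruns.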


Hi @robmarkcole

To use Streamlit outside the main thread you need to grab some special objects and use them to write to the app inside the thread. This depends on some obscure Streamlit internal details, so I created a little wrapper to make your life easier, published as a GitHub gist: "Hack to write to Streamlit app from another thread. See: https://discuss.streamlit.io/t/live-plot-from-a-thread/247".

How to use it:

# This must be called from the main thread:
thread_safe_st = ThreadSafeSt()

def function_called_from_another_thread(arg1, arg2, st):
  st.main.markdown('arg1 is: %s' % arg1)
  st.sidebar.markdown('arg2 is: %s' % arg2)

# On another thread:
function_called_from_another_thread(10, 20, thread_safe_st)

> Also I am not sure if using a while True to maintain the app is best practice?

Since you’re using MQTT, you should be able to replace the while True loop with just client.loop_forever(). See the paho-mqtt documentation for loop_forever() for details.

So I think the code would look like this in the end:
(Note: this is untested!)

import paho.mqtt.client as mqtt
import streamlit as st
import time
from thread_safe_st import ThreadSafeSt

MQTT_BROKER = 'localhost'

thread_safe_st = ThreadSafeSt()

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    # Note: st.write is not supported in thread_safe_st yet, but will be soon!
    # See https://github.com/streamlit/streamlit/pull/238
    thread_safe_st.markdown(
      f"Connected with result code {str(rc)} to MQTT broker on {MQTT_BROKER}")

def on_disconnect(client, userdata,rc=0):
    print("DisConnected result code "+str(rc))
    client.loop_stop()

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    thread_safe_st.text(str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER)
client.subscribe("streamlit")

client.loop_forever()

@thiago many thanks for your advice on this. screenshot below :slight_smile:

My only remaining issue is that it is not possible to press Ctrl+C to kill the app.

Oh, that’s annoying! I don’t know of a fix for that off-hand. We’ll have to investigate this.

But if you’re feeling l33t, you can try adding a sys.exit() inside the signal_handler function in Streamlit’s bootstrap.py to see if that works. If it does, we can probably land a fix very soon.
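
For readers unfamiliar with the idea, here is a generic sketch of a signal handler that calls sys.exit(). It is not Streamlit's actual bootstrap.py code; the names exit_on_sigint and install_handler are hypothetical, and the exit status 128 + signal number is just the usual shell convention.

```python
import signal
import sys


def exit_on_sigint(signum, frame):
    """Hypothetical handler: stop the process when Ctrl+C (SIGINT) arrives."""
    # 128 + signal number is the conventional shell exit status for a signal.
    sys.exit(128 + signum)


def install_handler():
    """Register the handler; signal.signal() must be called from the main thread."""
    signal.signal(signal.SIGINT, exit_on_sigint)
```

Calling install_handler() once at startup from the main thread would route Ctrl+C to exit_on_sigint. Whether this is enough to stop a blocked Streamlit app is exactly what the suggestion above is meant to find out.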

Otherwise, if you’re on Linux/Mac here’s a temporary hack to close an unresponsive Streamlit (and actually any unresponsive job in your terminal!):

  1. Press Ctrl+Z to pause Streamlit. This will give you access to the terminal again.
  2. Type kill -9 %%. The %% refers to the previously paused job (i.e. Streamlit).

Done!

One weird thing I noticed from your screenshot, though: why do those strings look like b'something something'? Are you using thread_safe_st.markdown() for those? If so, that looks like a bug…
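
One likely explanation, given that the callback in the code above passes str(msg.payload) and that paho-mqtt delivers msg.payload as bytes: calling str() on a bytes object gives its repr, prefix and quotes included, whereas decode() gives the text. A small illustration with a made-up payload value:

```python
payload = b"21.5"  # example bytes, standing in for msg.payload

as_str = str(payload)       # the repr of the bytes object, b'' prefix included
decoded = payload.decode()  # UTF-8 by default; gives the text itself

print(as_str)   # b'21.5'
print(decoded)  # 21.5
```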

Also: I created a feature request for the threading solution on GitHub. Thanks for bringing it up! Feel free to follow the FR there if you’re interested in updates.


Hi @thiago
I tried installing streamlit in develop mode (to append sys.exit()) but that resulted in an issue. Nevertheless kill -9 %% works.

I note a couple of strange things:

  1. The comment thread_safe_st.markdown( f"Connected with result code {str(rc)} to MQTT broker on {MQTT_BROKER}") is actually never displayed
  2. It is now all working without any threading and without errors…

import paho.mqtt.client as mqtt

import streamlit as st
import time

MQTT_BROKER = 'localhost'

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    st.write(f"Connected with result code {str(rc)} to MQTT broker on {MQTT_BROKER}")

def on_disconnect(client, userdata,rc=0):
    print("DisConnected result code "+str(rc))
    client.loop_stop()

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    st.write(msg.payload.decode())

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER)
client.subscribe("streamlit")

client.loop_forever()
> 1. The comment thread_safe_st.markdown( f"Connected with result code {str(rc)} to MQTT broker on {MQTT_BROKER}") is actually never displayed

You’re right, I forgot to mention that was going to happen. “Magic” commands run on the main-thread st.

This is all making me think we should write docs on this subject… So thanks for hitting all the fine points of threading in one swoop! It will help a ton when we’re writing them.

> 2. It is now all working without any threading and without errors…

Oh right! That’s because loop_forever() makes mqtt run on the main thread. So that’s a much better solution :+1:
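
When a callback does have to stay on a background thread, a common alternative is to keep every display call on the main thread and hand messages over through a queue. This is a different technique from the wrapper discussed earlier, sketched here with the standard library only: fake_subscriber is a hypothetical stand-in for the MQTT network thread, and print stands in for st.write.

```python
import queue
import threading

messages = queue.Queue()


def on_message(payload):
    """Runs on the background thread: only enqueue, never touch the UI."""
    messages.put(payload.decode())


def fake_subscriber(payloads):
    """Hypothetical stand-in for the MQTT network loop thread."""
    for p in payloads:
        on_message(p)


def drain(q):
    """Runs on the main thread: collect everything received so far."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


worker = threading.Thread(target=fake_subscriber, args=([b"1", b"2", b"3"],))
worker.start()
worker.join()

received = drain(messages)
for text in received:
    print(text)  # in a Streamlit script this would be st.write(text)
```

The design choice is that queue.Queue is thread-safe, so the callback never needs any Streamlit context at all.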


OK glad this has been helpful, and thanks for your advice!


@thiago so using this I am now able to view a camera feed, over MQTT, using Streamlit. Sure beats messing around with flask :slight_smile:


This is awesome @robmarkcole :+1::+1:.


I tried the approach above, but I couldn’t find the “thread_safe_st” module.
So I tried getting the ReportContext from the main thread and setting it on the sub-thread.
In the end, that solved it for me too.

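from threading import current_thread

import streamlit as st

# in main thread
# Note: ReportThread is a Streamlit internal and may change between versions.
KEY_CONTEXT = st.ReportThread.REPORT_CONTEXT_ATTR_NAME
main_context = getattr(current_thread(), KEY_CONTEXT)

# in sub thread
thread = current_thread()
if getattr(thread, KEY_CONTEXT, None) is None:
    # attach the main thread's context so st calls work from this thread
    setattr(thread, KEY_CONTEXT, main_context)
# here, I can use st normally
st.success('Connect successfully.')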

Thanks for sharing!
This code works like a charm, even after Streamlit 0.54, which broke the previous solution.
