Streamlit with paho.mqtt - IS NOT POSSIBLE

My conclusion/experience after a week of trying to use paho.mqtt with streamlit is that it is not possible.

Some things I came to understand this week: client.loop_forever() runs in the same context as Streamlit and, because it contains a while loop, it blocks that context.

The second point, which I believe is the real “problem”, is on_message: we would need to attach add_script_run_ctx to the parent thread, which is not “possible”, and without it the callback cannot signal Streamlit to rerun.

Another point is that using paho as the main thread and running Streamlit in its own thread will “work”, but instead of updating in place, a stack of widgets will be created.

Anyway, I’ll leave the code I wrote here, in case someone can come along and prove the opposite.

from threading import Thread
import streamlit as st
from streamlit.runtime.scriptrunner import add_script_run_ctx, get_script_run_ctx
from streamlit.runtime.scriptrunner.script_run_context import SCRIPT_RUN_CONTEXT_ATTR_NAME
import paho.mqtt.client as mqtt
import os
from dotenv import load_dotenv

class MyClient(mqtt.Client):
  """MQTT client that configures itself from environment variables.

  Reads SERVER, PORT, USER_MQTT, PASSWORD and TOPIC (a ``.env`` file is
  loaded through ``load_dotenv``), connects to the broker on construction
  and writes the payload of every received message with ``st.write``.
  """

  def __init__(self) -> None:
    # Same values the positional call used (protocol 4 is MQTT v3.1.1);
    # keywords keep the call readable and independent of parameter order.
    super().__init__(
      client_id="",
      clean_session=None,
      userdata=None,
      protocol=mqtt.MQTTv311,
      transport="tcp",
      reconnect_on_failure=True,
    )
    self._init()

  def _init(self):
    """Load the settings, register the callbacks and connect to the broker.

    Raises:
      ValueError: if SERVER, PORT or TOPIC is unset or empty.
    """
    load_dotenv()

    server = os.getenv("SERVER")
    port = os.getenv("PORT")

    # Fail with a readable message instead of an opaque int(None) TypeError
    # or an "Invalid topic" error raised later from inside paho.
    missing = [name for name in ("SERVER", "PORT", "TOPIC") if not os.getenv(name)]
    if missing:
      raise ValueError(f"Missing environment variable(s): {', '.join(missing)}")

    self.on_message = self.my_on_message
    self.on_connect = self.my_on_connect
    self.username_pw_set(username=os.getenv("USER_MQTT"), password=os.getenv("PASSWORD"))

    # No subscribe() here: there is no connection yet, so the request could
    # not reach the broker. my_on_connect subscribes once connected.
    self.connect(server, int(port))

  def my_on_message(self, client, userdata, msg):
    """Write the payload of a received message to the Streamlit page.

    Invoked by paho from whichever thread runs the network loop.
    """
    # msg.payload is bytes and str() on bytes yields the "b'...'" repr, so
    # decode it; undecodable bytes are replaced rather than raising inside
    # the callback.
    message = msg.payload.decode("utf-8", errors="replace")
    st.write(message)

  def my_on_connect(self, client, userdata, flags, rc):
    """Subscribe to TOPIC when the broker answers the connection request."""
    self.subscribe(os.getenv("TOPIC"))
  

# Script entry point: build the MQTT client and run its network loop.
if __name__ == "__main__":
  
  client = MyClient()
  # loop_forever() is a blocking call, so this is the last statement that
  # does any work while the loop is running.
  client.loop_forever()
  # The string literal below is never executed. It keeps an alternative
  # attempt for reference: running the loop in a background thread that
  # carries the Streamlit script-run context, created once per session.
  """
  if 'status' not in st.session_state:
    print("CREATE CLIENT MQTT")
    
    client = MyClient()

    thread = Thread(target=client.loop_forever)
    add_script_run_ctx(thread=thread, ctx= get_script_run_ctx())
    thread.start()

    st.session_state['status'] = 'created'
  """

2 Likes

Fly-by comment, I’ve also been having some trouble here. For example:

"""This streamlit implementation is preferred over the gradio implementation"""

import json

import paho.mqtt.client as mqtt
import streamlit as st

# Page header
st.title("Actuator Control Panel")

# Broker connection settings (host and password inputs are masked)
HIVEMQ_HOST = st.text_input(
    label="Enter your HiveMQ host:", value="", type="password"
)
HIVEMQ_USERNAME = st.text_input(label="Enter your HiveMQ username:", value="")
HIVEMQ_PASSWORD = st.text_input(
    label="Enter your HiveMQ password:", value="", type="password"
)
PORT = st.number_input(
    label="Enter the port number:",
    min_value=1,
    max_value=65535,
    value=8883,
)

# Identifier of the target Pico (masked input)
pico_id = st.text_input(label="Enter your Pico ID:", value="", type="password")

# Position to send, chosen between 1.1 and 1.9 (starts at 1.5)
position = st.slider(
    label="Select the position value:",
    min_value=1.1,
    max_value=1.9,
    value=1.5,
)

# Debug trace written to the server console
print("hello")


# singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def get_paho_client(hostname, username, password=None, port=8883, tls=True):
    """Create, connect and return a paho MQTT v5 client.

    The function is wrapped in ``st.cache_resource`` so the client is meant
    to be created once and reused instead of reconnecting on every call.

    Args:
        hostname: Broker host name.
        username: Broker user name.
        password: Broker password, or None.
        port: Broker port; 8883 is the standard port for MQTT over TLS.
        tls: When true, enable TLS with paho's default settings.

    Returns:
        The client, connected and with its background network loop running.
    """
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)

    # Callback for the broker's CONNACK response; only failures are logged.
    def on_connect(client, userdata, flags, rc, properties=None):
        if rc != 0:
            print(f"Connection failed with result code {rc}")

    client.on_connect = on_connect

    # enable TLS for secure connection
    if tls:
        client.tls_set()
    # set username and password
    client.username_pw_set(username, password)
    client.connect(hostname, port)
    # Run paho's network loop in a background thread. Without it,
    # acknowledgements and keep-alives are never processed, so publishing
    # works at most once before the connection degrades.
    client.loop_start()

    return client


def send_command(client, pico_id, position):
    """Publish a position command for one Pico and report the outcome.

    Args:
        client: MQTT client exposing ``publish(topic, payload, qos=...)``.
        pico_id: Identifier inserted into the command topic.
        position: Value sent under the ``"position"`` key of the JSON payload.

    Returns:
        A status string: a confirmation when the client accepted the
        message, otherwise a failure message carrying the return code.
    """
    # Topic
    command_topic = f"digital-pipette/picow/{pico_id}/L16-R"

    # Create and send command
    command = {"position": position}

    print("Publishing command")

    result = client.publish(command_topic, json.dumps(command), qos=1)

    # publish() reports problems such as a missing connection through its
    # return code instead of raising, so check it before claiming success.
    if result.rc != mqtt.MQTT_ERR_SUCCESS:
        return f"Failed to send command: {result.rc}"

    return f"Command sent: {command} to topic {command_topic}"


# Send the command when the button is pressed, provided every field is filled
if st.button("Send Command"):
    if all((pico_id, HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD)):
        client = get_paho_client(
            HIVEMQ_HOST,
            HIVEMQ_USERNAME,
            password=HIVEMQ_PASSWORD,
            port=int(PORT),
            tls=True,
        )
        success_msg = send_command(client, pico_id, position)
        st.success(success_msg)
    else:
        st.error("Please enter all required fields.")

Seems that I can get it to work once, but the behavior has been unreliable/finicky. Still debugging.

EDIT: The following seems to be working (forgot client.loop_start):

"""This streamlit implementation is preferred over the gradio implementation"""

import json

import paho.mqtt.client as mqtt
import streamlit as st

# Page header
st.title("Actuator Control Panel")

# Broker connection settings (host and password inputs are masked)
HIVEMQ_HOST = st.text_input(
    label="Enter your HiveMQ host:", value="", type="password"
)
HIVEMQ_USERNAME = st.text_input(label="Enter your HiveMQ username:", value="")
HIVEMQ_PASSWORD = st.text_input(
    label="Enter your HiveMQ password:", value="", type="password"
)
PORT = st.number_input(
    label="Enter the port number:",
    min_value=1,
    max_value=65535,
    value=8883,
)

# Identifier of the target Pico (masked input)
pico_id = st.text_input(label="Enter your Pico ID:", value="", type="password")

# Position to send, chosen between 1.1 and 1.9 (starts at 1.5)
position = st.slider(
    label="Select the position value:",
    min_value=1.1,
    max_value=1.9,
    value=1.5,
)

# Debug trace written to the server console
print("hello")


# singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def get_paho_client(hostname, username, password=None, port=8883, tls=True):
    """Create, connect and return a paho MQTT v5 client.

    The function is wrapped in ``st.cache_resource`` so the client is meant
    to be created once and reused instead of reconnecting on every call.

    Args:
        hostname: Broker host name.
        username: Broker user name.
        password: Broker password, or None.
        port: Broker port; 8883 is the standard port for MQTT over TLS.
        tls: When true, enable TLS with paho's default settings.

    Returns:
        The client, connected and with its background network loop running.
    """
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)

    # Callback for the broker's CONNACK response; only failures are logged.
    def on_connect(client, userdata, flags, rc, properties=None):
        if rc != 0:
            print(f"Connection failed with result code {rc}")

    client.on_connect = on_connect

    # enable TLS for secure connection
    if tls:
        client.tls_set()
    # set username and password
    client.username_pw_set(username, password)
    client.connect(hostname, port)
    client.loop_start()  # Use a non-blocking loop

    return client


def send_command(client, pico_id, position, timeout=None):
    """Publish a position command for one Pico and wait for it to go out.

    Args:
        client: MQTT client exposing ``publish(topic, payload, qos=...)``.
        pico_id: Identifier inserted into the command topic.
        position: Value sent under the ``"position"`` key of the JSON payload.
        timeout: Maximum number of seconds to wait for the publish to
            complete. None (the default) waits without a limit.

    Returns:
        A status string: a confirmation on success, otherwise a message
        describing the failure. Exceptions are reported in the returned
        string rather than propagated.
    """
    # Topic
    command_topic = f"digital-pipette/picow/{pico_id}/L16-R"

    # Create and send command
    command = {"position": position}

    try:
        result = client.publish(command_topic, json.dumps(command), qos=1)
        # Check the return code before waiting: wait_for_publish() raises for
        # a rejected publish, so a check placed after it never reports the
        # failure through this branch.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            return f"Failed to send command: {result.rc}"
        result.wait_for_publish(timeout)  # Ensure the message is sent
        if not result.is_published():
            return f"Failed to send command: not published within {timeout} seconds"
    except Exception as e:
        # Deliberately broad: the caller shows the returned text to the user.
        return f"An error occurred: {e}"

    return f"Command sent: {command} to topic {command_topic}"


# Send the command when the button is pressed, provided every field is filled
if st.button("Send Command"):
    if all((pico_id, HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD)):
        client = get_paho_client(
            HIVEMQ_HOST,
            HIVEMQ_USERNAME,
            password=HIVEMQ_PASSWORD,
            port=int(PORT),
            tls=True,
        )
        success_msg = send_command(client, pico_id, position)
        st.success(success_msg)
    else:
        st.error("Please enter all required fields.")

As an update, the solution above seems to work OK for publishing multiple messages, but I haven’t found a fix for the “doesn’t work the second time I click the button” problem when I’m subscribed to a topic and receiving messages in a queue.Queue(). When I try to put a Queue in st.session_state, I get an “uninitialized” error.