Hi @tonykip ,
Thx for replying. Here is the code:
import { ReactNode } from “react”
import { StreamlitComponentBase, withStreamlitConnection, Streamlit } from “streamlit-component-lib”;
import DOMPurify from “dompurify”;
// import { string } from “prop-types”;
// Define the types for the component’s props and state
interface Args {
wsUrl: string;
query: string;
searchType: string;
searchMode: string;
searchInProgress: boolean;
userId: number;
authToken: string;
audience: string;
sessionId: string;
queryId: string;
}
interface State {
isConnected: boolean;
output: string;
prevQuery?: string;
prevSearchType?: string;
verbose: boolean;
heartbeatIntervalId?: number;
reconnectionAttempts: number;
lastActivityTime: number;
isMounted: boolean;
}
// The component extends StreamlitComponentBase with State and Args for the type parameters.
// Note the correct order: State first, then Args.
class WebSocketProxy extends StreamlitComponentBase<State, Args> {
state: State = {
isConnected: false,
output: “”,
prevQuery: this.props.args.query,
prevSearchType: this.props.args.searchType,
verbose: false,
reconnectionAttempts: 0,
lastActivityTime: Date.now(),
isMounted: false,
};
private socket?: WebSocket;
private heartbeatTimeoutId?: number;
private reconnectTimeoutId?: number;
componentDidMount() {
super.componentDidMount();
if (!this.state.isMounted) {
if (this.state.verbose) {
console.log(“WebSocket mounted”);
}
this.connectWebSocket();
this.setupHeartbeat();
this.setState({ isMounted: true });
}
}
componentWillUnmount() {
this.clearIntervals();
if (this.socket) {
this.socket.close();
this.socket = undefined;
}
this.setState({ isMounted: false });
}
componentDidUpdate() {
const { query, searchType, searchMode, searchInProgress, userId, sessionId, queryId } = this.props.args;
const { prevQuery, prevSearchType } = this.state;
const action = “start_research”;
if (!query) {
if (this.state.verbose) {
console.error("Query is empty. Exiting ...");
}
return;
}
if (query !== prevQuery || searchType !== prevSearchType) {
if (!this.state.isConnected) {
this.connectWebSocket();
}
if (this.socket && this.state.isConnected && searchInProgress) {
this.sendMessage(action, query, userId, searchType, searchMode, sessionId, queryId);
}
if (searchInProgress) {
this.setState({
prevQuery: query,
prevSearchType: searchType,
output: "",
});
}
}
}
connectWebSocket = () => {
const { wsUrl, authToken, audience } = this.props.args;
if (!wsUrl) {
console.error("WebSocket URL is not provided. Exiting ...");
return;
}
const wsUrlWithToken = `${wsUrl}?token=${encodeURIComponent(authToken)}&audience=${encodeURIComponent(audience)}`;
if (!wsUrlWithToken.startsWith("ws://") && !wsUrlWithToken.startsWith("wss://")) {
console.error("WebSocket URL must start with 'ws://' or 'wss://'. Exiting ...");
return;
}
this.socket = new WebSocket(wsUrlWithToken);
this.socket.onopen = () => {
this.setState({ isConnected: true, reconnectionAttempts: 0 });
console.log("WebSocket opened");
};
this.socket.onmessage = this.handleMessage;
this.socket.onclose = (event) => {
this.handleWebSocketClose(event);
};
this.socket.onerror = (error) => {
console.error("WebSocket error", error);
};
};
handleMessage = (event: MessageEvent) => {
if (this.state.verbose) {
console.log(“Received message from WebSocket”);
}
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = undefined;
}
this.setState({ lastActivityTime: Date.now() });
const data = JSON.parse(event.data);
switch (data.type) {
case 'logs':
case 'search':
case 'report':
this.setState((prevState: State) => ({
output: `${prevState.output}\n${data.output}`,
}), () => {
this.setState({ output: DOMPurify.sanitize(this.state.output) });
Streamlit.setComponentValue(this.state.output);
});
break;
case 'error':
console.error(data.output);
this.socket?.close();
break;
case 'heartbeat':
if (this.state.verbose) {
console.log("Received heartbeat response from WebSocket");
}
break;
default:
console.log("Received unknown data type from WebSocket.");
}
};
handleWebSocketClose = (event: CloseEvent) => {
this.setState({ isConnected: false });
const { reconnectionAttempts } = this.state;
if (reconnectionAttempts < 24) {
const delay = Math.pow(2, reconnectionAttempts) * 1000;
this.reconnectTimeoutId = window.setTimeout(() => {
this.setState((prevState) => ({
reconnectionAttempts: prevState.reconnectionAttempts + 1
}));
console.log(WebSocket reconnection attempt ${reconnectionAttempts + 1}
);
this.connectWebSocket();
}, delay);
}
};
setupHeartbeat = () => {
const heartbeatIntervalId: number = window.setInterval(() => {
if (this.socket && this.state.isConnected) {
const currentTime = Date.now();
const timeSinceLastActivity = currentTime - this.state.lastActivityTime;
if (timeSinceLastActivity < 60000) {
return;
}
this.socket.send(JSON.stringify({ action: 'heartbeat', payload: 'ping' }));
if (this.heartbeatTimeoutId !== undefined) {
clearTimeout(this.heartbeatTimeoutId);
}
this.heartbeatTimeoutId = window.setTimeout(() => {
if (this.state.isConnected) {
this.socket?.close();
}
}, 30000);
}
}, 120000);
this.setState({ heartbeatIntervalId });
};
clearIntervals = () => {
if (this.state.heartbeatIntervalId !== undefined) {
clearInterval(this.state.heartbeatIntervalId);
}
if (this.heartbeatTimeoutId !== undefined) {
clearTimeout(this.heartbeatTimeoutId);
}
if (this.reconnectTimeoutId !== undefined) {
clearTimeout(this.reconnectTimeoutId);
}
};
sendMessage = (action: string, query: string, userId: number, searchType: string, searchMode: string, sessionId: string, queryId: string) => {
this.socket?.send(JSON.stringify({
action,
query,
userId,
searchType,
searchMode,
sessionId,
queryId,
}));
};
render = (): ReactNode => {
return null;
}
}
export default withStreamlitConnection(WebSocketProxy);
The python side is this:
import os
import streamlit.components.v1 as components
import streamlit as st
import uuid
import logging
logger = logging.getLogger(name)
_RELEASE = True
if not _RELEASE:
_component_func = components.declare_component(
“websocket-proxy”,
url=“http://localhost:3001”,
)
else:
parent_dir = os.path.dirname(os.path.abspath(file))
build_dir = os.path.join(parent_dir, “frontend/build”)
_component_func = components.declare_component(
“websocket-proxy”,
path=build_dir,
)
def websocket_proxy_widget(
ws_url,
query=“”,
search_type=“tiger_report”,
search_mode=“basic”,
search_in_progress=False,
user_id=None,
auth_token=None,
audience=None,
key=None,
):
session_id = st.query_params.get("session_id", None)
if query != st.session_state.get("websocket_query", None):
query_id = str(uuid.uuid4())
st.session_state.websocket_query_id = query_id
st.session_state.websocket_query = query
else:
query_id = st.session_state.get("websocket_query_id", None)
if user_id is None or session_id is None or query_id is None:
logger.error(
"User ID, Session ID, and Query ID are required for websocket connection"
)
return
component_value = _component_func(
wsUrl=ws_url,
query=query,
searchType=search_type,
searchMode=search_mode,
searchInProgress=search_in_progress,
userId=user_id,
authToken=auth_token,
audience=audience,
sessionId=session_id,
queryId=query_id,
key=key,
)
return component_value
"
I do not believe the issue is with the code, but rather how Streamlit updates components from 1.35 version and onwards (1.36). My component works perfectly fine in 1.34.
The behaviour in 1.34 (when works), when I establish the websocket connection from Streamlit to my backend service, it stays persistent while the prcessing is done. In the console I could see that the component was mounted only once and from the correct file (in this case WebSocketProxy.tsx). This is the output from this part:
"componentDidMount() {
super.componentDidMount();
if (!this.state.isMounted) {
if (this.state.verbose) {
console.log(“WebSocket mounted”);
}
this.connectWebSocket();
this.setupHeartbeat();
this.setState({ isMounted: true });
}
}
Now, what happens from the version 1.35 and later, is that the connection gets established and shows correctly that the mount has been done from “WebSocketProxy.tsx”. However, after a very short while it mounts again (or in parallel sometimes) showing it does so from a file called something like VM bundle.js or similar. Which then causes the first mount from WebSocketProxy.tsx tp unmount and close the connection, and then I all the work done from backend service (ie streaming) by that point is lost.
So, the difference between 1.34 and later versions is that the connection was persistent in 1.34 and onoy when the user navigated away from the streamlit page, it got closed, which is intended and by design. However, in 1.35 and 1.36 it seems to me as if Streamlit refreshes the component multiple times, and then it cuases repetitive mounts and dismount lifecycle events, and effectively causes to disrupt connection in my case.
Is there anything on Streamlit side, any configuration where I could influence on this behaviour (i.e component updates, or something to that avail)? I’ve read somwhere else that the component registry now is not a singleton but has been impemented differenty - not sure if it has anything to do with this. but just saying in case relevant.
Hope I managed to describe my issue clearly enough. Let me if any additional clarification is needed, or if there is some other way to get support regarding this.
This one is important to my app, as it effectively prevents me from upgrading from 1.34 and using the newest features.
Thx in advance.
Best, Mario