High frequency updates from websocket

Summary

I want to use Streamlit to build a dashboard of all the trades (buy and sell) happening in a given market. I connect to a WebSocket stream to receive BTCUSDT data from the Binance exchange. Messages arrive every ~0.1 s, and I would like each dashboard update to take ~0.09 s.
How can I handle this kind of situation, where messages are delivered at high frequency? My code successfully creates the dashboard, but it doesn't get updated fast enough, and I suspect the dashboard is running behind the stream.
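
One way to check whether the dashboard is running behind is to compare the trade time carried in each message with the local clock. The following is only a minimal sketch: the function name message_lag_seconds and the sample message are made up for illustration, and the result is meaningful only if the local clock is reasonably well synchronized.

```python
import time


def message_lag_seconds(message, now=None):
    """Return the seconds elapsed between the trade time and the local clock."""
    # 'T' is the trade time in milliseconds, the same field streamer.py reads.
    if now is None:
        now = time.time()
    return now - int(message['T']) / 1000


# Made-up message and a fixed "now" so the example is reproducible.
sample = {'T': 1_700_000_000_000}
lag = message_lag_seconds(sample, now=1_700_000_000.25)
print(f"lag: {lag:.3f} s")
```

If this value keeps growing while the stream runs, updates are taking longer than the interval between messages.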

The dashboard must display the buy and sell volumes at any moment in time as bar charts. I am also adding some metrics to show the total volume of buy and sell, as well as their change.
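
To make the metrics concrete, here is a small sketch of the calculation the dashboard performs for each side: the total quantity and the change contributed by the most recent row. It uses a plain list as a stand-in for the 'Quantity' column, and the helper name total_and_change and the numbers are hypothetical.

```python
def total_and_change(quantities):
    """Return (total quantity, change added by the most recent entry)."""
    current = sum(quantities)
    previous = sum(quantities[:-1])  # total before the latest entry
    return round(current, 2), round(current - previous, 2)


# Toy data standing in for the 'Quantity' column of the buy dataframe.
buy_quantities = [0.5, 1.25, 0.25]
total, change = total_and_change(buy_quantities)
print(total, change)
```

The two returned values correspond to the value and delta arguments passed to st.metric in the update function.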

Steps to reproduce

My code is structured in the following way.

There is a streamer.py file that defines a class Stream. The Stream object is a WebSocket client: it connects to a stream, handles messages, and updates the dashboard. Whenever a new message is received, Stream acquires a threading.Lock() and updates the relevant pandas dataframe (there is one dataframe for buy orders and one for sell orders). If multiple orders share the same timestamp, it combines them by summing the corresponding volumes. Then it releases the lock and starts a new thread that runs the update function (defined in streamer.py). The update function acquires the same lock so that it never reads the dataframes while they are being modified.
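
The combine-by-timestamp step described above can be illustrated without pandas. This is a simplified sketch, not the code from streamer.py: the TradeBook class is hypothetical and uses a dict in place of a dataframe, but it follows the same rule (the first trade at a timestamp creates the row, later trades add to its quantity and USD value) and guards every access with a lock.

```python
from threading import Lock


class TradeBook:
    """Hypothetical stand-in for one side's dataframe, keyed by timestamp."""

    def __init__(self):
        self.lock = Lock()
        self.rows = {}  # timestamp -> [price, quantity, usd_value]

    def add(self, timestamp, price, qty):
        # Hold the lock for the whole read-modify-write step.
        with self.lock:
            row = self.rows.get(timestamp)
            if row is None:
                self.rows[timestamp] = [price, qty, price * qty]
            else:
                # Same timestamp: keep the first price, sum the volumes.
                row[1] += qty
                row[2] += price * qty

    def snapshot(self):
        # Copy under the lock so a reader never sees a half-updated row.
        with self.lock:
            return {ts: list(row) for ts, row in self.rows.items()}


book = TradeBook()
book.add(1000, 20000.0, 0.5)
book.add(1000, 20000.0, 0.25)  # same timestamp, merged into the first row
book.add(1001, 20010.0, 1.0)
snap = book.snapshot()
print(snap)
```

Using `with self.lock:` instead of paired acquire() and release() calls means the lock is released even if an exception is raised inside the block.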

In the main.py file, the Streamlit dashboard and the Stream object are initialized.

To reproduce the following code you need to connect to the Websocket from a region where Binance is not restricted. Since I live in the US, I must use a VPN to properly receive the data.
Code snippet:

main.py file

# main.py
import streamer
import pandas as pd
import streamlit as st  # web development
import numpy as np  # np mean, np random
import time  # to simulate a real time data, time loop
import plotly.express as px  # interactive charts

df_buy = pd.DataFrame(columns = [ 'Price', 'Quantity', 'USD Value'])
df_sell = pd.DataFrame(columns = [ 'Price', 'Quantity', 'USD Value'])

st.set_page_config(
    page_title='Real-Time Data Science Dashboard',
    page_icon='✅',
    layout='wide'
)

# dashboard title

st.title("Real-Time / Live Data Science Dashboard")

placeholder = st.empty()

streamer.Stream(df_buy,df_sell,placeholder).connect()

streamer.py file

# streamer.py
import websocket
import json
import streamlit as st
import plotly.express as px
import pandas as pd
from threading import Thread, Lock
from streamlit.script_run_context import add_script_run_ctx
from datetime import datetime
import time

def on_close(ws, close_status_code, close_msg):
    print('LOG', 'Closed orderbook client')


def update(df_buy,df_sell, placeholder, lock):
    lock.acquire()
    with placeholder.container():
        # create two columns for the metrics
        kpi1, kpi2 = st.columns(2)
        current_sumSellVolumes = df_sell['Quantity'].sum()
        previous_sumSellVolumes = df_sell.iloc[:-1]['Quantity'].sum()
        current_sumBuyVolumes = df_buy['Quantity'].sum()
        previous_sumBuyVolumes = df_buy.iloc[:-1]['Quantity'].sum()
        # fill in those two columns with the respective metrics or KPIs
        kpi2.metric(label="Sell quantity 📉", value=round(current_sumSellVolumes, 2),
                    delta=round(current_sumSellVolumes - previous_sumSellVolumes, 2))
        kpi1.metric(label="Buy quantity 📈", value=round(current_sumBuyVolumes, 2),
                    delta=round(current_sumBuyVolumes - previous_sumBuyVolumes, 2))

        # create two columns for charts

        fig_col1, fig_col2 = st.columns(2)
        with fig_col1:
            st.markdown("### Buy Volumes")
            fig = px.bar(data_frame=df_buy, x=df_buy.index, y='Quantity')
            st.write(fig)
        with fig_col2:
            st.markdown("### Sell Volumes")
            fig2 = px.bar(data_frame=df_sell, x=df_sell.index, y='Quantity')
            st.write(fig2)
        st.markdown("### Detailed Data View")
        st.dataframe(df_buy)
        st.dataframe(df_sell)
    lock.release()


class Stream():
    def __init__(self, df_buy, df_sell, placeholder):
        self.symbol = 'BTCUSDT'
        self.df_buy = df_buy
        self.df_sell = df_sell
        self.placeholder = placeholder
        self.lock = Lock()
        self.url = "wss://stream.binance.com:9443/ws"
        self.stream = f"{self.symbol.lower()}@aggTrade"
        self.times = []

    def on_error(self, ws, error):
        print(self.times)
        print('ERROR', error)

    def on_open(self, ws):
        print('LOG', f'Opening WebSocket stream for {self.symbol}')

        subscribe_message = {"method": "SUBSCRIBE",
                             "params": [self.stream],
                             "id": 1}
        ws.send(json.dumps(subscribe_message))

    def handle_message(self, message):
        self.lock.acquire()
        timestamp = datetime.utcfromtimestamp(int(message['T']) / 1000)
        price = float(message['p'])
        qty = float(message['q'])
        USDvalue = price * qty
        # 'm' is True when the buyer is the market maker, i.e. the taker sold
        side = 'SELL' if message['m'] else 'BUY'
        if side == 'BUY':
            df = self.df_buy
        else:
            df = self.df_sell
        if timestamp not in df.index:
            df.loc[timestamp] = [price, qty, USDvalue]
        else:
            df.loc[df.index == timestamp, 'Quantity'] += qty
            df.loc[df.index == timestamp, 'USD Value'] += USDvalue
        self.lock.release()

    def on_message(self, ws, message):
        message = json.loads(message)
        self.times.append(time.time())
        if 'e' in message:
            self.handle_message(message)

            thr = Thread(target=update, args=(self.df_buy, self.df_sell, self.placeholder, self.lock,))
            add_script_run_ctx(thr)
            thr.start()


    def connect(self):
        print('LOG', 'Connecting to websocket')
        self.ws = websocket.WebSocketApp(self.url, on_close=on_close, on_error=self.on_error,
                                         on_open=self.on_open, on_message=self.on_message)
        self.ws.run_forever()


Debug info

  • Streamlit version: 1.4.0
  • Python version: 3.10.4
  • OS version: macOS 13.1
  • Browser version: Safari 16.2

Same problem here. Did you get a chance to fix it?
