I have a functioning code that records sensor data to a csv file, however, there is a problem with the session states in which, the end data collection button is not appearing, so the sensor data just continuously writes to the csv file. How do I include an end data collection button, in order to stop the recording of the sensor, and saves the csv file?
import USB_ExampleClassStreamlit
from ThreeSpaceAPIStreamlit import *
from time import time_ns, sleep
import sys
import csv
import os
import winsound
from colorama import init, Fore, Style
import streamlit as st
import re
init(autoreset=True) # Initialize colorama to auto-reset colors after each print statement
# Streamlit app header and instructions
st.markdown("<h1 style = 'text-align: center; color: #001e69;'>Welcome to the LETREP25 Project!</h1>", unsafe_allow_html=True)
st.subheader("Please login with your username and password below")
st.markdown("")
# Simulate authentication
def authenticate(username, password):
return username == "letrep" and password == "letrep123"
# Create input fields for username and password
username = st.text_input("Username")
password = st.text_input("Password", type="password")
# Initialize session state for logged_in
if 'logged_in' not in st.session_state:
st.session_state.logged_in = False
# Initialize session state for data collection
if 'data_collection_active' not in st.session_state:
st.session_state.data_collection_active = False
# Create a button to trigger the authentication process
if st.button("Login"):
if authenticate(username, password):
st.session_state.logged_in = True
st.success("Logged in successfully!")
else:
st.session_state.logged_in = False
st.error("Invalid username or password")
# Initialize constants and globals
MICROSECONDS_IN_A_SECOND = 1000000
PORT = None # Autodetect the port, modify if necessary
TIMEOUT = 0.7
LOGICAL_IDS = [0, 1] # Assuming two sensors
DESIRED_SAMPLES_PER_SECOND = 100
# Initialize control variables
recording = False # State variable to track recording status
csvfile_0 = None
csvfile_1 = None
writer_0 = None
writer_1 = None
iteration = 0
# Function to find the next iteration number
def find_next_iteration_number():
pattern = re.compile(r'IMU\d{2}_Duration_Collection_(\d{2})\.csv')
highest_iteration = 0
for filename in os.listdir('.'):
match = pattern.match(filename)
if match:
iteration_number = int(match.group(1))
if iteration_number > highest_iteration:
highest_iteration = iteration_number
return highest_iteration + 1
# Function to start recording
def start_recording(patient_name, date):
global recording, csvfile_0, csvfile_1, writer_0, writer_1, iteration
if not recording:
winsound.Beep(400, 50) # Beep at 400 Hz for 50 milliseconds
winsound.Beep(800, 70) # Beep at 800 Hz for 70 milliseconds
recording = True
iteration = find_next_iteration_number()
fileName = str(patient_name + date)
# Open new CSV files for writing
csvfile_0 = open(f'IMU00_{fileName}.csv', 'w', newline='')
csvfile_1 = open(f'IMU01_{fileName}.csv', 'w', newline='')
# Create new CSV writers
writer_0 = csv.writer(csvfile_0)
writer_1 = csv.writer(csvfile_1)
print(Fore.CYAN + f"Recording started: Session {iteration}")
# Function to stop recording
def stop_recording():
global recording, csvfile_0, csvfile_1, writer_0, writer_1
if recording:
winsound.Beep(700, 50) # Beep at 700 Hz for 50 milliseconds
winsound.Beep(300, 70) # Beep at 300 Hz for 70 milliseconds
recording = False
if csvfile_0 is not None:
csvfile_0.close()
csvfile_0 = None # Reset file object to None
if csvfile_1 is not None:
csvfile_1.close()
csvfile_1 = None # Reset file object to None
writer_0 = None
writer_1 = None
print(Fore.RED + "Recording stopped")
# Function to record data
def record_data(senTSS):
for id in LOGICAL_IDS:
packet = senTSS.getOldestStreamingPacket(logicalID=id)
if packet is not None and writer_0 is not None and writer_1 is not None:
if id == 0:
writer_0.writerow(packet)
elif id == 1:
writer_1.writerow(packet)
# Streamlit login logic
if st.session_state.logged_in:
st.write("Welcome, you are logged in!")
patient_name = st.text_input("Please enter your name:")
date = st.text_input("Please enter the date (e.g., 11_7_24):")
st.subheader("Please select start if you are ready to begin data collection")
# Setup for the sensor
if st.button("Start Data Collection") and patient_name and date:
st.session_state.data_collection_active = True
senCom = USB_ExampleClassStreamlit.UsbCom(PORT, timeout=TIMEOUT)
senTSS = None
try:
st.write("Initializing sensor...")
senTSS = ThreeSpaceSensor(senCom, streamingBufferLen=1000)
st.write("Sensor initialized successfully")
except Exception as e:
st.error(f"Error initializing sensor: {str(e)}")
st.error("Make sure the sensor is connected to the correct COM port.")
st.error("If the sensor is powered off, turn it on and try again.")
if senTSS:
start_recording(patient_name, date)
st.write("Starting data stream...")
# Set up streaming and collection
senTSS.comClass.sensor.reset_input_buffer()
for id in LOGICAL_IDS:
senTSS.startStreaming(logicalID=id)
# Collect data in the background
while st.session_state.data_collection_active:
record_data(senTSS)
sleep(0.01) # Adjust this based on your desired sample rate
# Always show "End Data Collection" button when data collection is active
if st.session_state.data_collection_active:
if st.button("End Data Collection"):
stop_recording()
senTSS.stopStreaming(logicalID=0)
senTSS.stopStreaming(logicalID=1)
st.session_state.data_collection_active = False
st.success("Data collection stopped. Files saved.")