I am trying to build a streamlit app that is connected to SQL Server that allow user to filter dataframe based on SQL query, where the app allow the user to construct a dynamic query with different where conditions.
Based on @ asehmi suggestion i was able to create the app BUT
the query doesn’t contain flexible operators(and/or)for each field
the app look like this :
code:
import streamlit as st
import pandas as pd
#SQL pckgs
import pyodbc
Activities = ["EDA","Plot","About"]
choice = st.sidebar.selectbox("Select Activity",Activities)
radio = st.sidebar.radio(label="", options=["Connect To DB"])
st.header('Flexible Data Filtering UI')
df = pd.DataFrame()
if radio=="Connect To DB":
try:
con = pyodbc.connect(
driver="ODBC Driver 17 for SQL Server",
Server = "***.***.****.****",
DATABASE="info",
UID="******",
PWD="******",
)
cursor = con.cursor()
df = pd.read_sql_query('''select m.ID,
m.name,
m.nickname,
m.mother_name,
n.NewColumn as first_nationality,
from [info].[dbo].[person] m
left join [person].[dbo].[nationality] n on n.ID = m.nationality_1
''',con)
data=df
if data is not None:
#EDA Page
if choice =="EDA":
query_NA1 = df.first_nationality.unique()
with st.expander('Show UI Specification', expanded=False):
with st.echo():
UI_SPECIFICATION = [
{
'selector': {
'key': 'selector_name',
'type': st.checkbox,
'label': 'Name (required)',
# 'kwargs': {'value': True, 'disabled': True},
'kwargs': {'value': False},
},
'input': {
'key': 'input_name',
'type': st.text_input,
'dtype': str,
'label': 'Name',
'db_col': 'name',
'kwargs': {},
},
},
{
'selector': {
'key': 'selector_nickname',
'type': st.checkbox,
'label': 'Nickname',
'kwargs': {'value': False},
},
'input': {
'key': 'input_nickname',
'type': st.text_input,
'dtype': str,
'label': 'Nickname',
'db_col': 'nickname',
'kwargs': {},
},
},
{
'selector': {
'key': 'selector_mother_name',
'type': st.checkbox,
'label': 'mother name',
'kwargs': {'value': False},
},
'input': {
'key': 'input_mother_name',
'type': st.text_input,
'dtype': str,
'label': 'mother name',
'db_col': 'mother_name',
'kwargs': {},
},
},
{
'selector': {
'key': 'selector_nationality',
'type': st.checkbox,
'label': 'first_nationality',
'kwargs': {'value': False},
},
'input': {
'key': 'input_nationality',
'type': st.multiselect,
'dtype': list,
'label': 'Select nationality',
'db_col': 'first_nationality',
'kwargs': {'options': df.first_nationality.unique()},
},
},
]
def render_selectors(ui_spec):
field_cols = st.columns([1]*len(ui_spec))
for i, spec in enumerate(ui_spec):
selector = spec['selector']
with field_cols[i]:
selector['type'](selector['label'], key=selector['key'], **selector['kwargs'])
def get_selector_values(ui_spec):
values = {}
for spec in ui_spec:
selector = spec['selector']
values[selector['key']] = {
'label': selector['label'],
'value': st.session_state[selector['key']],
}
return values
def render_inputs(ui_spec, selector_values):
for spec in ui_spec:
input = spec['input']
selector_value = selector_values[spec['selector']['key']]['value']
if selector_value == True:
input['type'](input['label'], key=input['key'], **input['kwargs'])
def get_input_values(ui_spec, selector_values):
values = {}
for spec in ui_spec:
input = spec['input']
selector_value = selector_values[spec['selector']['key']]['value']
if selector_value == True:
values[input['key']] = {
'label': input['label'],
'db_col': input['db_col'],
'value': st.session_state[input['key']],
'dtype': input['dtype'],
}
return values
st.subheader('Filter fields selection')
render_selectors(UI_SPECIFICATION)
selector_values = get_selector_values(UI_SPECIFICATION)
st.subheader('Filter field inputs')
render_inputs(UI_SPECIFICATION, selector_values)
input_values = get_input_values(UI_SPECIFICATION, selector_values)
def build_query(input_values, logical_op, compare_op):
query_frags = []
for k, v in input_values.items():
if v['dtype'] == list:
query_frag_expanded = [f"{v['db_col']} {compare_op} '{val}'" for val in v['value']]
query_frag = f' {logical_op} '.join(query_frag_expanded)
elif v['dtype'] == int or v['dtype'] == float:
query_frag = f"{v['db_col']} {compare_op} {v['dtype'](v['value'])}"
elif v['dtype'] == str:
query_frag = f"{v['db_col']}.str.contains('{v['dtype'](v['value'])}')"
else:
query_frag = f"{v['db_col']} {compare_op} '{v['dtype'](v['value'])}')"
query_frags.append(query_frag)
query = f' {logical_op} '.join(query_frags)
return query
def display_results(df_results):
st.write(df_results)
st.subheader('Query builder')
def configure_query():
c1, c2, _ = st.columns([1,1,2])
with c1:
logical_op = st.selectbox('Logical operator', options=['and', 'or'], index=1)
with c2:
compare_op = st.selectbox('Comparator operator', options=['==', '>', '<', '<=', '>='], index=0)
return logical_op, compare_op
logical_op, compare_op = configure_query()
query = build_query(input_values, logical_op, compare_op)
if st.checkbox('Show filter query', True):
st.write(f'Query: `{query}`')
st.markdown('---')
if st.button("🔍 Apply filter query"):
df_results = df.query(query, engine='python')
st.subheader('Filtered data results')
display_results(df_results)