A bit of background:
I have a data lake built with Delta Lake, residing on S3. For development, I also keep a recent, trimmed-down copy of it on local disk.
I read each Delta Lake table as a pyarrow dataset (for lazy evaluation), and on top of that I use DuckDB to run SQL-like queries. I am trying to add functionality to my deployment where I can submit SQL queries from Streamlit against my data lake and get a native pandas DataFrame back as output.
Since Streamlit essentially reruns the Python script in a loop on every interaction (a generalisation, but close enough), I do not want to redo the step where all my Delta tables are loaded into memory as pyarrow datasets. That step is encapsulated in a single Python function which needs to be called only once.
The remaining functions, which use the DuckDB connection persisted in memory to query the data lake, should rerun on every click/change in the Streamlit application.
load_data_lake(): this function loads the data and should be cached. Whenever I apply an st.cache_* decorator to it, I get the error shown in the title.
Currently I call load_data_lake() behind a button, which limits the rerun to button presses.
```python
import pyarrow
import pandas as pd
import duckdb
from deltalake import DeltaTable
import streamlit as st

# One in-memory DuckDB connection, recreated on every script rerun.
conn = duckdb.connect(database=':memory:', read_only=False)

def load_data_lake(conn, *table_list):
    # Load each Delta table as a lazy pyarrow dataset and materialise it in DuckDB.
    for table_name in table_list:
        print(f"Loading {table_name} in DuckDB")
        dt = DeltaTable(f"/home/ubuntu/Analytics/datalake/bronze/{table_name}")
        dataset = dt.to_pyarrow_dataset()
        # DuckDB's replacement scan resolves `dataset` from the local scope by name.
        conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM dataset;")

def read_data_lake(conn, *table_list):
    # Sanity check: print the row count of each table.
    for table_name in table_list:
        print(table_name)
        result = conn.execute(f"SELECT count(1) FROM {table_name};")
        print(result.fetchdf())

def run_query(conn, query):
    df = conn.execute(query).fetchdf()
    st.write(df)
    st.write(f"Rows returned: {df.shape[0]}")

table_list = ["tableA", "tableB", "tableC"]
table_list_clean = ''
for i in table_list:
    table_list_clean += "- " + i + "\n"

st.title("Data Lake Querying Engine")
st.write("Tables Currently in DataLake Bronze Layer:")
st.markdown(table_list_clean)

query = st.text_area("Enter your SQL query here:", height=200)
if st.button("Execute", type="primary"):
    load_data_lake(conn, *table_list)
    run_query(conn, query)
    conn.close()
```