I need to automatically store a list in my database, and I am convinced that the solution is Apache Airflow.
But when I test it, it does not work.
Note that my app requires a username and password.
I have attached the code below; could somebody please help?
This is my DAG file:
import locale
import logging
from datetime import date, datetime, time, timedelta
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
import streamlit as st
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from sqlalchemy import create_engine
# Database connection settings come from the [postgres] table of Streamlit's
# secrets store.  NOTE(review): st.secrets looks for .streamlit/secrets.toml
# relative to the working directory (or in ~/.streamlit), so the Airflow
# scheduler/worker process must be able to find that file -- confirm on the host.
_pg = st.secrets["postgres"]

# Include the port in the SQLAlchemy URL when one is configured, so the engine
# talks to the same server as the psycopg2 connection below.
_host = _pg['host']
if 'port' in _pg:
    _host = '{}:{}'.format(_host, _pg['port'])

# Build the URL in a single formatting step.  User name and password are
# percent-encoded so characters such as '@', ':', '/' or '%' cannot corrupt the
# URL, and no '%'-interpolation is applied to already-substituted values.
engine = create_engine('postgresql+psycopg2://{}:{}@{}/{}'.format(
    quote_plus(str(_pg['user'])),
    quote_plus(str(_pg['password'])),
    _host,
    _pg['dbname'],
))

# Raw DB-API connection used by the read queries of the task.
# NOTE(review): this connects at import time, i.e. every time Airflow parses
# this DAG file; opening the connection inside the task would avoid that.
conn = psycopg2.connect(**_pg)
conn.autocommit = True
# Set the locale so that "%A" below yields Italian weekday names.  The locale
# name is spelled differently across systems ('it_IT.UTF-8' is the usual name
# on Linux), so try the common spellings in order.
for _locale_name in ('it_IT', 'it_IT.UTF-8', 'it_IT.utf8'):
    try:
        locale.setlocale(locale.LC_ALL, _locale_name)
        break
    except locale.Error:
        continue
else:
    # No Italian locale is installed: keep the current one rather than making
    # the whole DAG file fail to import.  Only the weekday-name part of the
    # labels in C is affected.
    logging.getLogger(__name__).warning(
        "No Italian locale available; weekday names will use the default locale")

d = date.today()
A = d.weekday()  # 0 = Monday ... 6 = Sunday

# C: one 'yy-mm-dd - <weekday name>' label per day of the current
# Monday-to-Sunday week, Monday first.
_week_days = [d + timedelta(days=offset - A) for offset in range(7)]
C = [
    '{} - {}'.format(day.strftime("%y-%m-%d"), day.strftime("%A"))
    for day in _week_days
]
# Pure helper: decide which users are missing, given already-fetched rows.
def _users_without_checks(check_rows, user_rows, week_dates):
    """Return the users in *user_rows* that have no check row on any of *week_dates*.

    Args:
        check_rows: rows fetched from ``v_db``.  Column 1 is compared with the
            user identifiers and column 3 is compared, as a string, with
            *week_dates*.  NOTE(review): these positions follow from the use of
            ``select *``; confirm them against the view definition.  If column 3
            is a timestamp rather than a date, its string form will never match.
        user_rows: rows fetched from ``Dash_User_up``; column 1 holds the user
            identifier.
        week_dates: ``'YYYY-MM-DD'`` strings for the days to look at.

    Returns:
        The identifiers from *user_rows*, in their original order (duplicates
        kept), that have no matching row in *check_rows*.
    """
    users = [row[1] for row in user_rows]
    if not check_rows:
        # Nobody has any check row: every active user is missing.
        return users
    checks = pd.DataFrame(check_rows)
    user_col, date_col = checks.columns[1], checks.columns[3]
    # str() the date column so date objects compare equal to 'YYYY-MM-DD'.
    in_week = checks[date_col].astype('str').isin(list(week_dates))
    # A set makes the membership test below O(1) per user.
    checked = set(checks.loc[in_week, user_col])
    return [user for user in users if user not in checked]


# Define your function to retrieve the list of employees and store it in the database
def retrieve_and_store_employees():
    """Append to table ``miss`` every active user with no check this week.

    Reads all rows of ``v_db`` and the active users (``stato=1``) of
    ``Dash_User_up`` through the module-level ``conn``.  It then finds the users
    with no ``v_db`` row dated in the current Monday-to-Sunday week (the
    module-level ``C``).  Those users are appended, together with today's date
    (``d``, formatted ``yy-mm-dd``), to ``miss`` through the module-level
    SQLAlchemy ``engine``.  Nothing is written when no user is missing.
    """
    # A context-managed cursor is closed even if a query raises.
    with conn.cursor() as cursor:
        cursor.execute("""select * from v_db;""")
        check_rows = cursor.fetchall()
        cursor.execute("""select * from Dash_User_up where stato=1""")
        user_rows = cursor.fetchall()

    # C entries look like 'yy-mm-dd - <weekday>'; rebuild 'YYYY-MM-DD'
    # (the '20' century prefix assumes dates before 2100).
    week_dates = ['20' + c.split(' - ')[0] for c in C]
    miss = _users_without_checks(check_rows, user_rows, week_dates)
    if not miss:
        return

    missed = pd.DataFrame({'miss': miss, 'dat': [d.strftime("%y-%m-%d")] * len(miss)})
    # NOTE(review): index=True also writes the 0..n-1 DataFrame index as an
    # 'index' column on every run; index=False is probably what is wanted,
    # but changing it would alter the existing table layout.
    missed.to_sql('miss', engine, if_exists='append', index=True)
# Define the DAG.
# default_args are handed to every task of the DAG.  DAG-level settings such as
# the schedule must be passed to DAG() itself.  A 'schedule_interval' key inside
# default_args is not read by the DAG, which then falls back to its default
# (daily) schedule instead of running weekly.
default_args = {
    'owner': 'Sina_Kian',
    # NOTE(review): naive datetime; Airflow interprets it in its configured
    # default timezone (UTC unless changed) -- confirm that is intended.
    'start_date': datetime(2023, 5, 22, hour=12, minute=13),
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    # '@weekly' is the cron preset "0 0 * * 0" (Sunday at midnight).  Airflow
    # starts each run once its schedule interval has ended.  Airflow 2.4+
    # also accepts this as `schedule=`.
    schedule_interval='@weekly',
    catchup=False,
)

# Single task: run retrieve_and_store_employees once per DAG run.
task_retrieve_store_employees = PythonOperator(
    task_id='retrieve_and_store_employees',
    python_callable=retrieve_and_store_employees,
    dag=dag,
)
I also added this part to my main code:
import os
import subprocess

# Configure Airflow through environment variables.  They are set on os.environ
# so the scheduler subprocess started below inherits them.
os.environ['AIRFLOW_HOME'] = '/app/.streamlit/airflow'
# Set the AIRFLOW__CORE__LOAD_EXAMPLES environment variable to False
os.environ['AIRFLOW__CORE__LOAD_EXAMPLES'] = 'False'

# Start the Airflow scheduler in the background, but only once per process.
# Presumably this runs inside the Streamlit app script (AIRFLOW_HOME points into
# .streamlit).  Streamlit re-executes that script on every user interaction, so
# an unguarded Popen would launch an additional scheduler on each rerun.  The
# flag is kept in os.environ because the process environment survives reruns.
# NOTE(review): the scheduler only loads DAG files from its dags folder (by
# default $AIRFLOW_HOME/dags) and needs an initialised metadata database --
# confirm both are set up on the host.
if os.environ.get('_APP_AIRFLOW_SCHEDULER_STARTED') != '1':
    subprocess.Popen(['airflow', 'scheduler'])
    os.environ['_APP_AIRFLOW_SCHEDULER_STARTED'] = '1'