Apache Airflow

I need to automatically store a list in my database. I am convinced that the solution is Apache Airflow.
But when I test it, it does not work.
Note that my app requires a username and password.

I have attached the code below. Could somebody please help?

This is my dag file:

import locale
from datetime import date, datetime, time, timedelta

import pandas as pd
import psycopg2
import streamlit as st
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from sqlalchemy import create_engine

# Database handles used by the task below.
#
# The previous URL template had three "{}" slots but .format() received a
# single argument (IndexError at import time), and quote_plus was never
# imported. Instead of assembling a URL by hand, let SQLAlchemy obtain its
# DBAPI connections from psycopg2 with exactly the same keyword arguments the
# raw connection uses, so no credential needs URL-escaping.
# NOTE(review): st.secrets is read here, outside a Streamlit session; confirm
# that the secrets file is visible to the process that loads this DAG file.
engine = create_engine(
    'postgresql+psycopg2://',
    creator=lambda: psycopg2.connect(**st.secrets["postgres"]),
)

# Raw psycopg2 connection for the read queries; autocommit so that no
# transaction is left open between statements.
conn = psycopg2.connect(**st.secrets["postgres"])
conn.autocommit = True

# Switch every locale category to 'it_IT'; this affects locale-dependent
# formatting such as the strftime("%A") day names produced further down.
locale.setlocale(category=locale.LC_ALL, locale='it_IT')

# Labels for the seven days (Monday..Sunday) of the week containing today,
# each formatted as "yy-mm-dd - <weekday name>".
#
# Fixes over the previous version: the append()/format() calls were never
# closed (SyntaxError), and timedelta was looked up on the datetime *class*
# (AttributeError) instead of being imported from the datetime module.
d = date.today()
A = d.weekday()  # 0 = Monday ... 6 = Sunday
C = []
for i in range(7):
    # i - A is the signed offset from today to the i-th day of this week.
    day = d + timedelta(days=i - A)
    C.append('{} - {}'.format(day.strftime("%y-%m-%d"), day.strftime("%A")))

# Define your function to retrieve the list of employees and store it in the database
def retrieve_and_store_employees():
    """Append to table ``miss`` the active users with no ``v_db`` row this week.

    Reads all rows of ``v_db`` and the rows of ``Dash_User_up`` with
    ``stato=1``, keeps the users whose value does not occur in ``v_db`` on any
    date listed in the module-level ``C``, and appends one row per such user
    to the ``miss`` table together with today's date (module-level ``d``).

    Uses the module-level ``conn`` (psycopg2 connection) for reading and
    ``engine`` (SQLAlchemy engine) for writing.
    """
    # A cursor has no result set until a statement has been executed; the
    # previous version called fetchall() on fresh cursors, which raises
    # psycopg2.ProgrammingError. The context manager closes each cursor.
    with conn.cursor() as cursor:
        cursor.execute("select * from v_db;")
        entries = cursor.fetchall()
    with conn.cursor() as cursor:
        cursor.execute("select * from Dash_User_up where stato=1")
        # Second column of each row, as the original DataFrame[1] lookup did.
        list_user = [row[1] for row in cursor.fetchall()]

    # C holds "yy-mm-dd - weekday"; prefixing "20" yields "yyyy-mm-dd".
    week_dates = ['20' + c.split(' - ')[0] for c in C]

    if entries:
        df = pd.DataFrame(entries)
        # assumes column 3 of v_db is the entry date and column 1 identifies
        # the user (same kind of value as Dash_User_up column 1) — TODO confirm
        date_col = df.columns[3]
        user_col = df.columns[1]
        df[date_col] = df[date_col].astype('str')
        in_week = df[date_col].isin(week_dates)
        list_ok = set(df.loc[in_week, user_col].unique().tolist())
    else:
        # Empty v_db: nobody has an entry, so every active user is missing.
        list_ok = set()

    miss = [user for user in list_user if user not in list_ok]
    missed = pd.DataFrame(
        {'miss': miss, 'dat': [d.strftime("%y-%m-%d")] * len(miss)}
    )
    missed.to_sql('miss', engine, if_exists='append', index=True)

# Define the DAG
# Arguments applied to every task created in this DAG.
default_args = {
    'owner': 'Sina_Kian',
    'start_date': datetime(2023, 5, 22, hour=12, minute=13),
}

# The schedule is a property of the DAG itself: default_args is only handed on
# to the operators, so a 'schedule_interval' entry there does not schedule the
# DAG. It is therefore passed to DAG() directly. The dict literal above was
# also left unclosed in the previous version (SyntaxError).
dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@weekly',  # Airflow preset: once a week
    catchup=False,
)

# Define the task to execute your function
# The call was cut off after the opening parenthesis; completed here with the
# function and DAG defined above.
# NOTE(review): the task_id is chosen to match the callable's name — rename as
# needed.
task_retrieve_store_employees = PythonOperator(
    task_id='retrieve_and_store_employees',
    python_callable=retrieve_and_store_employees,
    dag=dag,
)

I also added this part to my main code:

import os
import subprocess

# Airflow configuration through environment variables: the home directory and
# the switch that turns off loading of the example DAGs. They are set before
# the scheduler is spawned because the child process inherits this environment.
os.environ.update({
    'AIRFLOW_HOME': '/app/.streamlit/airflow',
    'AIRFLOW__CORE__LOAD_EXAMPLES': 'False',
})

# Launch `airflow scheduler` as a child process without waiting for it.
# NOTE(review): if this file is a Streamlit script it is presumably executed
# again on every rerun, which would spawn another scheduler each time — confirm.
subprocess.Popen(args=['airflow', 'scheduler'])

This topic was automatically closed 180 days after the last reply. New replies are no longer allowed.