Apache Airflow Use Cases and Example
Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) of tasks. Airflow provides a robust suite of features for building complex workflows, handling dependencies between tasks, scheduling them, and monitoring their execution.
Use Case: ETL Pipeline
Let's consider a common use case for Airflow: building and orchestrating an ETL (Extract, Transform, Load) pipeline.
Scenario
You have data spread across multiple sources: databases, APIs, and log files. You need to extract this data, transform it into a suitable format, and load it into a data warehouse for analytics. You also want to ensure that the pipeline runs daily and handles failures gracefully.
Example: ETL Pipeline with Airflow
Step 1: Install Airflow
First, install Airflow using pip:
pip install apache-airflow
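In practice, the Airflow project recommends installing against a constraints file so that transitive dependency versions stay compatible. The version numbers in the command below are illustrative; substitute the Airflow and Python versions you actually use:
pip install "apache-airflow==2.9.3" \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.10.txt"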
Step 2: Set Up Airflow
Initialize the Airflow database:
airflow db init
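On recent Airflow releases, airflow db init has been deprecated in favor of:
airflow db migrate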
Create a user for the Airflow web interface:
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
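This command prompts for a password interactively; for a throwaway local setup you can also pass one directly with the --password flag:
airflow users create --username admin --firstname Admin --lastname User \
    --role Admin --email admin@example.com --password admin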
Start the Airflow web server and scheduler (in separate terminals, since each command keeps running):
airflow webserver --port 8080
airflow scheduler
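Alternatively, for quick local experimentation, recent Airflow versions provide a single command that initializes the database, creates a user, and runs the web server and scheduler in one process (not meant for production):
airflow standalone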
Access the Airflow web interface at http://localhost:8080.
Step 3: Define the DAG
Create a new Python file for your DAG in the dags directory. This DAG will extract data from an API, transform it, and load it into a PostgreSQL database.
from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on Airflow 1.x
from airflow.providers.postgres.hooks.postgres import PostgresHook  # requires apache-airflow-providers-postgres
from datetime import datetime, timedelta
import requests
import pandas as pd

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,  # run only the latest interval instead of backfilling from start_date
)

def extract():
    # Pull raw data from the source API and stage it as a CSV file
    response = requests.get('https://api.example.com/data')
    response.raise_for_status()
    data = response.json()
    df = pd.DataFrame(data)
    df.to_csv('/tmp/extracted_data.csv', index=False)

def transform():
    # Read the staged data, apply the transformation, and write the result
    df = pd.read_csv('/tmp/extracted_data.csv')
    df['new_column'] = df['existing_column'] * 2  # Example transformation
    df.to_csv('/tmp/transformed_data.csv', index=False)

def load():
    # Load the transformed data into the target PostgreSQL table
    df = pd.read_csv('/tmp/transformed_data.csv')
    pg_hook = PostgresHook(postgres_conn_id='your_postgres_conn_id')
    engine = pg_hook.get_sqlalchemy_engine()
    df.to_sql('your_table', engine, if_exists='replace', index=False)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# Run the tasks sequentially: extract -> transform -> load
extract_task >> transform_task >> load_task
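Before relying on the schedule, you can smoke-test individual tasks from the command line; airflow tasks test runs a single task for a given logical date without recording state in the metadata database:
airflow tasks test etl_pipeline extract 2023-01-01
airflow tasks test etl_pipeline transform 2023-01-01
airflow tasks test etl_pipeline load 2023-01-01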
Explanation
- DAG Definition: The DAG is defined with default arguments (owner, retries, retry delay) and a schedule interval of one day, so Airflow creates one run per day.
- Tasks: There are three tasks:
  - Extract Task: Extracts data from an API and saves it to a CSV file.
  - Transform Task: Transforms the extracted data and saves it to another CSV file.
  - Load Task: Loads the transformed data into a PostgreSQL database.
- Task Dependencies: The tasks are set to run sequentially: extract, transform, and then load.
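For comparison, the same dependencies can be sketched with Airflow's TaskFlow API (Airflow 2.x), where decorated Python functions become tasks and the call chain defines the ordering. The outline below is schematic rather than a drop-in replacement; the function bodies are elided and would reuse the same logic as above.
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False)
def etl_pipeline_taskflow():

    @task
    def extract() -> str:
        # ...call the API and write the staged CSV, as in extract() above...
        return '/tmp/extracted_data.csv'

    @task
    def transform(path: str) -> str:
        # ...read, transform, and write the result, as in transform() above...
        return '/tmp/transformed_data.csv'

    @task
    def load(path: str) -> None:
        # ...load the CSV into PostgreSQL via PostgresHook, as in load() above...
        pass

    # Calling the functions wires up extract -> transform -> load automatically
    load(transform(extract()))

etl_pipeline_taskflow()
On Airflow 2.4 and later the decorator parameter is spelled schedule rather than schedule_interval.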
Step 4: Configure Connections
In the Airflow web interface, navigate to Admin -> Connections and set up the connection details for your PostgreSQL database. Create a connection with the conn_id specified in the DAG (your_postgres_conn_id).
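If you prefer to script this step, the same connection can be created with the Airflow CLI; the URI below uses placeholder credentials that you would replace with your own host, user, password, and database name:
airflow connections add 'your_postgres_conn_id' \
    --conn-uri 'postgres://user:password@localhost:5432/your_database'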
Step 5: Run the DAG
The DAG is scheduled to run daily, but you can also trigger it manually from the Airflow web interface. To do so, navigate to the DAGs tab, locate etl_pipeline, and click the Trigger DAG button.
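Equivalently, a run can be triggered from the command line:
airflow dags trigger etl_pipeline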
Monitoring and Managing
- Web Interface: Use the Airflow web interface to monitor task statuses, logs, and execution times.
- Retries and Alerts: Airflow can automatically retry failed tasks and send alerts based on the configured parameters, as sketched below.
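For instance, failure e-mails can be turned on through default_args, provided an SMTP server is configured in airflow.cfg; the recipient address below is a placeholder.
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'email': ['alerts@example.com'],  # placeholder recipient list
    'email_on_failure': True,         # notify when a task run fails
    'email_on_retry': False,          # stay quiet on automatic retries
}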