
Apache Airflow Use Cases and Example

Apache Airflow is a platform to programmatically author, schedule, and monitor workflows. It lets you define workflows as Directed Acyclic Graphs (DAGs) of tasks and provides a robust set of features for building complex workflows, handling dependencies between tasks, scheduling runs, and monitoring their execution.

Use Case: ETL Pipeline

Let's consider a common use case for Airflow: building and orchestrating an ETL (Extract, Transform, Load) pipeline.

Scenario

You have data spread across multiple sources: databases, APIs, and log files. You need to extract this data, transform it into a suitable format, and load it into a data warehouse for analytics. You also want to ensure that the pipeline runs daily and handles failures gracefully.

Example: ETL Pipeline with Airflow

Step 1: Install Airflow

First, install Airflow using pip:

pip install apache-airflow
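
The example DAG below also uses pandas, requests, and Airflow's Postgres provider. If they are not already present in your environment, install them as well (standard PyPI package names; pin versions to match your Airflow release):

pip install apache-airflow-providers-postgres pandas requests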

Step 2: Set Up Airflow

Initialize the Airflow metadata database (on recent Airflow releases, airflow db migrate supersedes airflow db init):

airflow db init

Create a user for the Airflow web interface (you will be prompted for a password unless you pass --password):

airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email [email protected]

Start the Airflow web server and scheduler, each in its own terminal session:

airflow webserver --port 8080
airflow scheduler

Access the Airflow web interface at http://localhost:8080.

Step 3: Define the DAG

Create a new Python file for your DAG in the dags directory (by default ~/airflow/dags). This DAG will extract data from an API, transform it, and load it into a PostgreSQL database.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import requests
import pandas as pd

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,  # run only the latest interval instead of backfilling from start_date
)

def extract():
    response = requests.get('https://api.example.com/data')
    data = response.json()
    df = pd.DataFrame(data)
    df.to_csv('/tmp/extracted_data.csv', index=False)

def transform():
    df = pd.read_csv('/tmp/extracted_data.csv')
    df['new_column'] = df['existing_column'] * 2  # Example transformation
    df.to_csv('/tmp/transformed_data.csv', index=False)

def load():
    df = pd.read_csv('/tmp/transformed_data.csv')
    pg_hook = PostgresHook(postgres_conn_id='your_postgres_conn_id')
    engine = pg_hook.get_sqlalchemy_engine()
    df.to_sql('your_table', engine, if_exists='replace', index=False)

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

extract_task >> transform_task >> load_task

Explanation

default_args supplies defaults for every task in the DAG: the owner, the start date, one retry per task, and a five-minute delay between retries. The DAG itself is scheduled to run once a day. extract calls the API and stages the raw response as a CSV file, transform reads that file, derives a new column, and writes the result back to disk, and load reads the transformed file and writes it into PostgreSQL via PostgresHook. Each function is wrapped in a PythonOperator, and extract_task >> transform_task >> load_task sets the execution order: extract, then transform, then load.

Step 4: Configure Connections

In the Airflow web interface, navigate to Admin -> Connections and set up the connection details for your PostgreSQL database. Create a connection with the conn_id specified in the DAG (your_postgres_conn_id).
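
If you prefer the command line, the same connection can be created with airflow connections add. The host, login, password, schema, and port values below are placeholders for your own PostgreSQL instance:

airflow connections add 'your_postgres_conn_id' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-schema 'your_database' \
    --conn-login 'your_user' \
    --conn-password 'your_password' \
    --conn-port 5432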

Step 5: Run the DAG

The DAG is scheduled to run daily, but you can also trigger it manually from the Airflow web interface. To do so, navigate to the DAGs tab, locate etl_pipeline, and click the Trigger Dag button.
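
The same actions are available from the CLI: you can trigger a run immediately, or execute the whole DAG in-process for a given logical date to debug it (the date below is just an example):

airflow dags trigger etl_pipeline
airflow dags test etl_pipeline 2023-01-01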

Monitoring and Managing

Once the DAG is enabled, the Airflow web interface is the main place to monitor it. The Grid and Graph views show the state of every task in each run, and clicking a task instance gives access to its logs. Failed tasks are retried automatically according to the retries and retry_delay settings in default_args; you can also clear a task instance to rerun it, or mark it as success or failed, directly from the UI.
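
For quick checks outside the UI, a couple of CLI commands cover the basics; the DAG id matches the pipeline defined above:

airflow dags list-runs -d etl_pipeline
airflow tasks list etl_pipeline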
