
Apache Airflow Overview and Advanced Usage Examples

This article introduces Apache Airflow, explains its core concepts such as DAGs, tasks, operators, executors, and the web UI, and provides multiple practical Python code examples for Bash commands, Python functions, SQL queries, task dependencies, sensors, dynamic DAGs, SubDAGs, XCom, email alerts, and error handling.


Apache Airflow is a popular open‑source workflow management system used to create, schedule, and monitor batch jobs and data pipelines. Users define task dependencies by writing DAGs (Directed Acyclic Graphs) and can extend functionality with a rich plugin ecosystem for jobs such as Spark, Hive, and FTP/SFTP.

Core components: A DAG is the central workflow unit composed of Tasks, which are the smallest executable units and are instances of operators (e.g., BashOperator, PythonOperator, Sensor). Operators are reusable task templates that encapsulate specific execution logic. Executors run tasks (SequentialExecutor, LocalExecutor, CeleryExecutor, etc.). The Scheduler triggers tasks based on DAG dependencies, and the Web UI provides visual monitoring and management.
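To make these roles concrete, here is a minimal DAG definition sketch; the dag_id and the echoed string are illustrative:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# A DAG groups tasks; the scheduler evaluates it on the given interval,
# and the configured executor runs each task when it becomes due.
with DAG(
    dag_id='minimal_example',              # illustrative name
    start_date=datetime(2023, 3, 27),
    schedule_interval=timedelta(days=1),
    catchup=False,                         # don't backfill past intervals
) as dag:
    # A task is one operator instance identified by its task_id
    hello = BashOperator(task_id='say_hello', bash_command='echo "hi"')
```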

Executing a Bash command:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 27),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('example_bash_dag', default_args=default_args, schedule_interval=timedelta(hours=1), catchup=False) as dag:
    t1 = BashOperator(
        task_id='run_command',
        bash_command='echo "Hello World"',
    )
    # Running this DAG repeatedly executes the echo command at the defined interval

Calling a Python function:

from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def print_hello():
    print("Hello from PythonOperator!")

with DAG('example_python_dag', schedule_interval=timedelta(hours=1), start_date=days_ago(1), catchup=False) as dag:
    say_hello = print_hello()
    # Executes the custom Python function
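The TaskFlow API can also pass data: values returned by one @task function flow to the next via XCom automatically, and wiring the calls wires the dependency. A sketch with illustrative function names:

```python
from airflow.decorators import task

@task
def extract():
    # The return value is pushed to XCom automatically
    return {'rows': 3}

@task
def report(payload):
    print(f"extracted {payload['rows']} rows")

# Inside a DAG context, this one line creates both tasks and their dependency:
#     report(extract())
```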

Scheduled SQL query:

from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.models import Variable

def run_sql():
    # Note: this runs when the DAG file is parsed, so the Variable is read
    # on every scheduler parse rather than when the task executes
    sql_query = Variable.get('my_sql_query')
    return sql_query

with DAG('example_mysql_dag', ...) as dag:
    execute_query = MySqlOperator(
        task_id='run_mysql_query',
        mysql_conn_id='your_mysql_connection',
        sql=run_sql(),
    )
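Because run_sql() executes at parse time, a common alternative (assuming the same Variable name) is to let the operator render the query with Jinja templating instead:

```python
from airflow.providers.mysql.operators.mysql import MySqlOperator

# Inside the same DAG context as above:
execute_query = MySqlOperator(
    task_id='run_mysql_query',
    mysql_conn_id='your_mysql_connection',
    # Rendered by the template engine when the task runs, not at parse time
    sql="{{ var.value.my_sql_query }}",
)
```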

Task dependencies (multiple upstream tasks):

from airflow.operators.python import PythonOperator

def print_message(message):
    print(message)

with DAG(...) as dag:
    t1 = PythonOperator(task_id='print_a', python_callable=print_message, op_kwargs={'message': 'Task A'})
    t2 = PythonOperator(task_id='print_b', python_callable=print_message, op_kwargs={'message': 'Task B'})
    t3 = PythonOperator(task_id='print_c', python_callable=print_message, op_kwargs={'message': 'Task C'})
    # t3 runs only after t1 and t2 complete
    t3.set_upstream([t1, t2])
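The same dependency can be written with Airflow's bitshift operators, which many DAG authors find more readable:

```python
# Equivalent to t3.set_upstream([t1, t2])
[t1, t2] >> t3
```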

External event sensor (HttpSensor):

from airflow.providers.http.sensors.http import HttpSensor

with DAG(...) as dag:
    wait_for_data = HttpSensor(
        task_id='wait_for_http_response',
        http_conn_id='http_default',
        endpoint='/api/data',
        poke_interval=60,
        timeout=600,
        mode="reschedule",
        response_check=lambda response: response.status_code == 200,
    )
    process_data = PythonOperator(...)
    # process_data depends on successful HttpSensor
    process_data.set_upstream(wait_for_data)
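HttpSensor evaluates success through its `response_check` callable, which receives the HTTP response of each poke and returns a boolean. A named check function can also validate the body; a minimal sketch:

```python
# Passed to HttpSensor as response_check=check_response;
# returning True marks the poke (and thus the sensor) successful.
def check_response(response):
    return response.status_code == 200 and bool(response.json())
```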

Dynamic DAG construction:

from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated in Airflow 2

def create_dag_structure(dag):
    # Operators can be instantiated in a loop when the DAG file is parsed
    for i in range(5):
        task_id = f'task_{i}'
        EmptyOperator(task_id=task_id, dag=dag)

dag = DAG('dynamic_dag_example', ...)
create_dag_structure(dag)
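Beyond parse-time loops like the one above, Airflow 2.3+ offers dynamic task mapping, which expands a task into parallel copies at run time based on data that may not be known at parse time. A TaskFlow sketch:

```python
from airflow.decorators import task

@task
def process(item):
    print(f"processing {item}")

# Inside a DAG context, this creates one mapped task instance per element:
#     process.expand(item=[1, 2, 3])
```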

Cross‑DAG (SubDAG) reference:

from airflow.operators.subdag import SubDagOperator
subdag = DAG(...)
subtask = BashOperator(..., dag=subdag)
main_dag = DAG(...)
main_task = SubDagOperator(subdag=subdag, task_id='subdag_task', dag=main_dag)
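Note that SubDagOperator is deprecated in Airflow 2; TaskGroups provide the same visual grouping in the UI without a separate scheduler-managed DAG. A sketch with illustrative group and task ids:

```python
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(...) as main_dag:  # same elided DAG arguments as above
    with TaskGroup(group_id='grouped_tasks') as grouped:
        step_1 = BashOperator(task_id='step_1', bash_command='echo 1')
        step_2 = BashOperator(task_id='step_2', bash_command='echo 2')
        step_1 >> step_2
```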

Using XCom to pass data:

from airflow.operators.python import PythonOperator

def push_value(ti, **kwargs):
    ti.xcom_push(key='value', value='some data')

def pull_value(**kwargs):
    last_value = kwargs['ti'].xcom_pull(task_ids='push_task', key='value')
    print(f"Pulled value: {last_value}")

with DAG(...) as dag:
    push_task = PythonOperator(task_id='push_task', python_callable=push_value)
    pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value)
    pull_task.set_upstream(push_task)
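A PythonOperator also pushes its callable's return value to XCom automatically under the key `return_value`, so an explicit `xcom_push` is often unnecessary. A sketch with illustrative task ids:

```python
def produce():
    # The return value is pushed automatically under the XCom key 'return_value'
    return 'some data'

def consume(ti, **kwargs):
    # Omitting key= pulls the default 'return_value' key
    value = ti.xcom_pull(task_ids='produce_task')
    print(f"Pulled value: {value}")
    return value

# Wire as:
#   push = PythonOperator(task_id='produce_task', python_callable=produce)
#   pull = PythonOperator(task_id='pull_task', python_callable=consume)
#   push >> pull
```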

Email notification:

from airflow.operators.email import EmailOperator
notify_email = "[email protected]"

with DAG(...) as dag:
    email_alert = EmailOperator(
        task_id='send_email',
        to=notify_email,
        subject='Airflow Alert',
        html_content='Email from Airflow',
    )
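Instead of a dedicated EmailOperator task, failure notifications can also be enabled declaratively through `default_args`; this relies on the `[smtp]` section of airflow.cfg being configured. A sketch with an illustrative address:

```python
# Applied to every task in the DAG unless overridden per task
default_args = {
    'email': ['alerts@example.com'],  # illustrative address
    'email_on_failure': True,         # mail when a task finally fails
    'email_on_retry': False,          # stay quiet on intermediate retries
}
```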

Error handling and retries:

from datetime import timedelta

from airflow.operators.python import PythonOperator
from airflow.exceptions import AirflowSkipException

def potentially_failing_function():
    import random
    if random.randint(0, 10) > 5:
        # AirflowSkipException marks the task as skipped, and skipped tasks
        # are NOT retried; raise an ordinary exception instead if you want
        # the retry policy configured below to apply
        raise AirflowSkipException("Randomly skipped the task")
    else:
        print("Task executed successfully")

with DAG(...) as dag:
    failing_task = PythonOperator(
        task_id='potentially_failing_task',
        python_callable=potentially_failing_function,
        retries=3,  # number of retries
        retry_delay=timedelta(minutes=5),
    )
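For custom handling once retries are exhausted, tasks also accept an `on_failure_callback`, which Airflow invokes with the task-instance context. A sketch (the message format is illustrative):

```python
def notify_failure(context):
    # context is the template context Airflow passes to callbacks,
    # including the failing TaskInstance
    ti = context['task_instance']
    msg = f"Task {ti.task_id} failed on try {ti.try_number}"
    print(msg)
    return msg

# Wire as PythonOperator(..., on_failure_callback=notify_failure)
```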

Note: When using these examples, ensure your Airflow environment is correctly configured and all required provider packages (e.g., MySQL) are installed. The snippets omit some imports and full DAG context; adapt them to your specific project requirements.

Tags: DAG, workflow, scheduling, data pipelines, Apache Airflow
Written by Test Development Learning Exchange