Scheduling with Apache Airflow

Airflow at a glance

  • Apache Airflow is a workflow management platform written in Python.
  • Workflows are also developed in Python code, a perfect fit for keeping your "configuration as code" in a Git repository.
  • You can leverage extra Python libraries to enhance your pipelines, without the need for clunky languages like XML.


How does it work?

  • Airflow uses DAGs (Directed Acyclic Graphs) to orchestrate workflows and their dependencies.
  • Workflows are rendered read-only in a beautiful UI; note that this is not a WYSIWYG tool.
  • It ships with core operators out of the box (bash, python, email, …) and extended ones for all existing providers (HTTP, databases, Docker, all cloud vendors like AWS, GCP, Azure, …).
  • Scalability is virtually unlimited when scheduling through a message broker; in this post we only introduce a local scheduling mode.
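
To make the DAG idea concrete, here is a minimal, hypothetical sketch (the file name and task names are made up; the full demo below goes further) chaining two bash tasks with the >> dependency operator:

# A minimal illustrative DAG: two bash tasks chained with >>
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("hello_dag", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'extracting...'")
    load = BashOperator(task_id="load", bash_command="echo 'loading...'")
    extract >> load  # load only runs after extract succeeds

Dropping such a file into the DAGs directory is enough for the scheduler to pick it up.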


Quick demo

Launching Airflow

Although the official repository provides a Docker Compose file, I fine-tuned it to simplify getting started:

  • Removed the sample DAGs to start with a brand new, clean DAGs directory
  • Set the executor to local mode, no clustering
  • As usual, added the Traefik container and the localtest.me domain for a ready-to-use DNS setup

Once ready, launch these commands:

git clone https://github.com/jsminet/docker-apache-airflow.git
docker compose up -d

Open your favorite browser and go to the Airflow login page: http://airflow.localtest.me

Use airflow / airflow as the credentials and you should see this:


First DAG creation

As a prerequisite, create a connection to the Postgres database by clicking on Admin, then Connections.


Postgres is part of this Docker stack, and I added a script that automatically creates a dedicated database for this demo. You can use DBeaver, for instance, to test the connection. Here are all the connection parameters:

  • User: data_sample_user
  • Password: data_sample_password
  • Database: data_sample
  • Host: airflow.localtest.me
  • Port: 5432
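
If you prefer testing from Python rather than DBeaver, here is a minimal sketch using psycopg2 (assuming the psycopg2-binary package is installed on your machine; the values are the parameters listed above):

# Quick connectivity check against the demo database (illustrative only)
import psycopg2

conn = psycopg2.connect(
    host="airflow.localtest.me",
    port=5432,
    dbname="data_sample",
    user="data_sample_user",
    password="data_sample_password",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone())
conn.close()

Note that Airflow can also read connections from AIRFLOW_CONN_<CONN_ID> environment variables, but the UI route described here is the simplest for this demo.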
Follow these steps for the Postgres database setup

Once the connection is created, open your favorite IDE, create a new Python file in the dags directory, and use this code:

# Import datetime module
from datetime import datetime, timedelta
# Import DAG definition
from airflow import DAG
# Import bash operator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# A simple python function
def greet(name):
    print(f"Hello {name} with python!")
    return name

# The DAG object
with DAG(
    "tutorial_dag",
    description="A simple DAG tutorial",
    default_args = {
        "owner":"JS",
        "retries": 1,
        "retry_delay": timedelta(minutes=5)
        },
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
) as dag:

    # Task 1: bash operator
    task1 = BashOperator(
        task_id="bash_greeting",
        bash_command="echo 'Hello world with bash!'")
    
    # Task 2: python operator
    task2 = PythonOperator(
        task_id="python_greeting",
        python_callable=greet,
        op_kwargs={"name":"JS"})
    
    # Task 3: postgres operator
    task3 = PostgresOperator(
        task_id="postgres_data_insertion",
        postgres_conn_id="postgres_connection",
        sql="""
        CREATE TABLE IF NOT EXISTS
                    tutorial(
                        id SERIAL PRIMARY KEY,
                        current_dag_run TIMESTAMP
                    );
        INSERT INTO tutorial
            (current_dag_run) VALUES ('{{ts}}')
        """)
    
    # Set the task dependencies (execution order)
    task1 >> task2 >> task3

In this DAG, you'll create three sequential, dependent tasks using three different operators:

  • Task 1 -> a bash script using the bash operator
  • Task 2 -> a Python function using the python operator
  • Task 3 -> data insertion into Postgres, via the connection created earlier, using the postgres operator

To insert data into Postgres, we use one of the out-of-the-box template variables you can find on the templates reference page ({{ ts }}, the logical timestamp of the DAG run) together with an auto-increment id (the SERIAL keyword).
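
As an illustration only, here is a hypothetical extra task (reusing the BashOperator import and sitting inside the same with DAG block, after task3) that prints a couple of other built-in template variables:

    # Hypothetical task, not part of the demo: prints other built-in template variables
    task4 = BashOperator(
        task_id="print_template_variables",
        bash_command="echo 'logical date: {{ ds }} - run id: {{ run_id }}'")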

Launching the DAG

Click on the DAGs tab, then on the Trigger DAG button


After a successful run, click on Graph to see all the tasks embedded in the DAG

Click on each task's green square to activate the corresponding Logs tab


The Logs tab is now available for each of the tasks

What’s next

Only a small part of Airflow has been introduced here; it is a powerful scheduler that will need other posts to explore all its features and possibilities.

Here are some important topics about Apache Airflow you can find in the core concepts documentation:

  • Cross-task communication using XComs (a small sketch follows this list)
  • Administering Airflow using the CLI
  • Managing workflows using the REST API
  • Using sensors to detect an incoming file on your file system, or on object storage like S3 for instance
  • How to debug DAGs
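
As a small teaser for the XComs bullet above, here is the sketch announced in the list: a minimal, hypothetical DAG where one task's return value is automatically pushed to XCom and pulled by the next task (this is exactly what happens with the greet function in the demo, whose return value is pushed behind the scenes):

# xcom_demo.py - minimal, hypothetical DAG showing XCom push/pull between two Python tasks
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def push_name():
    # The return value is automatically pushed to XCom
    return "JS"

def pull_name(ti):
    # Pull the value pushed by the upstream task
    name = ti.xcom_pull(task_ids="push_name")
    print(f"Hello {name}, pulled from XCom!")

with DAG("xcom_demo", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    push = PythonOperator(task_id="push_name", python_callable=push_name)
    pull = PythonOperator(task_id="pull_name", python_callable=pull_name)
    push >> pull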