Airflow at a glance
- Apache Airflow is a workflow management platform written in Python

- Workflows are also developed in Python code, which fits perfectly with keeping your "configuration as code" in a Git repository.
- You can leverage extra Python libraries to enhance your pipelines, without needing clunky languages like XML.
How does it work?
- Airflow uses DAGs (Directed Acyclic Graphs) to orchestrate workflows and their dependencies (see the minimal sketch below).
- Workflows are rendered read-only in a nice UI; note that this is not a WYSIWYG tool.
- It ships with core operators out of the box (bash, python, email, …) and extended ones for all existing providers (HTTP, databases, Docker, cloud vendors like AWS, GCP and Azure, …).
- Scalability is virtually unlimited when using a message-based executor; in this post we only introduce the local scheduling mode.
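To make these concepts concrete, here is a minimal sketch of a DAG (the DAG id, task names and schedule are illustrative, not part of the demo below): two placeholder tasks chained with the >> dependency operator.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    "minimal_example",                 # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,                     # only runs when triggered manually
) as dag:
    extract = EmptyOperator(task_id="extract")
    load = EmptyOperator(task_id="load")
    extract >> load                    # "extract" must run before "load"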
Quick demo
Launching Airflow
Although the official repository provides a Docker Compose file, I fine-tuned it to simplify getting started:
- Sample DAGs removed, so you start with a clean, empty DAGs directory
- Executor set to local, no clustering (you can verify this with the configuration sketch below)
- As usual, the traefik container and the localtest.me domain are added for a ready-to-use DNS setup
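If you want to double-check the resulting setup, the effective configuration can be inspected from within the Airflow environment (a minimal sketch, assuming you run it inside one of the stack's Airflow containers):

# Minimal sketch: inspect the effective Airflow configuration
from airflow.configuration import conf

print(conf.get("core", "executor"))              # expected: LocalExecutor
print(conf.getboolean("core", "load_examples"))  # expected: False (no sample DAGs)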
Once ready, launch these commands:
git clone https://github.com/jsminet/docker-apache-airflow.git
docker compose up -d
Open your favorite browser and go to the Airflow login page: http://airflow.localtest.me
Use airflow / airflow as credentials and you should see this:

First DAG creation
As a prerequisite, create a connection to the Postgres database by clicking on Admin, then Connections

Postgres is part of this Docker stack, and I added a script that automatically creates a dedicated database for this demo. You can use DBeaver, for instance, to test the connection; here are all the connection parameters:
| User | Password | Database | Host | Port |
|------|----------|----------|------|------|
| data_sample_user | data_sample_password | data_sample | airflow.localtest.me | 5432 |
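If you prefer a scripted check over DBeaver, here is a minimal sketch using psycopg2 (assuming psycopg2-binary is installed on your machine; it is not part of the stack itself):

import psycopg2

# Connection parameters taken from the table above
conn = psycopg2.connect(
    host="airflow.localtest.me",
    port=5432,
    dbname="data_sample",
    user="data_sample_user",
    password="data_sample_password",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])  # prints the PostgreSQL server version
conn.close()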



Once the connection is created, open your favorite IDE, create a new Python file in the dags directory and use this code:
# Import datetime module
from datetime import datetime, timedelta
# Import DAG definition
from airflow import DAG
# Import operators
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# A simple python function
def greet(name):
    print(f"Hello {name} with python!")
    return name

# The DAG object
with DAG(
    "tutorial_dag",
    description="A simple DAG tutorial",
    default_args={
        "owner": "JS",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
) as dag:
    # bash task1 definition object
    task1 = BashOperator(
        task_id="bash_greeting",
        bash_command="echo 'Hello world with bash!'",
    )
    # python task2 definition object
    task2 = PythonOperator(
        task_id="python_greeting",
        python_callable=greet,
        op_kwargs={"name": "JS"},
    )
    # postgres task3 definition object
    task3 = PostgresOperator(
        task_id="postgres_data_insertion",
        postgres_conn_id="postgres_connection",
        sql="""
            CREATE TABLE IF NOT EXISTS tutorial(
                id SERIAL PRIMARY KEY,
                current_dag_run TIMESTAMP
            );
            INSERT INTO tutorial (current_dag_run) VALUES ('{{ ts }}');
        """,
    )
    # Define the task dependencies
    task1 >> task2 >> task3
In this DAG, you’ll create three sequential, dependent tasks using three different operators:

- Task 1 -> a bash script using the bash operator
- Task 2 -> a python script using the python operator
- Task 3 -> a Postgres data insertion, through a connection, using the Postgres operator
To insert data into Postgres, we use one of the out-of-the-box template variables ({{ ts }}) that you can find on the templates reference page, plus an auto-increment id (the SERIAL keyword).
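Besides {{ ts }}, other built-in template variables such as {{ ds }} and {{ run_id }} can be used in any templated field. Here is a minimal sketch (an illustrative extra task, not part of the tutorial DAG above, to be placed inside the with DAG(...) block if you want to try it):

# Illustrative task showing a few built-in template variables
print_variables = BashOperator(
    task_id="print_template_variables",
    bash_command="echo 'date: {{ ds }} / timestamp: {{ ts }} / run id: {{ run_id }}'",
)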
Launching the DAG
Click on the DAGs tab, then on the Trigger DAG button

After a successful run, click on Graph to see all the tasks embedded in the DAG

Click on each task’s green square to activate the corresponding Logs tab

The Logs tab is now available for each of the tasks
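To double-check that each run really inserted a row, you can query the table through the Airflow connection itself (a minimal sketch, assuming you run it where the airflow package and the Postgres provider are installed, for instance inside one of the stack’s containers):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Reuse the connection created earlier in Admin > Connections
hook = PostgresHook(postgres_conn_id="postgres_connection")
for row in hook.get_records("SELECT id, current_dag_run FROM tutorial ORDER BY id;"):
    print(row)  # one row per successful DAG run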



What’s next
Only a small part of Airflow has been introduced here; it’s a powerful scheduler that will need other posts to explore all its features and possibilities.
Here are some important topics about Apache Airflow you can find in the core concepts documentation:
- Cross-task communication using XComs (see the small sketch after this list)
- Administering Airflow using the CLI
- Managing workflows using the REST API
- Using sensors to detect an incoming file on a file system, S3 for instance
- How to debug DAGs
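As a teaser for XComs: the greet function above already returns a value, and a PythonOperator automatically pushes its return value to XCom, so another task can pull it back. A minimal sketch (an illustrative extra task, not part of the tutorial DAG, to be placed inside the with DAG(...) block):

# Illustrative task pulling the value returned by the python_greeting task
def read_greeting(ti):
    # PythonOperator stores greet()'s return value under the default XCom key
    name = ti.xcom_pull(task_ids="python_greeting")
    print(f"Pulled from XCom: {name}")

task4 = PythonOperator(
    task_id="read_xcom_value",
    python_callable=read_greeting,
)
task2 >> task4  # must run after the task that pushed the value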