Setting up Airflow

Concepts

I prefer to follow the official Airflow documentation.

Workflow

Job Run

Setup - Official Version

(For the section on the Custom/Lightweight setup, scroll down)

Setup

Airflow Setup with Docker, through official guidelines

Tip

Custom Airflow with a Dockerfile: you only need this once you have completed the default setup. Build the image (only the first time, or whenever there is any change in the Dockerfile; takes ~15 minutes the first time):

docker-compose build

or (for legacy versions)

docker build .

You should expect to see:


  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-06-08 16:02:18 +0000] [13] [INFO] Starting gunicorn 22.0.0
[2024-06-08 16:02:18 +0000] [13] [INFO] Listening at: http://[::]:8794 (13)
[2024-06-08 16:02:18 +0000] [13] [INFO] Using worker: sync
[2024-06-08 16:02:18 +0000] [14] [INFO] Booting worker with pid: 14
[2024-06-08 16:02:18 +0000] [15] [INFO] Booting worker with pid: 15

Execution

  1. Download docker-compose.yaml, or get it from the official link here.

  2. Initialize the Airflow scheduler, DB, and other config

docker-compose up airflow-init
  3. Kick up all the services from the containers:
docker-compose up
  4. In another terminal, run docker-compose ps to see which containers are up & running (there should be 7, matching the services in your docker-compose file).

  5. Log in to the Airflow web UI on localhost:8080 with the default credentials: airflow/airflow (a programmatic health check is sketched after this list).

  6. Run your DAG on the Web Console.

  7. On finishing your run, or to shut down the containers:

docker-compose down

To stop and delete the containers, delete the volumes with database data, and remove the downloaded images, run:

docker-compose down --volumes --rmi all

or

docker-compose down --volumes --remove-orphans
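
As referenced in step 5 above, you can also confirm programmatically that the stack is healthy before logging in. Below is a minimal sketch (not part of the official guide) that polls the webserver's /health endpoint; it assumes the default localhost:8080 port mapping and that the requests package is installed on your host.

    # Minimal health-check sketch: poll the Airflow webserver's /health endpoint
    # and confirm the metadatabase and scheduler both report "healthy".
    # Assumes the webserver is exposed on localhost:8080 (default compose mapping)
    # and that `requests` is installed locally.
    import requests

    resp = requests.get("http://localhost:8080/health", timeout=10)
    resp.raise_for_status()
    health = resp.json()

    print(health["metadatabase"]["status"])  # expected: "healthy"
    print(health["scheduler"]["status"])     # expected: "healthy"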

Setup - Custom No-Frills Version (Lightweight)

This is a quick, simple, and less memory-intensive setup of Airflow that runs on the LocalExecutor.

Setup

Airflow Setup with Docker, customized

(Image: NofrillAirflowInstall.png)

Execution

  1. Stop and delete the containers, delete the volumes with database data, and remove the downloaded images (from the previous setup):
docker-compose down --volumes --rmi all

or

docker-compose down --volumes --remove-orphans

Or, if you need to clear your system of any previously cached Docker data:

docker system prune

Also, empty the airflow logs directory.

  2. Build the image (only the first time, or whenever there is any change in the Dockerfile; takes ~5-10 minutes the first time):
docker-compose build

or (for legacy versions)

docker build .
  3. Kick up all the services from the containers (no separate initialization step is needed):
docker-compose -f docker-compose-nofrills.yaml up
  4. In another terminal, run docker ps to see which containers are up & running (there should be 3, matching the services in your docker-compose file).

  5. Log in to the Airflow web UI on localhost:8080 with the credentials admin/admin (the admin user has to be created explicitly in this setup).

  6. Run your DAG on the Web Console.

  7. On finishing your run, or to shut down the containers:

docker-compose down

Practice before Homework

Start the data_ingestion_postgres_dag from the Web Console (or trigger it programmatically, as sketched below).
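
If you prefer to script the trigger instead of clicking in the UI, here is a minimal sketch using Airflow's stable REST API. This is not the course's method: it assumes the basic-auth API backend is enabled in your setup, the webserver is on localhost:8080, and the credentials match your install (airflow/airflow for the official setup, admin/admin for the no-frills one).

    # Sketch: unpause and trigger the DAG via the Airflow stable REST API.
    # Assumes basic-auth is enabled for the API and localhost:8080 is the webserver.
    import requests

    BASE = "http://localhost:8080/api/v1"
    AUTH = ("airflow", "airflow")  # use ("admin", "admin") for the no-frills setup
    DAG_ID = "data_ingestion_postgres_dag"

    # New DAGs start paused by default; unpause before triggering.
    requests.patch(f"{BASE}/dags/{DAG_ID}", json={"is_paused": False}, auth=AUTH).raise_for_status()

    # Queue a run; an empty conf is fine for this DAG.
    run = requests.post(f"{BASE}/dags/{DAG_ID}/dagRuns", json={"conf": {}}, auth=AUTH)
    run.raise_for_status()
    print(run.json()["state"])  # typically "queued"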

Example

Load data from an API to Postgres:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    from etl.web_log_pipeline import _ingesting_web, _create_table, _insert_data, _check_duplicate, _create_view

    dag = DAG(
        'data_ingestion_postgres_dag',
        description='DAG to load parquet file to postgres',
        schedule_interval='@once',
        start_date=datetime(2022, 1, 1),
        tags=['dec', 'dec-loading'],
    )

    ingest_data = PythonOperator(
        task_id='ingest_data',
        python_callable=_ingesting_web,
        dag=dag,
    )

    create_table_task = PythonOperator(
        task_id='create_table',
        python_callable=_create_table,
        dag=dag,
    )

    insert_data_task = PythonOperator(
        task_id='insert_data',
        python_callable=_insert_data,
        dag=dag,
    )

    check_duplicate_task = PythonOperator(
        task_id='check_duplicate',
        python_callable=_check_duplicate,
        dag=dag,
    )

    create_view_task = PythonOperator(
        task_id='create_view',
        python_callable=_create_view,
        dag=dag,
    )

    ingest_data >> create_table_task >> insert_data_task >> check_duplicate_task >> create_view_task
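
The callables imported from etl.web_log_pipeline are not shown here. Purely as an illustration, the sketch below shows what a task like _create_table might look like using the Postgres provider's hook; the connection id, table name, and column names are assumptions inferred from the sample output further down, not the actual pipeline code.

    # Hypothetical sketch of one of the imported callables; the real
    # etl.web_log_pipeline module may differ. Assumes a Postgres connection
    # registered in Airflow under the id 'postgres_default'.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def _create_table():
        """Create the web_logs table if it does not exist yet."""
        hook = PostgresHook(postgres_conn_id="postgres_default")
        hook.run("""
            CREATE TABLE IF NOT EXISTS web_logs (
                created_at  TIMESTAMPTZ,
                first_name  TEXT,
                page_name   TEXT,
                page_url    TEXT,
                "timestamp" TIMESTAMPTZ,
                user_name   TEXT
            );
        """)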

Before Run

airflow=# select count(1) from web_logs;
 count
-------
     0
(1 row)

After Run

DAG Complete

airflow=# select count(1) from web_logs;
 count
-------
     9
(1 row)

airflow=# select count(1) from web_logs_view;
 count
-------
     9
(1 row)

Getting sample data: list the tables with \dt, then select from web_logs:

created-at        | first-name |   page-name   |        page-url        |        timestamp         |     user-name
--------------------------|------------|---------------|------------------------|--------------------------|-------------------
 2022-01-17T20:55:28.209Z | Graham     | janet.com     | https://audreanne.info | 2022-01-17T14:28:20.653Z | Khalil.Reichert54
 2022-01-17T17:11:16.749Z | Stacy      | christy.info  | https://edwin.com      | 2022-01-18T03:47:33.205Z | Patsy.Heathcote83
 2022-01-18T06:44:44.218Z | Darrell    | axel.biz      | http://tyra.info       | 2022-01-17T11:07:10.904Z | Kaleigh_Hirthe
 2022-01-17T12:58:41.937Z | Destin     | garland.com   | http://olaf.org        | 2022-01-18T05:28:12.185Z | Quinn.Anderson13
 2022-01-17T20:38:29.290Z | Carlotta   | letha.biz     | https://leonora.name   | 2022-01-17T15:47:45.831Z | Caroline.Rippin64
 2022-01-17T13:49:56.607Z | Chaya      | laney.biz     | https://dangelo.info   | 2022-01-18T07:11:14.812Z | Rafaela30
 2022-01-17T14:48:36.585Z | Camilla    | johnpaul.net  | http://lukas.com       | 2022-01-18T07:34:33.841Z | Valentina97
 2022-01-17T16:52:06.474Z | Sim        | dayne.name    | https://samir.info     | 2022-01-17T19:23:02.067Z | Erwin5
 2022-01-17T13:33:48.948Z | Carrie     | giovanni.name | http://victor.net      | 2022-01-17T14:09:30.009Z | Reagan_Towne
(9 rows)
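
If you would rather run these verification queries from the host with Python instead of psql inside the container, here is a minimal sketch. The host, port, credentials, and database name are assumptions that need to match your compose file (the Postgres port is not necessarily published to the host by default).

    # Sketch: run the same verification counts from the host with psycopg2.
    # Assumes the Postgres service is published on localhost:5432 and uses the
    # default airflow/airflow credentials and the 'airflow' database.
    import psycopg2

    conn = psycopg2.connect(
        host="localhost", port=5432,
        dbname="airflow", user="airflow", password="airflow",
    )
    with conn, conn.cursor() as cur:
        cur.execute("SELECT count(1) FROM web_logs;")
        print("web_logs rows:", cur.fetchone()[0])
        cur.execute("SELECT count(1) FROM web_logs_view;")
        print("web_logs_view rows:", cur.fetchone()[0])
    conn.close()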

Future Enhancements

  • Deploy a self-hosted Airflow setup on a Kubernetes cluster, or use a managed Airflow service in the cloud
  • Use an alternative tool like Mage as a single source of data processing and orchestration

!!! question

Did you finish your first DAG? OK!

You can go back to the top and start on the custom Dockerfile setup.

Plus: try using Astronomer to install Airflow with just a few commands.

astro dev init

astro dev start

Open: http://localhost:8080/

References

For more info, check out these official docs: