airflow(에어플로우)

airflow 기초

데이터왕 2023. 12. 31. 17:06

airflow 

  • airbnb 에서 시작한 아파치 오픈소스 프로젝트
  • 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임워크
  • 정해진 시간에 dag(ETL) 실행
  • 웹 UI도 제공(스케줄러와 dag의 실행 상황을 시각화해줌)
  • (핵심기능) backfill을 단순화 해줌
    + backfill : 데이터 세트의 누락 또는 불완전한 데이터를 채우는 프로세스

airflow 구조

  1. 웹서버 (Web Server):
       - Airflow 웹서버는 Airflow UI를 제공하며, DAGs(Directed Acyclic Graphs)를 시각적으로 관리하고 모니터링하는 데 사용됩니다. 웹 서버는 사용자 인터페이스를 통해 DAG 실행, 로그 확인, 스케줄러 상태 확인 등을 제공합니다.

  2. 스케줄러 (Scheduler):
       - Airflow 스케줄러는 정의된 DAG 실행 일정에 따라 작업을 예약하고 관리합니다. 주어진 DAG의 실행을 트리거하고 작업 간의 종속성을 관리하여 정의된 일정에 따라 작업을 실행합니다.

  3. 워커 (Worker):
       - Airflow 워커는 스케줄러에 의해 예약된 작업을 실행하는 역할을 합니다. 여러 워커가 병렬로 작업을 처리하여 시스템의 확장성을 지원합니다.

  4. 메타데이터 데이터베이스 (Metadata Database):
       - Airflow는 메타데이터 데이터베이스를 사용하여 실행된 DAG, 작업 실행 로그, 작업 상태 등의 메타데이터를 저장합니다. 이 메타데이터는 스케줄링, 모니터링 및 이력 추적을 위해 사용됩니다. 기본으로 sqlite가 설치된다. 그러나 실제로는 mysql,postgres를 설치해 사용한다.

  5. 큐 (Queue):
       - 다수서버 구성인 경우에만 사용된다. Airflow는 메시지 큐를 사용하여 작업 간의 메시지 전달을 처리합니다. 큐를 통해 스케줄러는 워커에 작업을 할당하고, 워커는 큐를 통해 스케줄러에 작업 완료 상태를 보고합니다.


실무에서 airflow 사용시 좋은 사양이 필요해진다면

  • 서버 1개 사용시 : 서버의 사양을 높여나간다.
  • 2개 이상 사용시 : 사양 증가의 한계치가 오면 서버를 늘린다. 이때 airflow 제공해주는 클라우드로 가는 것이 경제적 
    + 서버를 늘린다 : 보통 스케줄러 웹서버는 두고, worker용으로 서버를 늘린다. 

 

 

airflow를 다수 서버로 구성할때

  1. 스케줄러 (Scheduler):
       - Airflow 스케줄러는 정의된 DAG(Directed Acyclic Graph)의 실행을 예약하고 관리하는 주체입니다. 스케줄러는 DAG 실행의 스케줄링과 종속성 그래프를 고려하여 작업을 실행합니다.

  2. 실행자 (Executor):
       - 실행자는 스케줄러에 의해 예약된 작업을 실행하는 역할을 합니다. Airflow에서는 여러 가지 종류의 실행자가 제공되며, 이들은 작업을 실행하는 방식에 대한 다른 전략을 제공합니다. 예를 들어, LocalExecutor는 단일 머신에서 작업을 실행하고, CeleryExecutor는 분산된 환경에서 작업을 실행합니다.
    +일부 Executor의 경우 동작 특성에 따라 큐가 필요없이 단독으로 동작이 가능합니다.

  3. 큐 (Queue):
       - Airflow에서는 메시지 큐를 사용하여 작업 간의 통신을 조정합니다. 작업이 실행되기 전에 스케줄러는 해당 작업을 큐에 넣고, 실행자는 큐에서 작업을 가져와 실행합니다. 큐는 스케줄러와 실행자 간의 효율적인 통신을 도와줍니다.

  4. 스케줄러와 실행자 간의 관계:
    - 스케줄러는 DAG 실행을 예약하고 해당 작업을 큐에 추가합니다.
    - 실행자는 큐에서 작업을 가져와 실행합니다. 실행자는 실행 전에 작업을 먼저 클리어하고, 작업이 완료되면 그 결과를 다시 스케줄러에 보고합니다.

 

Executor의 종류

  1. SequentialExecutor (큐가 필요없이 단독동작)
       - 특징: 작업을 순차적으로 실행하는 가장 간단한 실행자입니다. 한 번에 하나의 작업만 실행됩니다.
       - 사용 사례: 테스트 및 개발 환경에서 작은 규모의 작업을 처리하는 데 적합합니다.
    + 작업들을 순서대로 실행하므로 큐를 사용하지 않아도 됨

  2. LocalExecutor (큐가 필요없이 단독동작) )
       - 특징:동일한 머신에서 여러 작업을 병렬로 처리하는 실행자입니다. LocalExecutor는 멀티프로세스를 사용하여 작업을 실행합니다.
       - 사용 사례:작은 규모의 작업을 병렬로 처리하고자 할 때 사용됩니다.
    + 작업들이 동일한 머신에서 실행되기 때문에 큐를 통한 통신 없이 바로 작업을 실행

  3. CeleryExecutor:
       - 특징:Celery라는 분산 작업 큐를 사용하여 작업을 여러 워커에서 병렬로 실행하는 실행자입니다.
       - 사용 사례: 대규모의 분산 시스템에서 Airflow를 사용하고자 할 때 사용됩니다.

  4. KubernetesExecutor:
       - 특징: Kubernetes 클러스터에서 각 작업을 컨테이너로 실행하는 실행자입니다. 각 작업은 별도의 파드로 실행
       - 사용 사례:Kubernetes 환경에서 Airflow를 사용하고자 할 때 사용됩니다.

  5. CeleryKubernetesExecutor:
       - 특징:Celery와 Kubernetes를 함께 사용하여 작업을 분산 환경에서 실행하는 실행자입니다.
       - 사용 사례:대규모의 분산 시스템에서 Airflow를 사용하고자 할 때, 특히 Celery와 Kubernetes를 함께 사용

  6. DaskExecutor:
       - 특징:Dask를 사용하여 작업을 분산 처리하는 실행자입니다. Dask는 분산 컴퓨팅을 위한 라이브러리입니다.
       - 사용 사례: 대규모 작업을 분산 환경에서 효율적으로 처리하고자 할 때 사용됩니다.

airflow 개발의 장단점

  • 장점 : 데이터 파이프라인을 세밀하게 제어, 다양한 데이터 소스와 데이터웨어하우스 지원, backfill이 쉬움

  • 단점 : 배우기가 쉽지 않음, 개발환경 구성 어려움, 직접운영이 어렵고 서버가 늘어나면 클라우드 버전으로 전환 필요
    +airflow 클라우드 버전 : GCP의 cloud composer , AWS의 managed workflows for apache airflow , AZURE의 data factory managed airflow

DAG란 무엇일까?

 

  • airflow에서 ETL를 부루는 명칭
  • DAG(Directed Acyclic Graph) : 작업(Task)의 흐름을 정의
  • ETL dag는 3개의 task로 구성됨(extract, transform,load)
    +task : DAG 내에서 실제로 수행되어야 하는 작은 작업 단위. airflow의 오퍼레이터로 만들어지고, 오퍼레이터(클래스,함수)를 만들거나 직접 개발할 수 있다.

    예) task1 = DummyOperator(task_id='task1', dag=dag)
    task2 = DummyOperator(task_id='task2', dag=dag)
    task1 >> task2

airflow 코드의 기본구조

    1. DAG를 만듦 (이름, 실행주기, 실행날짜, 오너)
      예)
      default_args = {
        'owner': 'gwanghyeon',
        'email': ['gwanghyeon @hotmail.com'],
        'retries': 1, 'retry_delay': timedelta(minutes=3),
      }
      dag = DAG(
        "dag_v1", # DAG name
        start_date=datetime(2020,8,7,hour=0,minute=00),
        schedule="0 * * * *", tags=["example"],
        catchup=False,
        # common settings
        default_args=default_args
        )

      + schedule 실행 주기 설정방법
      + catchup : start_date를 과거로 설정했을때, 지난 기간동안 실행을 모두 따라잡을거면 True, 안할꺼면 False

    2. DAG를 구성하는 task를 만듦(task 별로 적합한 오퍼레이터 선택)
      예) 위 구조로 실행할때 코드 작성의 예

      from airflow import DAG
      from airflow.operators.bash import BashOperator
      from datetime import datetime, timedelta
      default_args = {
        'owner': 'gwanghyeon',
        'start_date': datetime(2023, 5, 27, hour=0, minute=00),
        'email': ['keeyonghan@hotmail.com'],
        # 재시도 횟수
        'retries': 1,
        # 재시도 시간 간격
        'retry_delay': timedelta(minutes=3),
      }
      test_dag = DAG(
        "dag_v1", # DAG name schedule="0 9 * * *",
        tags=['test'],
        catchUp=False,
        default_args=default_args
      )

    3. task 제작(오퍼레이터 선택, 설정)과 순서 정의
      # t1의 현재시간 출력 task
      t1 = BashOperator(
         task_id='print_date',
         bash_command='date',
         dag=test_dag)

      # t2는 5초간 대기 후 종료
      t2 = BashOperator(
         task_id='sleep',
         bash_command='sleep 5',
         dag=test_dag)

      # t3는 서버의 /tmp 디렉토리의 내용 출력
      t3 = BashOperator(
         task_id='ls',
         bash_command='ls /tmp',
         dag=test_dag)

      # task 실행순서 지정 ( t1이 끝나고 t2와 t3를 병렬로 실행)
      t1 >> [ t2, t3 ]

    4. 웹 ui에서 dag 실행해보고 확인하기
      1) http://localhost:8080/ 접속
      2) id/pw 둘다 airflow로 초기설정 되어있음
      3) 홈화면에서 해당 dag 찾아 접속해 해당 dag 확인, task 별 실행 상태(녹색네모) 클릭 후 log 확인
      4) 코딩에서 구현했던 기능들이 구현됐는지 log에서 확인
      t1의 현재시간 출력 task


      t3는 서버의 /tmp 디렉토리의 내용 출력

 

  1. 커맨드 라인(우분투)에서 dag 실행해보고 확인하기
    # 현재 스케줄러 컨테이너 id 확인 
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
    CONTAINER ID   IMAGE                  COMMAND                  CREATED      STATUS                   PORTS                                       NAMES
    0b27bb5ccb0e   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   2 days ago   Up 3 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-triggerer_1
    0da253c77542   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   2 days ago   Up 3 minutes (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow-setup_airflow-webserver_1
    d306bc450764   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   2 days ago   Up 3 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-scheduler_1
    c9cb62c2053a   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   2 days ago   Up 3 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-worker_1
    48ee152ae746   redis:latest           "docker-entrypoint.s…"   2 days ago   Up 3 minutes (healthy)   6379/tcp                                    airflow-setup_redis_1
    a5e756836619   postgres:13            "docker-entrypoint.s…"   2 days ago   Up 3 minutes (healthy)   5432/tcp                                    airflow-setup_postgres_1

    # 스케줄러 shell 스크립트로 진입
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec -it d306bc450764 sh

    # dag 리스트에서 원하는 dag이름(dag_v1 )확인
    (airflow)airflow dags list

    dag_id                                   | filepath                                                                                                         | owner   | paused
    =========================================+==================================================================================================================+=========+=======
    HelloWorld                               | HelloWorld.py                                                                                                    | airflow | True
    HelloWorld_v2                            | HelloWorld_v2.py                                                                                                 | airflow | True
    dag_v1                                   | TestDAG.py

    # dag_v1의 task가 뭐있는지 확인(ls, print_date, sleep)
    (airflow)airflow tasks list dag_v1
    /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: [31mDeprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. [32mTo prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". [36mSet environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message.[0m (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
    ls
    print_date
    sleep

    # dag_v1 DAG에서 print_date 태스크를 실행하고, 2023-05-23로 설정하여 해당 날짜에 대한 실행을 시뮬레이션해봄
    (airflow)airflow tasks test dag_v1 print_date 2023-05-23

    [2024-01-01 03:41:56,471] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date']
    [2024-01-01 03:41:56,484] {subprocess.py:86} INFO - Output:
    [2024-01-01 03:41:56,489] {subprocess.py:93} INFO - Mon Jan  1 03:41:56 UTC 2024

    -> print_date 결과가 2024년 1월 1일이 출력됐다. 그 이유는 catchUp=False 였기 때문에 중간의 실행은 생략되고,
    현재 실행날짜 기준으로 출력됐다.

+ dag에서 task 는 얼마나 분리하는게 좋을까?

  • task를 많이 만들면 전체 dag 실행이 오래걸리고 스케줄러에 부하가 감
  • 너무 적으면 모듈화가 안되고 실패시 재실행에 오래걸림
  • 오래 걸리는 dag 실패시 재실행이 쉽게, 다수의 task로 나누는 것이 좋음