(핵심기능) backfill을 단순화 해줌 + backfill : 데이터 세트의 누락 또는 불완전한 데이터를 채우는 프로세스
airflow 구조
웹서버 (Web Server): - Airflow 웹서버는 Airflow UI를 제공하며, DAGs(Directed Acyclic Graphs)를 시각적으로 관리하고 모니터링하는 데 사용됩니다. 웹 서버는 사용자 인터페이스를 통해 DAG 실행, 로그 확인, 스케줄러 상태 확인 등을 제공합니다.
스케줄러 (Scheduler): - Airflow 스케줄러는 정의된 DAG 실행 일정에 따라 작업을 예약하고 관리합니다. 주어진 DAG의 실행을 트리거하고 작업 간의 종속성을 관리하여 정의된 일정에 따라 작업을 실행합니다.
워커 (Worker): - Airflow 워커는 스케줄러에 의해 예약된 작업을 실행하는 역할을 합니다. 여러 워커가 병렬로 작업을 처리하여 시스템의 확장성을 지원합니다.
메타데이터 데이터베이스 (Metadata Database): - Airflow는 메타데이터 데이터베이스를 사용하여 실행된 DAG, 작업 실행 로그, 작업 상태 등의 메타데이터를 저장합니다. 이 메타데이터는 스케줄링, 모니터링 및 이력 추적을 위해 사용됩니다. 기본으로 sqlite가 설치된다. 그러나 실제로는 mysql,postgres를 설치해 사용한다.
큐 (Queue): - 다수서버 구성인 경우에만 사용된다. Airflow는 메시지 큐를 사용하여 작업 간의 메시지 전달을 처리합니다. 큐를 통해 스케줄러는 워커에 작업을 할당하고, 워커는 큐를 통해 스케줄러에 작업 완료 상태를 보고합니다.
실무에서 airflow 사용시 좋은 사양이 필요해진다면
서버 1개 사용시 : 서버의 사양을 높여나간다.
2개 이상 사용시 : 사양 증가의 한계치가 오면 서버를 늘린다. 이때 airflow 제공해주는 클라우드로 가는 것이 경제적 + 서버를 늘린다 : 보통 스케줄러 웹서버는 두고, worker용으로 서버를 늘린다.
airflow를 다수 서버로 구성할때
스케줄러 (Scheduler): - Airflow 스케줄러는 정의된 DAG(Directed Acyclic Graph)의 실행을 예약하고 관리하는 주체입니다. 스케줄러는 DAG 실행의 스케줄링과 종속성 그래프를 고려하여 작업을 실행합니다.
실행자 (Executor): - 실행자는 스케줄러에 의해 예약된 작업을 실행하는 역할을 합니다. Airflow에서는 여러 가지 종류의 실행자가 제공되며, 이들은 작업을 실행하는 방식에 대한 다른 전략을 제공합니다. 예를 들어, LocalExecutor는 단일 머신에서 작업을 실행하고, CeleryExecutor는 분산된 환경에서 작업을 실행합니다. +일부Executor의 경우 동작 특성에 따라 큐가 필요없이 단독으로 동작이 가능합니다.
큐 (Queue): - Airflow에서는 메시지 큐를 사용하여 작업 간의 통신을 조정합니다. 작업이 실행되기 전에 스케줄러는 해당 작업을 큐에 넣고, 실행자는 큐에서 작업을 가져와 실행합니다. 큐는 스케줄러와 실행자 간의 효율적인 통신을 도와줍니다.
스케줄러와 실행자 간의 관계: - 스케줄러는 DAG 실행을 예약하고 해당 작업을 큐에 추가합니다. - 실행자는 큐에서 작업을 가져와 실행합니다. 실행자는 실행 전에 작업을 먼저 클리어하고, 작업이 완료되면 그 결과를 다시 스케줄러에 보고합니다.
Executor의 종류
SequentialExecutor (큐가 필요없이 단독동작) - 특징: 작업을 순차적으로 실행하는 가장 간단한 실행자입니다. 한 번에 하나의 작업만 실행됩니다. - 사용 사례: 테스트 및 개발 환경에서 작은 규모의 작업을 처리하는 데 적합합니다. + 작업들을 순서대로 실행하므로 큐를 사용하지 않아도 됨
LocalExecutor (큐가 필요없이 단독동작) ) - 특징:동일한 머신에서 여러 작업을 병렬로 처리하는 실행자입니다. LocalExecutor는 멀티프로세스를 사용하여 작업을 실행합니다. - 사용 사례:작은 규모의 작업을 병렬로 처리하고자 할 때 사용됩니다. +작업들이 동일한 머신에서 실행되기 때문에 큐를 통한 통신 없이 바로 작업을 실행
CeleryExecutor: - 특징:Celery라는 분산 작업 큐를 사용하여 작업을 여러 워커에서 병렬로 실행하는 실행자입니다. - 사용 사례: 대규모의 분산 시스템에서 Airflow를 사용하고자 할 때 사용됩니다.
KubernetesExecutor: - 특징: Kubernetes 클러스터에서 각 작업을 컨테이너로 실행하는 실행자입니다. 각 작업은 별도의 파드로 실행 - 사용 사례:Kubernetes 환경에서 Airflow를 사용하고자 할 때 사용됩니다.
CeleryKubernetesExecutor: - 특징:Celery와 Kubernetes를 함께 사용하여 작업을 분산 환경에서 실행하는 실행자입니다. - 사용 사례:대규모의 분산 시스템에서 Airflow를 사용하고자 할 때, 특히 Celery와 Kubernetes를 함께 사용
DaskExecutor: - 특징:Dask를 사용하여 작업을 분산 처리하는 실행자입니다. Dask는 분산 컴퓨팅을 위한 라이브러리입니다. - 사용 사례: 대규모 작업을 분산 환경에서 효율적으로 처리하고자 할 때 사용됩니다.
airflow 개발의 장단점
장점 : 데이터 파이프라인을 세밀하게 제어, 다양한 데이터 소스와 데이터웨어하우스 지원, backfill이 쉬움
단점 : 배우기가 쉽지 않음, 개발환경 구성 어려움, 직접운영이 어렵고 서버가 늘어나면 클라우드 버전으로 전환 필요 +airflow 클라우드 버전 : GCP의 cloud composer , AWS의 managed workflows for apache airflow , AZURE의 data factory managed airflow
DAG란 무엇일까?
airflow에서 ETL를 부루는 명칭
DAG(Directed Acyclic Graph) : 작업(Task)의 흐름을 정의
ETL dag는 3개의 task로 구성됨(extract, transform,load) +task : DAG 내에서 실제로 수행되어야 하는 작은 작업 단위. airflow의 오퍼레이터로 만들어지고, 오퍼레이터(클래스,함수)를 만들거나 직접 개발할 수 있다. 예) task1 = DummyOperator(task_id='task1', dag=dag) task2 = DummyOperator(task_id='task2', dag=dag) task1 >> task2
airflow 코드의 기본구조
DAG를 만듦 (이름, 실행주기, 실행날짜, 오너) 예) default_args = { 'owner': 'gwanghyeon', 'email': ['gwanghyeon @hotmail.com'], 'retries': 1, 'retry_delay': timedelta(minutes=3), } dag = DAG( "dag_v1", # DAG name start_date=datetime(2020,8,7,hour=0,minute=00), schedule="0 * * * *", tags=["example"], catchup=False, # common settings default_args=default_args )
+ schedule 실행 주기 설정방법
+ catchup : start_date를 과거로 설정했을때, 지난 기간동안 실행을 모두 따라잡을거면 True, 안할꺼면 False
DAG를 구성하는 task를 만듦(task 별로 적합한 오퍼레이터 선택)
예) 위 구조로 실행할때 코드 작성의 예
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'gwanghyeon', 'start_date': datetime(2023, 5, 27, hour=0, minute=00), 'email': ['keeyonghan@hotmail.com'], # 재시도 횟수 'retries': 1, # 재시도 시간 간격 'retry_delay': timedelta(minutes=3), } test_dag = DAG( "dag_v1", # DAG name schedule="0 9 * * *", tags=['test'], catchUp=False, default_args=default_args )
task 제작(오퍼레이터 선택, 설정)과 순서 정의 # t1의 현재시간 출력 task t1 = BashOperator( task_id='print_date', bash_command='date', dag=test_dag)
# t2는 5초간 대기 후 종료 t2 = BashOperator( task_id='sleep', bash_command='sleep 5', dag=test_dag)
# t3는 서버의 /tmp 디렉토리의 내용 출력 t3 = BashOperator( task_id='ls', bash_command='ls /tmp', dag=test_dag)
# task 실행순서 지정 ( t1이 끝나고 t2와 t3를 병렬로 실행) t1 >> [ t2, t3 ]
웹 ui에서 dag 실행해보고 확인하기 1) http://localhost:8080/ 접속 2) id/pw 둘다 airflow로 초기설정 되어있음 3) 홈화면에서 해당 dag 찾아 접속해 해당 dag 확인, task 별 실행 상태(녹색네모) 클릭 후 log 확인 4) 코딩에서 구현했던 기능들이 구현됐는지 log에서 확인 t1의 현재시간 출력 task
t3는 서버의 /tmp 디렉토리의 내용 출력
커맨드 라인(우분투)에서 dag 실행해보고 확인하기 # 현재 스케줄러 컨테이너 id 확인 weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 0b27bb5ccb0e apache/airflow:2.5.1 "/usr/bin/dumb-init …" 2 days ago Up 3 minutes (healthy) 8080/tcp airflow-setup_airflow-triggerer_1 0da253c77542 apache/airflow:2.5.1 "/usr/bin/dumb-init …" 2 days ago Up 3 minutes (healthy) 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp airflow-setup_airflow-webserver_1 d306bc450764 apache/airflow:2.5.1 "/usr/bin/dumb-init …" 2 days ago Up 3 minutes (healthy) 8080/tcp airflow-setup_airflow-scheduler_1 c9cb62c2053a apache/airflow:2.5.1 "/usr/bin/dumb-init …" 2 days ago Up 3 minutes (healthy) 8080/tcp airflow-setup_airflow-worker_1 48ee152ae746 redis:latest "docker-entrypoint.s…" 2 days ago Up 3 minutes (healthy) 6379/tcp airflow-setup_redis_1 a5e756836619 postgres:13 "docker-entrypoint.s…" 2 days ago Up 3 minutes (healthy) 5432/tcp airflow-setup_postgres_1
# dag_v1의 task가 뭐있는지 확인(ls, print_date, sleep) (airflow)airflow tasks list dag_v1 /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: [31mDeprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. [32mTo prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". [36mSet environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message.[0m (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9) ls print_date sleep
# dag_v1 DAG에서 print_date 태스크를 실행하고, 2023-05-23로 설정하여 해당 날짜에 대한 실행을 시뮬레이션해봄 (airflow)airflow tasks test dag_v1 print_date 2023-05-23
[2024-01-01 03:41:56,471] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'date'] [2024-01-01 03:41:56,484] {subprocess.py:86} INFO - Output: [2024-01-01 03:41:56,489] {subprocess.py:93} INFO - Mon Jan 1 03:41:56 UTC 2024
-> print_date 결과가 2024년 1월 1일이 출력됐다. 그 이유는 catchUp=False 였기 때문에 중간의 실행은 생략되고, 현재 실행날짜 기준으로 출력됐다.