1. PythonOperator 활용한 방법
- PythonOperator : 이 Operator를 사용하면 Python 함수나 Python callable을 실행할 수 있다
- 간단한 구조
from airflow.operators.python
import PythonOperator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
# params로 인자들을 넘겨줌
params={ 'table': 'delighted_nps', 'schema': 'raw_data' },
)
# cxt 통해 파라미터들을 받음
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye() - 실제 예
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
# PythonOperator는 그때 그때 선언해줘야한다
print_hello = PythonOperator(
#task 이름
task_id = 'print_hello',
#실행시키려는 함수이름
python_callable = print_hello,
#이 Task가 속한 DAG를 지정
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
# DAG 실행순서 할당
# 순서를 정하지 않으면 각각 독립적으로 동시에 실행됨
print_hello >> print_goodbye
2. Airflow Decorator 활용한 방법
- Airflow Decorator : Airflow에서 사용되는 DAG를 정의하는 데코레이터(Decorator). 특정 기능이나 설정을 DAG 또는 DAG의 태스크에 적용하기 위한 방법으로 사용된다. Airflow Decorator는 자동으로 선언되기 때문에 PythonOperator에 비해 코드가 더 간단해진다.
- 실제 예
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# 함수이름이 기본으로 task id로 할당된다.
print_hello() >> print_goodbye()
+ max_active_runs : 한번에 동시에 실행될수 있는 dag의 수. 일반적으로는 하나씩 해도 되지만, backfill을 할때는 여러개의 dag를 실행하며, 실행 시간을 줄일수 있다.
max_active_tasks : 한번에 동시에 실행될수 있는 dag의 task 수. 리소스 사용을 조절하거나 부하를 관리하는 데 도움이됨
+airflow worker에 할당된 cpu의 총합= max_active의 한계
'airflow(에어플로우)' 카테고리의 다른 글
airflow dag(task decorator 활용) 실습 3 (0) | 2024.01.02 |
---|---|
airflow dag(xcom 활용) 실습 2 (0) | 2024.01.02 |
airflow dag(params 활용) 실습 1 (0) | 2024.01.02 |
airflow 기초 (0) | 2023.12.31 |
데이터 파이프라인 (1) | 2023.12.30 |