1. PythonOperator 활용한 방법

  • PythonOperator : 이 Operator를 사용하면 Python 함수나 Python callable을 실행할 수 있다
  • 간단한 구조
    from airflow.operators.python
    import PythonOperator

    load_nps = PythonOperator(
       dag=dag,
       task_id='task_id',
       python_callable=python_func,

       # params로 인자들을 넘겨줌
       params={ 'table': 'delighted_nps', 'schema': 'raw_data' },
    )
    # cxt 통해 파라미터들을 받음
    def python_func(**cxt):
       table = cxt["params"]["table"]
       schema = cxt["params"]["schema"]
       ex_date = cxt["execution_date"]
    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

  • 실제 예
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime

    dag = DAG(
        dag_id = 'HelloWorld',
        start_date = datetime(2022,5,5),
        catchup=False,
        tags=['example'],
        schedule = '0 2 * * *')

    def print_hello():
        print("hello!")
        return "hello!"

    def print_goodbye():
        print("goodbye!")
        return "goodbye!"

    # PythonOperator는 그때 그때 선언해줘야한다
    print_hello = PythonOperator(
        #task 이름
        task_id = 'print_hello',
        #실행시키려는 함수이름
        python_callable = print_hello,
        #이 Task가 속한 DAG를 지정
        dag = dag)

    print_goodbye = PythonOperator(
        task_id = 'print_goodbye',
        python_callable = print_goodbye,
        dag = dag)

    # DAG 실행순서 할당
    # 순서를 정하지 않으면 각각 독립적으로 동시에 실행됨
    print_hello >> print_goodbye

2. Airflow Decorator 활용한 방법

  • Airflow Decorator : Airflow에서 사용되는 DAG를 정의하는 데코레이터(Decorator). 특정 기능이나 설정을 DAG 또는 DAG의 태스크에 적용하기 위한 방법으로 사용된다. Airflow Decorator는 자동으로 선언되기 때문에 PythonOperator에 비해 코드가 더 간단해진다.

  • 실제 예
    from airflow import DAG
    from airflow.decorators import task
    from datetime import datetime

    @task
    def print_hello():
        print("hello!")
        return "hello!"

    @task
    def print_goodbye():
        print("goodbye!")
        return "goodbye!"

    with DAG(
        dag_id = 'HelloWorld_v2',
        start_date = datetime(2022,5,5),
        catchup=False,
        tags=['example'],
        schedule = '0 2 * * *'
    ) as dag:

    # 함수이름이 기본으로 task id로 할당된다.
    print_hello() >> print_goodbye()

 

+ max_active_runs : 한번에 동시에 실행될수 있는 dag의 수. 일반적으로는 하나씩 해도 되지만, backfill을 할때는 여러개의 dag를 실행하며, 실행 시간을 줄일수 있다. 
max_active_tasks : 한번에 동시에  실행될수 있는 dag의 task 수. 리소스 사용을 조절하거나 부하를 관리하는 데 도움이됨
+airflow worker에 할당된 cpu의 총합= max_active의 한계

'airflow(에어플로우)' 카테고리의 다른 글

airflow dag(task decorator 활용) 실습 3  (0) 2024.01.02
airflow dag(xcom 활용) 실습 2  (0) 2024.01.02
airflow dag(params 활용) 실습 1  (0) 2024.01.02
airflow 기초  (0) 2023.12.31
데이터 파이프라인  (1) 2023.12.30

+ Recent posts