task decorator 활용한 방법

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = " 본인 ID ,M" -> [ ' 본인 ID ', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ " 본인 ID ", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # 날짜가 미래인 경우 실행이 안됨
    schedule='0 2 * * *',  # 적당히 조절
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = ' 본인 ID '   ## 자신의 스키마로 변경
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

 

커맨드 라인으로 실습파일 받고 실행

  1. 우선 airflow-setup 폴더에서 현재 폴더 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl
    total 32
    drwxr-xr-x 5 weare root   4096 Dec 29 21:53 logs
    drwxr-xr-x 3 weare root   4096 Dec 29 21:53 dags
    drwxr-xr-x 2 weare root   4096 Dec 29 21:49 plugins
    -rw-r--r-- 1 weare weare 10493 Dec 29 21:41 docker-compose.yaml
    -rw-r--r-- 1 weare weare    15 Dec 29 21:40 README.md
    drwxr-xr-x 3 weare weare  4096 Dec 29 21:40 docs

  2. 현재 dags 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 16
    drwxrwxr-x 2 weare root 4096 Dec 29 21:53 __pycache__
    -rw-r--r-- 1 weare root  797 Dec 29 21:40 HelloWorld.py
    -rw-r--r-- 1 weare root  465 Dec 29 21:40 HelloWorld_v2.py
    -rw-r--r-- 1 weare root  717 Dec 29 21:40 TestDAG.py

  3. 깃에서 새로 실습할 dag들 다운 받음
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ git clone https://github.com/learndataeng/learn-airflow
    Cloning into 'learn-airflow'...
    remote: Enumerating objects: 219, done.
    remote: Counting objects: 100% (95/95), done.
    remote: Compressing objects: 100% (93/93), done.
    remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
    Receiving objects: 100% (219/219), 71.07 KiB | 8.88 MiB/s, done.
    Resolving deltas: 100% (106/106), done.

  4. 새로 받은 폴더의 내용 확인 (dags 폴더 확인)
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow
    total 12
    -rw-r--r-- 1 weare weare   15 Jan  1 23:27 README.md
    drwxr-xr-x 6 weare weare 4096 Jan  1 23:27 dags
    drwxr-xr-x 2 weare weare 4096 Jan  1 23:27 data

  5. dags 폴더 내부 dag 리스트 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow/dags
    total 136
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:27 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:27 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:27 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:27 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:27 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare weare   803 Jan  1 23:27 HelloWorld.py
    -rw-r--r-- 1 weare weare   465 Jan  1 23:27 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:27 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:27 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:27 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:27 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:27 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:27 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:27 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:27 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:27 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:27 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:27 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:27 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:27 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:27 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:27 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:27 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:27 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:27 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:27 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:27 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 trigger_dags

  6. 새로 받은 dags들(learn-airflow/dags) 모두 원래  dag위치( dags/ ) 로 이동
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cp -r learn-airflow/dags/* dags/

  7. 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 144
    drwxrwxr-x 2 weare root   4096 Jan  1 23:33 __pycache__
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:33 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:33 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:33 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:33 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:33 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare root    803 Jan  1 23:33 HelloWorld.py
    -rw-r--r-- 1 weare root    465 Jan  1 23:33 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:33 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:33 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:33 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:33 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:33 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:33 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:33 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:33 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:33 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:33 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:33 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:33 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:33 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:33 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:33 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:33 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:33 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:33 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:33 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:33 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 trigger_dags
    -rw-r--r-- 1 weare root    717 Dec 29 21:40 TestDAG.py

  8. vim 편집기로 NameGenderCSVtoRedshift_v5 접속 후 개인 id로 수정
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim NameGenderCSVtoRedshift_v5.py
  9. 웹 ui(http://localhost:8080/)에 접속해보면 task 3개 모두 정상 실행된것을 확인할수 있다.

+ Recent posts