변수 외부화와 , xcom을 활용한 수정

  1. 수정사항 :  웹 ui에서 connection에 redshift 링크 변수 추가,  필요할때 'redshift_dev_db'로 활용.
    variable에 s3링크 추가, 필요할때 'csv_url' 로 가져와서 활용.

    save 후 test 눌러 연결된것을 확인할것!


  2. xcom : task들 간에 데이터를 주고받기 위한 방식, 한 operator의 리턴 값을 db에 저장하고 다른 operator에 넘기는 방법으로 순차 실행. 메타 데이터 db에 저장 되기 너무 큰 데이터는 불가능

    XCom의 실행순서는 아래와 같다.

    1) **Task에서 데이터 생성**: Task에서 생성한 데이터는 XCom을 통해 저장됩니다. 일반적으로 Python 함수에서 `return` 문을 사용하여 데이터를 반환하는 경우 해당 데이터가 XCom에 저장됩니다.

        def some_task(**context):
            result_data = some_function()
            return result_data

    2) **XCom에 데이터 저장**: Task에서 반환한 데이터는 Airflow 내부의 XCom 시스템을 통해 DAG 실행 데이터베이스에 저장됩니다. 다만 `context["task_instance"].xcom_push()` 사용할 필요가 없다. PythonOperator를 통해 context["task_instance"] 에 key가 "return_value"이고, task_ids가 "some_task"인 값에, 자동으로 저장된다.
    + task_instance와 return_value는 어디서 선언한게 아닌 기본 이름값이다. some_task은 함수 이름.

    3) **XCom에서 데이터 추출**: 다른 Task에서는 저장된 데이터를 추출하여 사용할 수 있습니다. 이는 `context["task_instance"].xcom_pull()` 메서드를 사용하여 수행됩니다.

        def another_task(**context):
            result_data = context["task_instance"].xcom_pull(key="return_value" , task_ids= "some_task")
#위 두개를 반영한 코드
from airflow import DAG
from airflow.operators.python import PythonOperator

#따로 저장한 Variable을 불러옴
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2

def get_Redshift_connection(autocommit=True):
    # 웹 ui에서 conn에 저장한 값들을 불러온다.
    # PostgresHook을 사용하여 Redshift에 연결
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    # 오토커밋 설정
    conn.autocommit = autocommit
    # 커서 반환
    return conn.cursor()

def extract(**context):
    # Task에서 필요한 매개변수들을 context로부터 추출
    link = context["params"]["url"]  # DAG에서 정의한 url 매개변수를 가져옴

    # 1. Task에서 xcom에 데이터 생성
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    # 로깅: 실행 날짜 및 링크 정보 출력
    logging.info(execution_date)
    
    # 링크로부터 데이터 추출 (예제에서는 requests 모듈 사용)
    f = requests.get(link)
    
    # 추출한 데이터 반환, 반환과 동시에 2. XCom에 데이터( task_instance ) 저장 
    # PythonOperator의 기본 동작으로 push를 따로 수행할 필요가 없음
    return (f.text)

def transform(**context):
    # 로깅: 변환 시작 메시지 출력
    logging.info("Transform started")
    
    #  3. XCom에 데이터 추출
    # xcom_pull을 사용하여 'extract' Task에서 반환한 데이터를 가져옴
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    
    # 추출한 데이터를 처리하여 새로운 형태의 데이터로 변환
    lines = text.strip().split("\n")[1:]  # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
        (name, gender) = l.split(",")  # 예제에서는 쉼표로 구분된 데이터를 처리
        records.append([name, gender])

    # 로깅: 변환 종료 메시지 출력
    logging.info("Transform ended")
    
    # 변환된 데이터 반환
    return records

def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    """
    records = [
      [ " 본인 ID ", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

#  2. XCom에 데이터 저장 :  PythonOperator는 기본적으로 Task 함수가 반환하는 값을 XCom에 저장합니다.
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': ' 본인 ID ',
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load

 

커맨드 라인으로 실습파일 받고 실행

  1. 우선 airflow-setup 폴더에서 현재 폴더 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl
    total 32
    drwxr-xr-x 5 weare root   4096 Dec 29 21:53 logs
    drwxr-xr-x 3 weare root   4096 Dec 29 21:53 dags
    drwxr-xr-x 2 weare root   4096 Dec 29 21:49 plugins
    -rw-r--r-- 1 weare weare 10493 Dec 29 21:41 docker-compose.yaml
    -rw-r--r-- 1 weare weare    15 Dec 29 21:40 README.md
    drwxr-xr-x 3 weare weare  4096 Dec 29 21:40 docs

  2. 현재 dags 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 16
    drwxrwxr-x 2 weare root 4096 Dec 29 21:53 __pycache__
    -rw-r--r-- 1 weare root  797 Dec 29 21:40 HelloWorld.py
    -rw-r--r-- 1 weare root  465 Dec 29 21:40 HelloWorld_v2.py
    -rw-r--r-- 1 weare root  717 Dec 29 21:40 TestDAG.py

  3. 깃에서 새로 실습할 dag들 다운 받음
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ git clone https://github.com/learndataeng/learn-airflow
    Cloning into 'learn-airflow'...
    remote: Enumerating objects: 219, done.
    remote: Counting objects: 100% (95/95), done.
    remote: Compressing objects: 100% (93/93), done.
    remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
    Receiving objects: 100% (219/219), 71.07 KiB | 8.88 MiB/s, done.
    Resolving deltas: 100% (106/106), done.

  4. 새로 받은 폴더의 내용 확인 (dags 폴더 확인)
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow
    total 12
    -rw-r--r-- 1 weare weare   15 Jan  1 23:27 README.md
    drwxr-xr-x 6 weare weare 4096 Jan  1 23:27 dags
    drwxr-xr-x 2 weare weare 4096 Jan  1 23:27 data

  5. dags 폴더 내부 dag 리스트 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow/dags
    total 136
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:27 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:27 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:27 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:27 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:27 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare weare   803 Jan  1 23:27 HelloWorld.py
    -rw-r--r-- 1 weare weare   465 Jan  1 23:27 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:27 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:27 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:27 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:27 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:27 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:27 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:27 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:27 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:27 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:27 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:27 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:27 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:27 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:27 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:27 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:27 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:27 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:27 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:27 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:27 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 trigger_dags

  6. 새로 받은 dags들(learn-airflow/dags) 모두 원래  dag위치( dags/ ) 로 이동
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cp -r learn-airflow/dags/* dags/

  7. 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 144
    drwxrwxr-x 2 weare root   4096 Jan  1 23:33 __pycache__
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:33 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:33 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:33 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:33 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:33 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare root    803 Jan  1 23:33 HelloWorld.py
    -rw-r--r-- 1 weare root    465 Jan  1 23:33 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:33 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:33 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:33 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:33 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:33 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:33 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:33 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:33 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:33 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:33 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:33 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:33 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:33 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:33 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:33 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:33 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:33 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:33 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:33 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:33 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 trigger_dags
    -rw-r--r-- 1 weare root    717 Dec 29 21:40 TestDAG.py

  8. vim 편집기로 NameGenderCSVtoRedshift_v4 접속 후 개인 id로 수정
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim NameGenderCSVtoRedshift_v5.py
  9. 웹 ui(http://localhost:8080/)에 접속해보면 task 3개 모두 정상 실행된것을 확인할수 있다.

+ Recent posts