변수 외부화와 , xcom을 활용한 수정
- 수정사항 : 웹 ui에서 connection에 redshift 링크 변수 추가, 필요할때 'redshift_dev_db'로 활용.
variable에 s3링크 추가, 필요할때 'csv_url' 로 가져와서 활용.save 후 test 눌러 연결된것을 확인할것! - xcom : task들 간에 데이터를 주고받기 위한 방식, 한 operator의 리턴 값을 db에 저장하고 다른 operator에 넘기는 방법으로 순차 실행. 메타 데이터 db에 저장 되기 너무 큰 데이터는 불가능
XCom의 실행순서는 아래와 같다.
1) **Task에서 데이터 생성**: Task에서 생성한 데이터는 XCom을 통해 저장됩니다. 일반적으로 Python 함수에서 `return` 문을 사용하여 데이터를 반환하는 경우 해당 데이터가 XCom에 저장됩니다.
def some_task(**context):
result_data = some_function()
return result_data
2) **XCom에 데이터 저장**: Task에서 반환한 데이터는 Airflow 내부의 XCom 시스템을 통해 DAG 실행 데이터베이스에 저장됩니다. 다만 `context["task_instance"].xcom_push()` 사용할 필요가 없다. PythonOperator를 통해 context["task_instance"] 에 key가 "return_value"이고, task_ids가 "some_task"인 값에, 자동으로 저장된다.
+ task_instance와 return_value는 어디서 선언한게 아닌 기본 이름값이다. some_task은 함수 이름.
3) **XCom에서 데이터 추출**: 다른 Task에서는 저장된 데이터를 추출하여 사용할 수 있습니다. 이는 `context["task_instance"].xcom_pull()` 메서드를 사용하여 수행됩니다.
def another_task(**context):
result_data = context["task_instance"].xcom_pull(key="return_value" , task_ids= "some_task")
#위 두개를 반영한 코드 from airflow import DAG from airflow.operators.python import PythonOperator #따로 저장한 Variable을 불러옴 from airflow.models import Variable from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 def get_Redshift_connection(autocommit=True): # 웹 ui에서 conn에 저장한 값들을 불러온다. # PostgresHook을 사용하여 Redshift에 연결 hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() # 오토커밋 설정 conn.autocommit = autocommit # 커서 반환 return conn.cursor() def extract(**context): # Task에서 필요한 매개변수들을 context로부터 추출 link = context["params"]["url"] # DAG에서 정의한 url 매개변수를 가져옴 # 1. Task에서 xcom에 데이터 생성 task_instance = context['task_instance'] execution_date = context['execution_date'] # 로깅: 실행 날짜 및 링크 정보 출력 logging.info(execution_date) # 링크로부터 데이터 추출 (예제에서는 requests 모듈 사용) f = requests.get(link) # 추출한 데이터 반환, 반환과 동시에 2. XCom에 데이터( task_instance ) 저장 # PythonOperator의 기본 동작으로 push를 따로 수행할 필요가 없음 return (f.text) def transform(**context): # 로깅: 변환 시작 메시지 출력 logging.info("Transform started") # 3. XCom에 데이터 추출 # xcom_pull을 사용하여 'extract' Task에서 반환한 데이터를 가져옴 text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract") # 추출한 데이터를 처리하여 새로운 형태의 데이터로 변환 lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리 records = [] for l in lines: (name, gender) = l.split(",") # 예제에서는 쉼표로 구분된 데이터를 처리 records.append([name, gender]) # 로깅: 변환 종료 메시지 출력 logging.info("Transform ended") # 변환된 데이터 반환 return records def load(**context): logging.info("load started") schema = context["params"]["schema"] table = context["params"]["table"] records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform") """ records = [ [ " 본인 ID ", "M" ], [ "Claire", "F" ], ... ] """ # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음 cur = get_Redshift_connection() try: cur.execute("BEGIN;") cur.execute(f"DELETE FROM {schema}.name_gender;") # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태 for r in records: name = r[0] gender = r[1] print(name, "-", gender) sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')" cur.execute(sql) cur.execute("COMMIT;") # cur.execute("END;") except (Exception, psycopg2.DatabaseError) as error: print(error) cur.execute("ROLLBACK;") raise logging.info("load done") dag = DAG( dag_id = 'name_gender_v4', start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 2 * * *', # 적당히 조절 catchup = False, max_active_runs = 1, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) # 2. XCom에 데이터 저장 : PythonOperator는 기본적으로 Task 함수가 반환하는 값을 XCom에 저장합니다. extract = PythonOperator( task_id = 'extract', python_callable = extract, params = { 'url': Variable.get("csv_url") }, dag = dag) transform = PythonOperator( task_id = 'transform', python_callable = transform, params = { }, dag = dag) load = PythonOperator( task_id = 'load', python_callable = load, params = { 'schema': ' 본인 ID ', 'table': 'name_gender' }, dag = dag) extract >> transform >> load |
커맨드 라인으로 실습파일 받고 실행
- 우선 airflow-setup 폴더에서 현재 폴더 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl
total 32
drwxr-xr-x 5 weare root 4096 Dec 29 21:53 logs
drwxr-xr-x 3 weare root 4096 Dec 29 21:53 dags
drwxr-xr-x 2 weare root 4096 Dec 29 21:49 plugins
-rw-r--r-- 1 weare weare 10493 Dec 29 21:41 docker-compose.yaml
-rw-r--r-- 1 weare weare 15 Dec 29 21:40 README.md
drwxr-xr-x 3 weare weare 4096 Dec 29 21:40 docs - 현재 dags 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
total 16
drwxrwxr-x 2 weare root 4096 Dec 29 21:53 __pycache__
-rw-r--r-- 1 weare root 797 Dec 29 21:40 HelloWorld.py
-rw-r--r-- 1 weare root 465 Dec 29 21:40 HelloWorld_v2.py
-rw-r--r-- 1 weare root 717 Dec 29 21:40 TestDAG.py - 깃에서 새로 실습할 dag들 다운 받음
weare@DESKTOP-BE1I4GE:~/airflow-setup$ git clone https://github.com/learndataeng/learn-airflow
Cloning into 'learn-airflow'...
remote: Enumerating objects: 219, done.
remote: Counting objects: 100% (95/95), done.
remote: Compressing objects: 100% (93/93), done.
remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
Receiving objects: 100% (219/219), 71.07 KiB | 8.88 MiB/s, done.
Resolving deltas: 100% (106/106), done. - 새로 받은 폴더의 내용 확인 (dags 폴더 확인)
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow
total 12
-rw-r--r-- 1 weare weare 15 Jan 1 23:27 README.md
drwxr-xr-x 6 weare weare 4096 Jan 1 23:27 dags
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 data - dags 폴더 내부 dag 리스트 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow/dags
total 136
-rw-r--r-- 1 weare weare 2311 Jan 1 23:27 Backup_Airflow_Data_to_S3.py
-rw-r--r-- 1 weare weare 1968 Jan 1 23:27 Build_Summary.py
-rw-r--r-- 1 weare weare 774 Jan 1 23:27 Build_Summary_v2.py
-rw-r--r-- 1 weare weare 1419 Jan 1 23:27 Cleanup_Log.py
-rw-r--r-- 1 weare weare 4694 Jan 1 23:27 Gsheet_to_Redshift.py
-rw-r--r-- 1 weare weare 803 Jan 1 23:27 HelloWorld.py
-rw-r--r-- 1 weare weare 465 Jan 1 23:27 HelloWorld_v2.py
-rw-r--r-- 1 weare weare 882 Jan 1 23:27 Learn_BranchPythonOperator.py
-rw-r--r-- 1 weare weare 820 Jan 1 23:27 Learn_Jinja.py
-rw-r--r-- 1 weare weare 632 Jan 1 23:27 Learn_LatestOnlyOperator.py
-rw-r--r-- 1 weare weare 1213 Jan 1 23:27 Learn_TaskGroups.py
-rw-r--r-- 1 weare weare 652 Jan 1 23:27 Learn_TriggerRule.py
-rw-r--r-- 1 weare weare 1500 Jan 1 23:27 MySQL_to_Redshift.py
-rw-r--r-- 1 weare weare 1643 Jan 1 23:27 MySQL_to_Redshift_v2.py
-rw-r--r-- 1 weare weare 2402 Jan 1 23:27 NameGenderCSVtoRedshift.py
-rw-r--r-- 1 weare weare 3087 Jan 1 23:27 NameGenderCSVtoRedshift_v2.py
-rw-r--r-- 1 weare weare 3190 Jan 1 23:27 NameGenderCSVtoRedshift_v3.py
-rw-r--r-- 1 weare weare 3174 Jan 1 23:27 NameGenderCSVtoRedshift_v4.py
-rw-r--r-- 1 weare weare 2347 Jan 1 23:27 NameGenderCSVtoRedshift_v5.py
-rw-r--r-- 1 weare weare 851 Jan 1 23:27 SQL_to_Sheet.py
-rw-r--r-- 1 weare weare 1827 Jan 1 23:27 UpdateSymbol.py
-rw-r--r-- 1 weare weare 2349 Jan 1 23:27 UpdateSymbol_v2.py
-rw-r--r-- 1 weare weare 2579 Jan 1 23:27 Weather_to_Redshift.py
-rw-r--r-- 1 weare weare 3352 Jan 1 23:27 Weather_to_Redshift_v2.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 config
-rw-r--r-- 1 weare weare 10651 Jan 1 23:27 docker-compose.test.yaml
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 dynamic_dags
-rw-r--r-- 1 weare weare 420 Jan 1 23:27 get_price_APPL.py
-rw-r--r-- 1 weare weare 421 Jan 1 23:27 get_price_GOOG.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 plugins
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 trigger_dags - 새로 받은 dags들(learn-airflow/dags) 모두 원래 dag위치( dags/ ) 로 이동
weare@DESKTOP-BE1I4GE:~/airflow-setup$ cp -r learn-airflow/dags/* dags/ - 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
total 144
drwxrwxr-x 2 weare root 4096 Jan 1 23:33 __pycache__
-rw-r--r-- 1 weare weare 2311 Jan 1 23:33 Backup_Airflow_Data_to_S3.py
-rw-r--r-- 1 weare weare 1968 Jan 1 23:33 Build_Summary.py
-rw-r--r-- 1 weare weare 774 Jan 1 23:33 Build_Summary_v2.py
-rw-r--r-- 1 weare weare 1419 Jan 1 23:33 Cleanup_Log.py
-rw-r--r-- 1 weare weare 4694 Jan 1 23:33 Gsheet_to_Redshift.py
-rw-r--r-- 1 weare root 803 Jan 1 23:33 HelloWorld.py
-rw-r--r-- 1 weare root 465 Jan 1 23:33 HelloWorld_v2.py
-rw-r--r-- 1 weare weare 882 Jan 1 23:33 Learn_BranchPythonOperator.py
-rw-r--r-- 1 weare weare 820 Jan 1 23:33 Learn_Jinja.py
-rw-r--r-- 1 weare weare 632 Jan 1 23:33 Learn_LatestOnlyOperator.py
-rw-r--r-- 1 weare weare 1213 Jan 1 23:33 Learn_TaskGroups.py
-rw-r--r-- 1 weare weare 652 Jan 1 23:33 Learn_TriggerRule.py
-rw-r--r-- 1 weare weare 1500 Jan 1 23:33 MySQL_to_Redshift.py
-rw-r--r-- 1 weare weare 1643 Jan 1 23:33 MySQL_to_Redshift_v2.py
-rw-r--r-- 1 weare weare 2402 Jan 1 23:33 NameGenderCSVtoRedshift.py
-rw-r--r-- 1 weare weare 3087 Jan 1 23:33 NameGenderCSVtoRedshift_v2.py
-rw-r--r-- 1 weare weare 3190 Jan 1 23:33 NameGenderCSVtoRedshift_v3.py
-rw-r--r-- 1 weare weare 3174 Jan 1 23:33 NameGenderCSVtoRedshift_v4.py
-rw-r--r-- 1 weare weare 2347 Jan 1 23:33 NameGenderCSVtoRedshift_v5.py
-rw-r--r-- 1 weare weare 851 Jan 1 23:33 SQL_to_Sheet.py
-rw-r--r-- 1 weare weare 1827 Jan 1 23:33 UpdateSymbol.py
-rw-r--r-- 1 weare weare 2349 Jan 1 23:33 UpdateSymbol_v2.py
-rw-r--r-- 1 weare weare 2579 Jan 1 23:33 Weather_to_Redshift.py
-rw-r--r-- 1 weare weare 3352 Jan 1 23:33 Weather_to_Redshift_v2.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 config
-rw-r--r-- 1 weare weare 10651 Jan 1 23:33 docker-compose.test.yaml
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 dynamic_dags
-rw-r--r-- 1 weare weare 420 Jan 1 23:33 get_price_APPL.py
-rw-r--r-- 1 weare weare 421 Jan 1 23:33 get_price_GOOG.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 plugins
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 trigger_dags
-rw-r--r-- 1 weare root 717 Dec 29 21:40 TestDAG.py - vim 편집기로 NameGenderCSVtoRedshift_v4 접속 후 개인 id로 수정
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim NameGenderCSVtoRedshift_v5.py - 웹 ui(http://localhost:8080/)에 접속해보면 task 3개 모두 정상 실행된것을 확인할수 있다.
'airflow(에어플로우)' 카테고리의 다른 글
airflow dag(주식정보 수집) 실습1 -full refresh 방식 (0) | 2024.01.02 |
---|---|
airflow dag(task decorator 활용) 실습 3 (0) | 2024.01.02 |
airflow dag(params 활용) 실습 1 (0) | 2024.01.02 |
airflow dag(hello world) 실습 (0) | 2024.01.01 |
airflow 기초 (0) | 2023.12.31 |