task decorator를 활용한 방법
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    """Return a cursor on the ``redshift_dev_db`` Airflow connection.

    Args:
        autocommit: Value assigned to the connection's ``autocommit``
            attribute before the cursor is created.

    Returns:
        A cursor obtained from the hook's connection.
    """
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    """Download ``url`` and return the response body as text.

    Raises:
        requests.HTTPError: If the server answers with an error status, so
            that an error page is never passed on to ``transform``/``load``.
    """
    logging.info(datetime.utcnow())
    # A timeout keeps the task from hanging forever on a stalled server.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    return response.text


@task
def transform(text):
    """Parse CSV text into a list of ``[name, gender]`` records.

    The first line is skipped (presumably the CSV header) and blank lines
    are ignored.

    Raises:
        ValueError: If a non-blank line does not have exactly two
            comma-separated fields.
    """
    records = []
    # splitlines() also handles "\r\n" line endings, which split("\n")
    # would leave attached to the last field.
    for line in text.strip().splitlines()[1:]:
        if not line.strip():
            continue
        # e.g. "Claire,F" -> ["Claire", "F"]
        name, gender = line.split(",")
        records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    """Replace the contents of ``schema.table`` with ``records`` (full refresh).

    Args:
        schema: Target schema name (trusted configuration, not user input).
        table: Target table name (trusted configuration, not user input).
        records: List of ``[name, gender]`` pairs,
            e.g. ``[["Claire", "F"], ...]``.

    Raises:
        Exception: Any error raised while executing SQL is re-raised after
            the transaction is rolled back, so the Airflow task fails and
            can be retried.
    """
    logging.info("load started")
    cur = get_Redshift_connection()
    conn = cur.connection
    target = f"{schema}.{table}"
    # Wrap the statements in BEGIN/COMMIT so the DELETE and the INSERTs are
    # applied as one transaction.
    try:
        cur.execute("BEGIN;")
        # DELETE first, then re-insert everything -> full refresh.
        cur.execute(f"DELETE FROM {target};")
        for name, gender in records:
            logging.info("%s - %s", name, gender)
            # Values are bound as parameters rather than formatted into the
            # SQL text, so quotes in the data cannot break the statement.
            cur.execute(f"INSERT INTO {target} VALUES (%s, %s)", (name, gender))
        cur.execute("COMMIT;")
    except Exception:
        logging.exception("load failed; rolling back")
        cur.execute("ROLLBACK;")
        raise
    finally:
        cur.close()
        conn.close()
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # the DAG does not run if this date is in the future
    schedule='0 2 * * *',  # adjust as needed
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = ' 본인 ID '  # replace with your own schema
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)
커맨드 라인으로 실습파일 받고 실행
- 우선 airflow-setup 폴더에서 현재 폴더 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl
total 32
drwxr-xr-x 5 weare root 4096 Dec 29 21:53 logs
drwxr-xr-x 3 weare root 4096 Dec 29 21:53 dags
drwxr-xr-x 2 weare root 4096 Dec 29 21:49 plugins
-rw-r--r-- 1 weare weare 10493 Dec 29 21:41 docker-compose.yaml
-rw-r--r-- 1 weare weare 15 Dec 29 21:40 README.md
drwxr-xr-x 3 weare weare 4096 Dec 29 21:40 docs
- 현재 dags 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
total 16
drwxrwxr-x 2 weare root 4096 Dec 29 21:53 __pycache__
-rw-r--r-- 1 weare root 797 Dec 29 21:40 HelloWorld.py
-rw-r--r-- 1 weare root 465 Dec 29 21:40 HelloWorld_v2.py
-rw-r--r-- 1 weare root 717 Dec 29 21:40 TestDAG.py
- 깃에서 새로 실습할 dag들을 다운로드
weare@DESKTOP-BE1I4GE:~/airflow-setup$ git clone https://github.com/learndataeng/learn-airflow
Cloning into 'learn-airflow'...
remote: Enumerating objects: 219, done.
remote: Counting objects: 100% (95/95), done.
remote: Compressing objects: 100% (93/93), done.
remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
Receiving objects: 100% (219/219), 71.07 KiB | 8.88 MiB/s, done.
Resolving deltas: 100% (106/106), done.
- 새로 받은 폴더의 내용 확인 (dags 폴더 확인)
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow
total 12
-rw-r--r-- 1 weare weare 15 Jan 1 23:27 README.md
drwxr-xr-x 6 weare weare 4096 Jan 1 23:27 dags
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 data
- dags 폴더 내부 dag 리스트 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow/dags
total 136
-rw-r--r-- 1 weare weare 2311 Jan 1 23:27 Backup_Airflow_Data_to_S3.py
-rw-r--r-- 1 weare weare 1968 Jan 1 23:27 Build_Summary.py
-rw-r--r-- 1 weare weare 774 Jan 1 23:27 Build_Summary_v2.py
-rw-r--r-- 1 weare weare 1419 Jan 1 23:27 Cleanup_Log.py
-rw-r--r-- 1 weare weare 4694 Jan 1 23:27 Gsheet_to_Redshift.py
-rw-r--r-- 1 weare weare 803 Jan 1 23:27 HelloWorld.py
-rw-r--r-- 1 weare weare 465 Jan 1 23:27 HelloWorld_v2.py
-rw-r--r-- 1 weare weare 882 Jan 1 23:27 Learn_BranchPythonOperator.py
-rw-r--r-- 1 weare weare 820 Jan 1 23:27 Learn_Jinja.py
-rw-r--r-- 1 weare weare 632 Jan 1 23:27 Learn_LatestOnlyOperator.py
-rw-r--r-- 1 weare weare 1213 Jan 1 23:27 Learn_TaskGroups.py
-rw-r--r-- 1 weare weare 652 Jan 1 23:27 Learn_TriggerRule.py
-rw-r--r-- 1 weare weare 1500 Jan 1 23:27 MySQL_to_Redshift.py
-rw-r--r-- 1 weare weare 1643 Jan 1 23:27 MySQL_to_Redshift_v2.py
-rw-r--r-- 1 weare weare 2402 Jan 1 23:27 NameGenderCSVtoRedshift.py
-rw-r--r-- 1 weare weare 3087 Jan 1 23:27 NameGenderCSVtoRedshift_v2.py
-rw-r--r-- 1 weare weare 3190 Jan 1 23:27 NameGenderCSVtoRedshift_v3.py
-rw-r--r-- 1 weare weare 3174 Jan 1 23:27 NameGenderCSVtoRedshift_v4.py
-rw-r--r-- 1 weare weare 2347 Jan 1 23:27 NameGenderCSVtoRedshift_v5.py
-rw-r--r-- 1 weare weare 851 Jan 1 23:27 SQL_to_Sheet.py
-rw-r--r-- 1 weare weare 1827 Jan 1 23:27 UpdateSymbol.py
-rw-r--r-- 1 weare weare 2349 Jan 1 23:27 UpdateSymbol_v2.py
-rw-r--r-- 1 weare weare 2579 Jan 1 23:27 Weather_to_Redshift.py
-rw-r--r-- 1 weare weare 3352 Jan 1 23:27 Weather_to_Redshift_v2.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 config
-rw-r--r-- 1 weare weare 10651 Jan 1 23:27 docker-compose.test.yaml
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 dynamic_dags
-rw-r--r-- 1 weare weare 420 Jan 1 23:27 get_price_APPL.py
-rw-r--r-- 1 weare weare 421 Jan 1 23:27 get_price_GOOG.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 plugins
drwxr-xr-x 2 weare weare 4096 Jan 1 23:27 trigger_dags
- 새로 받은 dag들(learn-airflow/dags)을 모두 원래 dag 위치(dags/)로 복사
weare@DESKTOP-BE1I4GE:~/airflow-setup$ cp -r learn-airflow/dags/* dags/
- 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
total 144
drwxrwxr-x 2 weare root 4096 Jan 1 23:33 __pycache__
-rw-r--r-- 1 weare weare 2311 Jan 1 23:33 Backup_Airflow_Data_to_S3.py
-rw-r--r-- 1 weare weare 1968 Jan 1 23:33 Build_Summary.py
-rw-r--r-- 1 weare weare 774 Jan 1 23:33 Build_Summary_v2.py
-rw-r--r-- 1 weare weare 1419 Jan 1 23:33 Cleanup_Log.py
-rw-r--r-- 1 weare weare 4694 Jan 1 23:33 Gsheet_to_Redshift.py
-rw-r--r-- 1 weare root 803 Jan 1 23:33 HelloWorld.py
-rw-r--r-- 1 weare root 465 Jan 1 23:33 HelloWorld_v2.py
-rw-r--r-- 1 weare weare 882 Jan 1 23:33 Learn_BranchPythonOperator.py
-rw-r--r-- 1 weare weare 820 Jan 1 23:33 Learn_Jinja.py
-rw-r--r-- 1 weare weare 632 Jan 1 23:33 Learn_LatestOnlyOperator.py
-rw-r--r-- 1 weare weare 1213 Jan 1 23:33 Learn_TaskGroups.py
-rw-r--r-- 1 weare weare 652 Jan 1 23:33 Learn_TriggerRule.py
-rw-r--r-- 1 weare weare 1500 Jan 1 23:33 MySQL_to_Redshift.py
-rw-r--r-- 1 weare weare 1643 Jan 1 23:33 MySQL_to_Redshift_v2.py
-rw-r--r-- 1 weare weare 2402 Jan 1 23:33 NameGenderCSVtoRedshift.py
-rw-r--r-- 1 weare weare 3087 Jan 1 23:33 NameGenderCSVtoRedshift_v2.py
-rw-r--r-- 1 weare weare 3190 Jan 1 23:33 NameGenderCSVtoRedshift_v3.py
-rw-r--r-- 1 weare weare 3174 Jan 1 23:33 NameGenderCSVtoRedshift_v4.py
-rw-r--r-- 1 weare weare 2347 Jan 1 23:33 NameGenderCSVtoRedshift_v5.py
-rw-r--r-- 1 weare weare 851 Jan 1 23:33 SQL_to_Sheet.py
-rw-r--r-- 1 weare weare 1827 Jan 1 23:33 UpdateSymbol.py
-rw-r--r-- 1 weare weare 2349 Jan 1 23:33 UpdateSymbol_v2.py
-rw-r--r-- 1 weare weare 2579 Jan 1 23:33 Weather_to_Redshift.py
-rw-r--r-- 1 weare weare 3352 Jan 1 23:33 Weather_to_Redshift_v2.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 config
-rw-r--r-- 1 weare weare 10651 Jan 1 23:33 docker-compose.test.yaml
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 dynamic_dags
-rw-r--r-- 1 weare weare 420 Jan 1 23:33 get_price_APPL.py
-rw-r--r-- 1 weare weare 421 Jan 1 23:33 get_price_GOOG.py
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 plugins
drwxr-xr-x 2 weare weare 4096 Jan 1 23:33 trigger_dags
-rw-r--r-- 1 weare root 717 Dec 29 21:40 TestDAG.py
- vim 편집기로 NameGenderCSVtoRedshift_v5.py를 열어 schema 값을 개인 ID로 수정
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim NameGenderCSVtoRedshift_v5.py
- 웹 UI(http://localhost:8080/)에 접속해 보면 task 3개가 모두 정상 실행된 것을 확인할 수 있다.
'airflow(에어플로우)' 카테고리의 다른 글
airflow dag(주식정보 수집) 실습2-incremental update (0) | 2024.01.02 |
---|---|
airflow dag(주식정보 수집) 실습1 -full refresh 방식 (0) | 2024.01.02 |
airflow dag(xcom 활용) 실습 2 (0) | 2024.01.02 |
airflow dag(params 활용) 실습 1 (0) | 2024.01.02 |
airflow dag(hello world) 실습 (0) | 2024.01.01 |