Coding it in plain Python style, almost unchanged
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

import requests
import logging
import psycopg2


def get_Redshift_connection():
    host = "learnde.(omitted).redshift.amazonaws.com"
    user = "your_id"      # use your own ID
    password = "..."      # use your own password
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return f.text


def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:]  # skip the header line
    records = []
    for l in lines:
        (name, gender) = l.split(",")  # l = "your_id,M" -> ['your_id', 'M']
        records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
        ["your_id", "M"],
        ["Claire", "F"],
        ...
    ]
    """
    schema = "your_id"
    # It is better to wrap the SQL statements in a transaction using BEGIN and COMMIT/END
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;")  # DELETE FROM first -> full refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")  # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")


def etl():
    link = "https://s3-geospatial.(omitted).amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)


dag_second_assignment = DAG(
    dag_id='name_gender',
    catchup=False,
    start_date=datetime(2023, 4, 6),  # a DAG with a future start date will not run
    schedule='0 2 * * *')             # adjust as needed

task = PythonOperator(
    task_id='perform_etl',
    python_callable=etl,
    dag=dag_second_assignment)
```

Conclusion: far too much personal information (ID, password, schema) is exposed in the DAG file, and the link is hardcoded. Instead of hardcoding the link, let's pass it in through `params`.
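Separate from the `params` change shown in the next section, the exposed credentials can also be pulled out of the DAG file by storing them as Airflow Variables (Admin > Variables in the web UI, or `airflow variables set`) and reading them with `Variable.get`. A minimal sketch, assuming Variables named `redshift_user` and `redshift_pass` have already been registered (those key names are placeholders of my own, not from the course code); note that the v2 code below already imports `Variable` but does not use it yet:

```python
from airflow.models import Variable
import psycopg2


def get_Redshift_connection():
    # Credentials come from Airflow Variables instead of being hardcoded in the DAG file.
    # "redshift_user" / "redshift_pass" are assumed Variable keys -- register them first
    # in the Airflow UI (Admin > Variables) or with `airflow variables set`.
    user = Variable.get("redshift_user")
    password = Variable.get("redshift_pass")
    host = "learnde.(omitted).redshift.amazonaws.com"
    conn = psycopg2.connect(
        f"dbname=dev user={user} host={host} password={password} port=5439"
    )
    conn.set_session(autocommit=True)
    return conn.cursor()
```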
Revision using params
- Changes: pass variables in through `params`, read out the `execution_date`
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2


def get_Redshift_connection():
    host = "learnde.(omitted).redshift.amazonaws.com"
    redshift_user = "your_id"  # use your own ID
    redshift_pass = "..."      # use your own password
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return f.text


def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:]  # skip the header line
    records = []
    for l in lines:
        (name, gender) = l.split(",")  # l = "your_id,M" -> ['your_id', 'M']
        records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")
    """
    records = [
        ["your_id", "M"],
        ["Claire", "F"],
        ...
    ]
    """
    schema = "your_id"
    # It is better to wrap the SQL statements in a transaction using BEGIN and COMMIT/END
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;")  # DELETE FROM first -> full refresh
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")  # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")


def etl(**context):
    link = context["params"]["url"]
    # To read information about the task itself, use context['task_instance'] or context['ti']
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    task_instance = context['task_instance']
    execution_date = context['execution_date']
    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)


dag = DAG(
    dag_id='name_gender_v2',
    start_date=datetime(2023, 4, 6),  # a DAG with a future start date will not run
    schedule='0 2 * * *',             # adjust as needed
    catchup=False,
    max_active_runs=1,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    })

task = PythonOperator(
    task_id='perform_etl',
    python_callable=etl,
    params={
        'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    dag=dag)
```
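To confirm that `params` actually reaches the callable, the task can be run one-off with `airflow tasks test name_gender_v2 perform_etl 2023-04-06`, which executes a single task instance without recording state in the metadata DB. For an even quicker check outside Airflow, the callable can also be invoked directly with a hand-built context. This is only a rough smoke test sketch: it fakes just the context keys that `etl()` reads, and it still needs the Redshift connection details in `get_Redshift_connection()` to be valid, because `load()` will try to connect.

```python
# Rough local smoke test of the etl() callable, bypassing the scheduler.
# Only the context keys that etl() actually reads are faked here; real runs
# get a much richer context injected by Airflow. load() still requires a
# working Redshift connection, so this is a sketch rather than a unit test.
from datetime import datetime

fake_context = {
    "params": {"url": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"},
    "task_instance": None,                    # not used beyond assignment in etl()
    "execution_date": datetime(2023, 4, 6),   # stands in for the run's logical date
}
etl(**fake_context)
```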