거의 수정 없이 Python 스타일로 작성한 코드

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    """Connect to the Redshift database and return a cursor.

    The session is switched to autocommit, so statements are committed
    immediately unless the caller opens a transaction explicitly.

    Returns:
        A psycopg2 cursor bound to the newly opened connection.
    """
    # Connection settings; the user and password values are placeholders.
    host = "learnde.(생략).redshift.amazonaws.com"
    port = 5439
    dbname = "dev"
    user = " 본인 ID "  # use your own ID
    password = "..."  # use your own password

    dsn = f"dbname={dbname} user={user} host={host} password={password} port={port}"
    connection = psycopg2.connect(dsn)
    connection.set_session(autocommit=True)
    cursor = connection.cursor()
    return cursor

def extract(url, timeout=30):
    """Download the resource at ``url`` and return its body as text.

    Args:
        url: Address of the file to fetch.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may wait forever on an unresponsive host.

    Returns:
        The decoded response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: For connection problems or a timeout.
    """
    logging.info("Extract started")
    response = requests.get(url, timeout=timeout)
    # Fail fast on an error status so an error page is never handed on
    # to the next step as if it were the requested data.
    response.raise_for_status()
    logging.info("Extract done")
    return response.text

def transform(text):
    """Parse CSV text into ``[name, gender]`` records, skipping the header.

    Args:
        text: Raw CSV content whose first line is a header row.

    Returns:
        A list of two-element lists ``[name, gender]``, one per data line.

    Raises:
        ValueError: If a non-blank data line does not consist of exactly
            two comma-separated fields.
    """
    logging.info("Transform started")
    # splitlines() copes with both "\n" and "\r\n" endings, so no stray
    # "\r" ends up in the last field; [1:] drops the header row.
    lines = text.strip().splitlines()[1:]
    records = []
    for line in lines:
        if not line.strip():
            # Blank lines carry no record; skip them instead of failing.
            continue
        name, gender = line.split(",")  # e.g. "Claire,F" -> "Claire", "F"
        records.append([name, gender])
    logging.info("Transform ended")
    return records

def load(records):
    """Replace the contents of the ``name_gender`` table with ``records``.

    The table is emptied and refilled (full refresh) between BEGIN and
    COMMIT; if any statement fails the transaction is rolled back and the
    error is re-raised so the caller sees the failure.

    Args:
        records: Sequence of ``[name, gender]`` pairs, for example
            ``[["Claire", "F"], ...]``.

    Raises:
        Exception: Whatever error occurred while executing the SQL,
            re-raised after the ROLLBACK.
    """
    logging.info("load started")
    schema = " 본인 ID "  # placeholder: use your own schema name
    cur = get_Redshift_connection()
    # Identifiers cannot be bound as query parameters, so only the schema
    # is interpolated; the row values are passed separately to the driver,
    # which quotes them safely (names such as O'Neil no longer break the SQL).
    insert_sql = f"INSERT INTO {schema}.name_gender VALUES (%s, %s)"
    try:
        cur.execute("BEGIN;")
        # Delete first, then insert: a full refresh of the table.
        cur.execute(f"DELETE FROM {schema}.name_gender;")
        for record in records:
            name = record[0]
            gender = record[1]
            logging.info("%s - %s", name, gender)
            cur.execute(insert_sql, (name, gender))
        cur.execute("COMMIT;")  # "END;" would work as well
    except Exception:
        logging.exception("load failed, rolling back")
        cur.execute("ROLLBACK;")
        # Re-raise so the failure is not silently reported as success.
        raise
    finally:
        # Release the cursor and the connection it was created on.
        connection = cur.connection
        cur.close()
        connection.close()
    logging.info("load done")

def etl():
    """Run the pipeline end to end: download the CSV, parse it, load it."""
    source_url = "https://s3-geospatial.(생략).amazonaws.com/name_gender.csv"
    load(transform(extract(source_url)))

# DAG scheduled by the cron expression below, running the ETL as one task.
with DAG(
    dag_id="name_gender",
    start_date=datetime(2023, 4, 6),  # a start date in the future prevents runs
    schedule="0 2 * * *",  # adjust as needed
    catchup=False,
) as dag_second_assignment:
    # Operators created inside the "with" block are attached to this DAG.
    task = PythonOperator(
        task_id="perform_etl",
        python_callable=etl,
    )

결론: 개인정보가 코드에 너무 많이 노출된다. 링크는 하드코딩하지 말고 params로 넘겨 보자.


params 활용 수정

  • 수정사항: params를 통해 변수 넘기기, execution_date 얻어내기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2

def get_Redshift_connection():
    """Connect to the Redshift database and return a cursor.

    The session is switched to autocommit, so statements are committed
    immediately unless the caller opens a transaction explicitly.

    Returns:
        A psycopg2 cursor bound to the newly opened connection.
    """
    # Connection settings; the user and password values are placeholders.
    host = "learnde.(생략).redshift.amazonaws.com"
    port = 5439
    dbname = "dev"
    redshift_user = " 본인 ID "  # use your own ID
    redshift_pass = "..."  # use your own password

    dsn = f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}"
    connection = psycopg2.connect(dsn)
    connection.set_session(autocommit=True)
    cursor = connection.cursor()
    return cursor

def extract(url, timeout=30):
    """Download the resource at ``url`` and return its body as text.

    Args:
        url: Address of the file to fetch.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may wait forever on an unresponsive host.

    Returns:
        The decoded response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: For connection problems or a timeout.
    """
    logging.info("Extract started")
    response = requests.get(url, timeout=timeout)
    # Fail fast on an error status so an error page is never handed on
    # to the next step as if it were the requested data.
    response.raise_for_status()
    logging.info("Extract done")
    return response.text

def transform(text):
    """Parse CSV text into ``[name, gender]`` records, skipping the header.

    Args:
        text: Raw CSV content whose first line is a header row.

    Returns:
        A list of two-element lists ``[name, gender]``, one per data line.

    Raises:
        ValueError: If a non-blank data line does not consist of exactly
            two comma-separated fields.
    """
    logging.info("Transform started")
    # splitlines() copes with both "\n" and "\r\n" endings, so no stray
    # "\r" ends up in the last field; [1:] drops the header row.
    lines = text.strip().splitlines()[1:]
    records = []
    for line in lines:
        if not line.strip():
            # Blank lines carry no record; skip them instead of failing.
            continue
        name, gender = line.split(",")  # e.g. "Claire,F" -> "Claire", "F"
        records.append([name, gender])
    logging.info("Transform ended")
    return records

def load(records):
    """Replace the contents of the ``name_gender`` table with ``records``.

    The table is emptied and refilled (full refresh) between BEGIN and
    COMMIT; if any statement fails the transaction is rolled back and the
    error is re-raised so the caller sees the failure.

    Args:
        records: Sequence of ``[name, gender]`` pairs, for example
            ``[["Claire", "F"], ...]``.

    Raises:
        Exception: Whatever error occurred while executing the SQL,
            re-raised after the ROLLBACK.
    """
    logging.info("load started")
    schema = " 본인 ID "  # placeholder: use your own schema name
    cur = get_Redshift_connection()
    # Identifiers cannot be bound as query parameters, so only the schema
    # is interpolated; the row values are passed separately to the driver,
    # which quotes them safely (names such as O'Neil no longer break the SQL).
    insert_sql = f"INSERT INTO {schema}.name_gender VALUES (%s, %s)"
    try:
        cur.execute("BEGIN;")
        # Delete first, then insert: a full refresh of the table.
        cur.execute(f"DELETE FROM {schema}.name_gender;")
        for record in records:
            name = record[0]
            gender = record[1]
            logging.info("%s - %s", name, gender)
            cur.execute(insert_sql, (name, gender))
        cur.execute("COMMIT;")  # "END;" would work as well
    except Exception:
        logging.exception("load failed, rolling back")
        cur.execute("ROLLBACK;")
        # Re-raise so the failure is not silently reported as success.
        raise
    finally:
        # Release the cursor and the connection it was created on.
        connection = cur.connection
        cur.close()
        connection.close()
    logging.info("load done")

def etl(**context):
    """Run the ETL for the URL supplied through the task's ``params``.

    Args:
        **context: Keyword arguments passed to the callable. Must contain
            ``params`` with a ``url`` entry, plus either ``logical_date``
            or ``execution_date``, which is logged.

    Raises:
        KeyError: If ``params["url"]`` or both date keys are missing.
    """
    link = context["params"]["url"]
    # Information about the task itself can be read through
    # context['task_instance'] (or context['ti']) when needed:
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    # Prefer "logical_date", the newer name for the run date, and fall back
    # to "execution_date" when only the older key is provided.
    date_key = "logical_date" if "logical_date" in context else "execution_date"
    logging.info(context[date_key])
    data = extract(link)
    lines = transform(data)
    load(lines)

# DAG scheduled by the cron expression below, limited to one active run,
# with a single retry three minutes after a failure.
with DAG(
    dag_id="name_gender_v2",
    schedule="0 2 * * *",  # adjust as needed
    start_date=datetime(2023, 4, 6),  # a start date in the future prevents runs
    max_active_runs=1,
    catchup=False,
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
    },
) as dag:
    # Operators created inside the "with" block are attached to this DAG.
    task = PythonOperator(
        task_id="perform_etl",
        python_callable=etl,
        # The source URL is handed to the callable via context["params"].
        params={
            "url": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv",
        },
    )

 

'airflow(에어플로우)' 카테고리의 다른 글

airflow dag(task decorator 활용) 실습 3  (0) 2024.01.02
airflow dag(xcom 활용) 실습 2  (0) 2024.01.02
airflow dag(hello world) 실습  (0) 2024.01.01
airflow 기초  (0) 2023.12.31
데이터 파이프라인  (1) 2023.12.30

+ Recent posts