목표

  • open weathermap api (위도 경도 기반 기후정보) 를 이용해 제작
  • 서울 8일 낮/최소/최대 온도 읽기
  • full refresh & insert into로 구현

open weather api 응답

  • day,min,max, night, eve, morn


실습

  1. api 키를 넣는 변수 설정
  2. vim 편집기로 Weather_to_Redshift.py 진입 
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim Weather_to_Redshift.py

  3. 본인 스키마에 맞게 내용 수정후 저장
  4. 코드
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

def get_Redshift_connection():
    # Redshift에 연결하고 커서를 반환합니다.
    # 기본적으로 autocommit은 False입니다.
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    # OpenWeatherMap API에서 Redshift 테이블로 ETL(Extract, Transform, Load)을 수행하는 함수

    # Airflow 변수에서 OpenWeatherMap API 키를 가져옵니다.
    api_key = Variable.get("open_weather_api_key")
    
    # 위치에 대한 위도 및 경도를 정의합니다. (이 경우 서울)
    lat = 37.5665
    lon = 126.9780

    # one-call API를 위한 API URL을 구성합니다.
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    # API 응답에서 관련 정보를 추출합니다.
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    # Redshift 연결 커서를 가져옵니다.
    cur = get_Redshift_connection()

    # 테이블 삭제 및 재생성을 위한 SQL 문
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    # 데이터를 테이블에 삽입하기 위한 SQL 문
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)

    # SQL 문을 로그에 기록합니다.
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)

    # 예외가 발생할 경우를 대비해 try-except 블록 내에서 SQL 문을 실행합니다.
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        # 예외 발생 시 트랜잭션을 롤백합니다.
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30),
    schedule = '0 2 * * *',
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    # 다음 작업의 목적을 설명하는 주석을 추가합니다.
    etl("본인스키마", "weather_forecast")

 

3. 실행 확인

목표

  • api로부터 나라 정보를 추출한다.
  • full regresh로 구현할것
  • 아래 3개의 정보를 추출해서 redshift에 각자 스키마 밑에 테이블 생성
    country ->["name"]["official"]
    population -> ["population"]
    area -> ["area"]
  • UTC로 매주 토요일 오전 6시 30분에 실행되게 만들것

코드

import requests
from airflow.decorators import DAG, task
from datetime import datetime

@task
def extract_transform():
    # 국가 정보를 얻기 위한 API 요청
    response = requests.get('https://restcountries.com/v3/all')
    countries = response.json()
        
    # 응답을 처리하고 관련 정보 추출
    records = []
    for country in countries:
        name = country['name']['common']
        population = country['population']
        area = country['area']
        records.append([name, population, area])
        
    return records

    # 데이터베이스로 데이터 로드
results = extract_transform()

# DAG 정의
with DAG(
    dag_id='CountryInfo',
    start_date=datetime(2023, 5, 30),
    catchup=False,
    tags=['API'],
    schedule='30 6 * * 6'  # 0 - 일요일, …, 6 - 토요일 -> 매주 토요일 오전 6시30분에 실행됨
) as dag:

    # 데이터 추출 및 변환을 위한 태스크
    results = extract_transform()
    # load 함수는 추출한 데이터를 데이터베이스 테이블에 삽입하는 작업을 수행합니다.
    load("keeyong", "country_info", results)

# SQL 문으로 데이터베이스에 대상 테이블을 만듭니다.
create_table_sql = """
CREATE TABLE {schema}.{table} (
    name varchar(256) primary key,
    population int,
    area float
);

 

  1. DAGs 폴더 기본 저장위치 설정
    dags_folder=/var/lib/airflow/dags 
    (airflow 디렉토리 밑의 dags 폴더)
  2. Dag 추가시 Airflow 적용 시기
    dag_dir_list_interval =300
    (5분)
  3. airflow를 api 형태로 외부에서 조작
    api 섹션의 auth_backend -> airflow.api.auth.backend.basic_auth
    + basic_auth : id pw를 가지고 인증하는 방식
  4. variable에서 변수의 값이 encrypted(123***)로 만들기
    변수이름에 아래 단어들이 variable의 key에 들어가야함 password, secret, passwd, authorization, api_key, apikey, access_token

  5. 이 환경 설정 파일이 수정됐다면, 실제로 반영하려면?
    sudo systemctl restart airflow-webserver
    sudo systemctl restart airflow-scheduler

  6. metadata db의 내용을 암호화
    fernet_key
  7. 시간 세팅
    default_timezone : 스케줄러가 실행하는 시간(start_date,end_date,schedule)
    + execution_date,로그기록시간은 예외적으로 영국기준시UTC를 따라 혼동을 준다.
    + 따라서 전부다 UTC로 통일하는걸 권장
    default_ui_timezone : 웹 사용자 인터페이스(UI)에서 사용되는 기본 시간대

 

 

목표

  • yahoo finance API를 호출해서 애플 주식 정보 수집( 지난 30일)
  • redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
  • incremental 업데이트 방식으로 매일 하루치의 데이터가 늘어나야함
  • 트랜잭션 형태 구성

간단 프로세스

  1. 임시 테이블(tem)을 임시 생성하면서 원본 테이블의 레코드를 복사(CREATE TEMP TABLE ...AS SELECT)
  2. 임시 테이블로 Yahoo finance API로 읽어온 레코드를 적재
  3. 원본 테이블 삭제하고 새로 생성
  4. 원본 테이블에 임시 테이블 내용을 복사( select distinct*를 사용해서 중복제거)

코드

# 필요한 라이브러리 및 모듈을 가져옵니다
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging

# Redshift 연결을 가져오는 함수
def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

# 특정 주식 심볼에 대한 역사적 주가를 가져오는 태스크
@task
def get_historical_prices(symbol):
    # yfinance 라이브러리를 사용하여 주식의 역사적 가격을 가져옵니다
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    # 데이터를 Redshift로 로딩하기 위해 변환 및 구조화
    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

# Redshift에서 테이블을 만들거나 삭제하는 함수
def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")

# 역사적 주식 가격을 Redshift로 로딩하는 태스크
@task
def load(schema, table, records):
    logging.info("로딩 시작")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")

        # 처음실행된 경우 예외 : 원본 테이블이 없으면 생성 , drop_first= False
        _create_table(cur, schema, table, False)

        # 1. 임시 테이블(tem)을 임시 생성하면서 원본 테이블의 레코드를 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        # 2. 임시 테이블로 Yahoo finance API로 읽어온 레코드를 적재
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 3. 원본 테이블 삭제하고 새로 생성  , drop_first= True
        _create_table(cur, schema, table, True)

        # 4. 원본 테이블에 임시 테이블 내용을 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("로딩 완료")

# DAG를 정의합니다
with DAG(
    dag_id='UpdateSymbol_v2',
    start_date=datetime(2023, 5, 30),
    catchup=False,
    tags=['API'],
    schedule='0 10 * * *'  # DAG를 매일 오전 10:00에 실행
) as dag:

    # 주식 심볼 "AAPL"에 대한 역사적 가격을 가져오는 태스크
    results = get_historical_prices("AAPL")
    # 가져온 역사적 가격을 Redshift 테이블 "stock_info_v2"로 로딩하는 태스크
    load("본인스키마", "stock_info_v2", results)

 

실행

  • dags 폴더에 진입
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cd dags

  • 수정할 dag 파일 찾음
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ ls
    Backup_Airflow_Data_to_S3.py  HelloWorld.py                  Learn_TaskGroups.py         NameGenderCSVtoRedshift_v2.py  TestDAG.py                 __pycache__               get_price_GOOG.py
    Build_Summary.py              HelloWorld_v2.py               Learn_TriggerRule.py        NameGenderCSVtoRedshift_v3.py  UpdateSymbol.py            config                    plugins
    Build_Summary_v2.py           Learn_BranchPythonOperator.py  MySQL_to_Redshift.py        NameGenderCSVtoRedshift_v4.py  UpdateSymbol_v2.py         docker-compose.test.yaml  trigger_dags
    Cleanup_Log.py                Learn_Jinja.py                 MySQL_to_Redshift_v2.py     NameGenderCSVtoRedshift_v5.py  Weather_to_Redshift.py     dynamic_dags
    Gsheet_to_Redshift.py         Learn_LatestOnlyOperator.py    NameGenderCSVtoRedshift.py  SQL_to_Sheet.py                Weather_to_Redshift_v2.py  get_price_APPL.py

  • vim 편집기로 UpdateSymbol_v2.py 파일에 스키마, 레드쉬프트의 본인 스키마로 교체
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v2.py
  • 웹 ui(http://localhost:8080/) 에서 정상 실행 확인

목표

  • yahoo finance API를 호출해서 애플 주식 정보 수집( 지난 30일)
  • redshift 상의 테이블로 1에서 받은 레코드들을 적재
  • full refresh로 구현
  • 트랜잭션 형태를 구현

코드 

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("wearealego", "stock_info", results)

 

실습

  1. 컨테이너 부팅시마다 모듈 설치를 위해 yaml 파일 수정 
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls
    README.md  dags  docker-compose.yaml  docs  learn-airflow  logs  plugins

  2. vim 편집기로 yaml 파일 열기
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ vim docker-compose.yaml

  3. 내용 수정(해당 빨간선 내용 추가) : 실습에 필요한 yfinanace 포함 기초적으로 필요한 모듈들.
  4. dags 폴더로 이동
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cd dags

  5. 내용 바꾸려는 dag인 UpdateSymbol.py 찾음
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ ls
    Backup_Airflow_Data_to_S3.py  HelloWorld.py                  Learn_TaskGroups.py         NameGenderCSVtoRedshift_v2.py  TestDAG.py                 __pycache__               get_price_GOOG.py
    Build_Summary.py              HelloWorld_v2.py               Learn_TriggerRule.py        NameGenderCSVtoRedshift_v3.py  UpdateSymbol.py            config                    plugins
    Build_Summary_v2.py           Learn_BranchPythonOperator.py  MySQL_to_Redshift.py        NameGenderCSVtoRedshift_v4.py  UpdateSymbol_v2.py         docker-compose.test.yaml  trigger_dags
    Cleanup_Log.py                Learn_Jinja.py                 MySQL_to_Redshift_v2.py     NameGenderCSVtoRedshift_v5.py  Weather_to_Redshift.py     dynamic_dags
    Gsheet_to_Redshift.py         Learn_LatestOnlyOperator.py    NameGenderCSVtoRedshift.py  SQL_to_Sheet.py                Weather_to_Redshift_v2.py  get_price_APPL.py

  6. vim 편집기로 UpdateSymbol.py 진입 후 본인 아이디로 수정 후 저장 
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol.py
  7. 커맨드 라인에서 test 해보자
    1) 도커 스케줄러 주소 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
    CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                    PORTS                                       NAMES
    13c7de52342d   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-triggerer_1
    8e40ffd79576   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow-setup_airflow-webserver_1
    c57811bc9ba1   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-worker_1
    f14cc34e94be   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-scheduler_1
    48ee152ae746   redis:latest           "docker-entrypoint.s…"   3 days ago       Up 12 minutes (healthy)   6379/tcp                                    airflow-setup_redis_1
    a5e756836619   postgres:13            "docker-entrypoint.s…"   3 days ago       Up 12 minutes (healthy)   5432/tcp                                    airflow-setup_postgres_1

    2) 스케줄러 shell 접속
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec -it f14cc34e94be sh

    3) UpdateSymbol의 dag 리스트 확인
    (airflow)airflow tasks list UpdateSymbol
    /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: [31mDeprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. [32mTo prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". [36mSet environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message.[0m (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
    SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')
    get_historical_prices
    load

    4) UpdateSymbol  dag 테스트 
    (airflow)airflow dags test UpdateSymbol 2023-05-30
    (중략)
    [2024-01-01 16:02:30,217] {dagrun.py:673} INFO - DagRun Finished: dag_id=UpdateSymbol, execution_date=2023-05-30T00:00:00+00:00, run_id=manual__2023-05-30T00:00:00+00:00, run_start_date=2023-05-30T00:00:00+00:00, run_end_date=2024-01-01 16:02:30.216874+00:00, run_duration=18720150.216874, state=success, external_trigger=False, run_type=manual, data_interval_start=2023-05-30T00:00:00+00:00, data_interval_end=2023-05-30T10:00:00+00:00, dag_hash=None

정상실행 완료!

task decorator 활용한 방법

from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = " 본인 ID ,M" -> [ ' 본인 ID ', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ " 본인 ID ", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # 날짜가 미래인 경우 실행이 안됨
    schedule='0 2 * * *',  # 적당히 조절
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = ' 본인 ID '   ## 자신의 스키마로 변경
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

 

커맨드 라인으로 실습파일 받고 실행

  1. 우선 airflow-setup 폴더에서 현재 폴더 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl
    total 32
    drwxr-xr-x 5 weare root   4096 Dec 29 21:53 logs
    drwxr-xr-x 3 weare root   4096 Dec 29 21:53 dags
    drwxr-xr-x 2 weare root   4096 Dec 29 21:49 plugins
    -rw-r--r-- 1 weare weare 10493 Dec 29 21:41 docker-compose.yaml
    -rw-r--r-- 1 weare weare    15 Dec 29 21:40 README.md
    drwxr-xr-x 3 weare weare  4096 Dec 29 21:40 docs

  2. 현재 dags 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 16
    drwxrwxr-x 2 weare root 4096 Dec 29 21:53 __pycache__
    -rw-r--r-- 1 weare root  797 Dec 29 21:40 HelloWorld.py
    -rw-r--r-- 1 weare root  465 Dec 29 21:40 HelloWorld_v2.py
    -rw-r--r-- 1 weare root  717 Dec 29 21:40 TestDAG.py

  3. 깃에서 새로 실습할 dag들 다운 받음
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ git clone https://github.com/learndataeng/learn-airflow
    Cloning into 'learn-airflow'...
    remote: Enumerating objects: 219, done.
    remote: Counting objects: 100% (95/95), done.
    remote: Compressing objects: 100% (93/93), done.
    remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
    Receiving objects: 100% (219/219), 71.07 KiB | 8.88 MiB/s, done.
    Resolving deltas: 100% (106/106), done.

  4. 새로 받은 폴더의 내용 확인 (dags 폴더 확인)
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow
    total 12
    -rw-r--r-- 1 weare weare   15 Jan  1 23:27 README.md
    drwxr-xr-x 6 weare weare 4096 Jan  1 23:27 dags
    drwxr-xr-x 2 weare weare 4096 Jan  1 23:27 data

  5. dags 폴더 내부 dag 리스트 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl learn-airflow/dags
    total 136
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:27 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:27 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:27 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:27 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:27 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare weare   803 Jan  1 23:27 HelloWorld.py
    -rw-r--r-- 1 weare weare   465 Jan  1 23:27 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:27 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:27 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:27 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:27 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:27 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:27 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:27 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:27 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:27 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:27 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:27 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:27 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:27 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:27 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:27 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:27 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:27 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:27 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:27 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:27 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:27 trigger_dags

  6. 새로 받은 dags들(learn-airflow/dags) 모두 원래  dag위치( dags/ ) 로 이동
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cp -r learn-airflow/dags/* dags/

  7. 확인
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls -tl dags
    total 144
    drwxrwxr-x 2 weare root   4096 Jan  1 23:33 __pycache__
    -rw-r--r-- 1 weare weare  2311 Jan  1 23:33 Backup_Airflow_Data_to_S3.py
    -rw-r--r-- 1 weare weare  1968 Jan  1 23:33 Build_Summary.py
    -rw-r--r-- 1 weare weare   774 Jan  1 23:33 Build_Summary_v2.py
    -rw-r--r-- 1 weare weare  1419 Jan  1 23:33 Cleanup_Log.py
    -rw-r--r-- 1 weare weare  4694 Jan  1 23:33 Gsheet_to_Redshift.py
    -rw-r--r-- 1 weare root    803 Jan  1 23:33 HelloWorld.py
    -rw-r--r-- 1 weare root    465 Jan  1 23:33 HelloWorld_v2.py
    -rw-r--r-- 1 weare weare   882 Jan  1 23:33 Learn_BranchPythonOperator.py
    -rw-r--r-- 1 weare weare   820 Jan  1 23:33 Learn_Jinja.py
    -rw-r--r-- 1 weare weare   632 Jan  1 23:33 Learn_LatestOnlyOperator.py
    -rw-r--r-- 1 weare weare  1213 Jan  1 23:33 Learn_TaskGroups.py
    -rw-r--r-- 1 weare weare   652 Jan  1 23:33 Learn_TriggerRule.py
    -rw-r--r-- 1 weare weare  1500 Jan  1 23:33 MySQL_to_Redshift.py
    -rw-r--r-- 1 weare weare  1643 Jan  1 23:33 MySQL_to_Redshift_v2.py
    -rw-r--r-- 1 weare weare  2402 Jan  1 23:33 NameGenderCSVtoRedshift.py
    -rw-r--r-- 1 weare weare  3087 Jan  1 23:33 NameGenderCSVtoRedshift_v2.py
    -rw-r--r-- 1 weare weare  3190 Jan  1 23:33 NameGenderCSVtoRedshift_v3.py
    -rw-r--r-- 1 weare weare  3174 Jan  1 23:33 NameGenderCSVtoRedshift_v4.py
    -rw-r--r-- 1 weare weare  2347 Jan  1 23:33 NameGenderCSVtoRedshift_v5.py
    -rw-r--r-- 1 weare weare   851 Jan  1 23:33 SQL_to_Sheet.py
    -rw-r--r-- 1 weare weare  1827 Jan  1 23:33 UpdateSymbol.py
    -rw-r--r-- 1 weare weare  2349 Jan  1 23:33 UpdateSymbol_v2.py
    -rw-r--r-- 1 weare weare  2579 Jan  1 23:33 Weather_to_Redshift.py
    -rw-r--r-- 1 weare weare  3352 Jan  1 23:33 Weather_to_Redshift_v2.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 config
    -rw-r--r-- 1 weare weare 10651 Jan  1 23:33 docker-compose.test.yaml
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 dynamic_dags
    -rw-r--r-- 1 weare weare   420 Jan  1 23:33 get_price_APPL.py
    -rw-r--r-- 1 weare weare   421 Jan  1 23:33 get_price_GOOG.py
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 plugins
    drwxr-xr-x 2 weare weare  4096 Jan  1 23:33 trigger_dags
    -rw-r--r-- 1 weare root    717 Dec 29 21:40 TestDAG.py

  8. vim 편집기로 NameGenderCSVtoRedshift_v5 접속 후 개인 id로 수정
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim NameGenderCSVtoRedshift_v5.py
  9. 웹 ui(http://localhost:8080/)에 접속해보면 task 3개 모두 정상 실행된것을 확인할수 있다.

거의 수정없이 python 식으로 코딩

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host =  "learnde.(생략).redshift.amazonaws.com"
    user = " 본인 ID "  # 본인 ID 사용
    password = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()

def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)

def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = " " 본인 ID " ,M" -> [ ' 본인 ID ', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records

def load(records):
    logging.info("load started")
    """
    records = [
      [ " 본인 ID ", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = " 본인 ID "
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")

def etl():
    link = "https://s3-geospatial.(생략).amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)

dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False,
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *')  # 적당히 조절

task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)

결론 : 개인정보가 너무 노출된다. 링크 하드코딩 말고 params로 줘보자


params 활용 수정

  • 수정사항 : params 통해 변수넘기기, execution_date 얻어내기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = "learnde.(생략).redshift.amazonaws.com"
    redshift_user = " 본인 ID "  # 본인 ID 사용
    redshift_pass = "..."  # 본인 Password 사용
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()

def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)

def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = " 본인 ID ,M" -> [ ' 본인 ID ', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records

def load(records):
    logging.info("load started")
    """
    records = [
      [ " 본인 ID ", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = " 본인 ID "
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")

def etl(**context):
    link = context["params"]["url"]
    # task 자체에 대한 정보를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
    # https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
    task_instance = context['task_instance']
    execution_date = context['execution_date']
    logging.info(execution_date)
    data = extract(link)
    lines = transform(data)
    load(lines)

dag = DAG(
    dag_id = 'name_gender_v2',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    params = {
        'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    },
    dag = dag)

 

'airflow(에어플로우)' 카테고리의 다른 글

airflow dag(task decorator 활용) 실습 3  (0) 2024.01.02
airflow dag(xcom 활용) 실습 2  (0) 2024.01.02
airflow dag(hello world) 실습  (0) 2024.01.01
airflow 기초  (0) 2023.12.31
데이터 파이프라인  (1) 2023.12.30

1. PythonOperator 활용한 방법

  • PythonOperator : 이 Operator를 사용하면 Python 함수나 Python callable을 실행할 수 있다
  • 간단한 구조
    from airflow.operators.python
    import PythonOperator

    load_nps = PythonOperator(
       dag=dag,
       task_id='task_id',
       python_callable=python_func,

       # params로 인자들을 넘겨줌
       params={ 'table': 'delighted_nps', 'schema': 'raw_data' },
    )
    # cxt 통해 파라미터들을 받음
    def python_func(**cxt):
       table = cxt["params"]["table"]
       schema = cxt["params"]["schema"]
       ex_date = cxt["execution_date"]
    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

  • 실제 예
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime

    dag = DAG(
        dag_id = 'HelloWorld',
        start_date = datetime(2022,5,5),
        catchup=False,
        tags=['example'],
        schedule = '0 2 * * *')

    def print_hello():
        print("hello!")
        return "hello!"

    def print_goodbye():
        print("goodbye!")
        return "goodbye!"

    # PythonOperator는 그때 그때 선언해줘야한다
    print_hello = PythonOperator(
        #task 이름
        task_id = 'print_hello',
        #실행시키려는 함수이름
        python_callable = print_hello,
        #이 Task가 속한 DAG를 지정
        dag = dag)

    print_goodbye = PythonOperator(
        task_id = 'print_goodbye',
        python_callable = print_goodbye,
        dag = dag)

    # DAG 실행순서 할당
    # 순서를 정하지 않으면 각각 독립적으로 동시에 실행됨
    print_hello >> print_goodbye

2. Airflow Decorator 활용한 방법

  • Airflow Decorator : Airflow에서 사용되는 DAG를 정의하는 데코레이터(Decorator). 특정 기능이나 설정을 DAG 또는 DAG의 태스크에 적용하기 위한 방법으로 사용된다. Airflow Decorator는 자동으로 선언되기 때문에 PythonOperator에 비해 코드가 더 간단해진다.

  • 실제 예
    from airflow import DAG
    from airflow.decorators import task
    from datetime import datetime

    @task
    def print_hello():
        print("hello!")
        return "hello!"

    @task
    def print_goodbye():
        print("goodbye!")
        return "goodbye!"

    with DAG(
        dag_id = 'HelloWorld_v2',
        start_date = datetime(2022,5,5),
        catchup=False,
        tags=['example'],
        schedule = '0 2 * * *'
    ) as dag:

    # 함수이름이 기본으로 task id로 할당된다.
    print_hello() >> print_goodbye()

 

+ max_active_runs : 한번에 동시에 실행될수 있는 dag의 수. 일반적으로는 하나씩 해도 되지만, backfill을 할때는 여러개의 dag를 실행하며, 실행 시간을 줄일수 있다. 
max_active_tasks : 한번에 동시에  실행될수 있는 dag의 task 수. 리소스 사용을 조절하거나 부하를 관리하는 데 도움이됨
+airflow worker에 할당된 cpu의 총합= max_active의 한계

'airflow(에어플로우)' 카테고리의 다른 글

airflow dag(task decorator 활용) 실습 3  (0) 2024.01.02
airflow dag(xcom 활용) 실습 2  (0) 2024.01.02
airflow dag(params 활용) 실습 1  (0) 2024.01.02
airflow 기초  (0) 2023.12.31
데이터 파이프라인  (1) 2023.12.30

+ Recent posts