목표

  • open weathermap api (위도 경도 기반 기후정보) 를 이용해 제작
  • 서울 8일 낮/최소/최대 온도 읽기
  • full refresh & insert into로 구현

open weather api 응답

  • day,min,max, night, eve, morn


실습

  1. api 키를 넣는 변수 설정
  2. vim 편집기로 Weather_to_Redshift.py 진입 
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim Weather_to_Redshift.py

  3. 본인 스키마에 맞게 내용 수정후 저장
  4. 코드
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

def get_Redshift_connection():
    # Redshift에 연결하고 커서를 반환합니다.
    # 기본적으로 autocommit은 False입니다.
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

@task
def etl(schema, table):
    # OpenWeatherMap API에서 Redshift 테이블로 ETL(Extract, Transform, Load)을 수행하는 함수

    # Airflow 변수에서 OpenWeatherMap API 키를 가져옵니다.
    api_key = Variable.get("open_weather_api_key")
    
    # 위치에 대한 위도 및 경도를 정의합니다. (이 경우 서울)
    lat = 37.5665
    lon = 126.9780

    # one-call API를 위한 API URL을 구성합니다.
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    # API 응답에서 관련 정보를 추출합니다.
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    # Redshift 연결 커서를 가져옵니다.
    cur = get_Redshift_connection()

    # 테이블 삭제 및 재생성을 위한 SQL 문
    drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);
"""
    # 데이터를 테이블에 삽입하기 위한 SQL 문
    insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)

    # SQL 문을 로그에 기록합니다.
    logging.info(drop_recreate_sql)
    logging.info(insert_sql)

    # 예외가 발생할 경우를 대비해 try-except 블록 내에서 SQL 문을 실행합니다.
    try:
        cur.execute(drop_recreate_sql)
        cur.execute(insert_sql)
        cur.execute("Commit;")
    except Exception as e:
        # 예외 발생 시 트랜잭션을 롤백합니다.
        cur.execute("Rollback;")
        raise

with DAG(
    dag_id = 'Weather_to_Redshift',
    start_date = datetime(2023,5,30),
    schedule = '0 2 * * *',
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    # 다음 작업의 목적을 설명하는 주석을 추가합니다.
    etl("본인스키마", "weather_forecast")

 

3. 실행 확인

+ Recent posts