목표
- open weathermap api (위도 경도 기반 기후정보) 를 이용해 제작
- 서울 8일 낮/최소/최대 온도 읽기
- full refresh & insert into로 구현
open weather api 응답
- day,min,max, night, eve, morn
실습
- api 키를 넣는 변수 설정
- vim 편집기로 Weather_to_Redshift.py 진입
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim Weather_to_Redshift.py - 본인 스키마에 맞게 내용 수정후 저장
- 코드
from airflow import DAG from airflow.models import Variable from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.decorators import task from datetime import datetime from datetime import timedelta import requests import logging import json def get_Redshift_connection(): # Redshift에 연결하고 커서를 반환합니다. # 기본적으로 autocommit은 False입니다. hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() @task def etl(schema, table): # OpenWeatherMap API에서 Redshift 테이블로 ETL(Extract, Transform, Load)을 수행하는 함수 # Airflow 변수에서 OpenWeatherMap API 키를 가져옵니다. api_key = Variable.get("open_weather_api_key") # 위치에 대한 위도 및 경도를 정의합니다. (이 경우 서울) lat = 37.5665 lon = 126.9780 # one-call API를 위한 API URL을 구성합니다. url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" response = requests.get(url) data = json.loads(response.text) # API 응답에서 관련 정보를 추출합니다. ret = [] for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"])) # Redshift 연결 커서를 가져옵니다. cur = get_Redshift_connection() # 테이블 삭제 및 재생성을 위한 SQL 문 drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table}; CREATE TABLE {schema}.{table} ( date date, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() ); """ # 데이터를 테이블에 삽입하기 위한 SQL 문 insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) # SQL 문을 로그에 기록합니다. logging.info(drop_recreate_sql) logging.info(insert_sql) # 예외가 발생할 경우를 대비해 try-except 블록 내에서 SQL 문을 실행합니다. try: cur.execute(drop_recreate_sql) cur.execute(insert_sql) cur.execute("Commit;") except Exception as e: # 예외 발생 시 트랜잭션을 롤백합니다. cur.execute("Rollback;") raise with DAG( dag_id = 'Weather_to_Redshift', start_date = datetime(2023,5,30), schedule = '0 2 * * *', max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) as dag: # 다음 작업의 목적을 설명하는 주석을 추가합니다. etl("본인스키마", "weather_forecast") |
3. 실행 확인
'airflow(에어플로우)' 카테고리의 다른 글
backfill과 airflow (0) | 2024.01.02 |
---|---|
데이터 웨어하우스에서 primary key (0) | 2024.01.02 |
airflow dag(나라정보) 실습 (0) | 2024.01.02 |
airflow.cfg 파일 (0) | 2024.01.02 |
airflow dag(주식정보 수집) 실습2-incremental update (0) | 2024.01.02 |