# 필요한 라이브러리 및 모듈을 가져옵니다 from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp
import yfinance as yf import pandas as pd import logging
# Redshift 연결을 가져오는 함수 def get_Redshift_connection(autocommit=True): hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() conn.autocommit = autocommit return conn.cursor()
# 특정 주식 심볼에 대한 역사적 주가를 가져오는 태스크 @task def get_historical_prices(symbol): # yfinance 라이브러리를 사용하여 주식의 역사적 가격을 가져옵니다 ticket = yf.Ticker(symbol) data = ticket.history() records = []
# 데이터를 Redshift로 로딩하기 위해 변환 및 구조화 for index, row in data.iterrows(): date = index.strftime('%Y-%m-%d %H:%M:%S') records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
# Redshift에서 테이블을 만들거나 삭제하는 함수 def _create_table(cur, schema, table, drop_first): if drop_first: cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};") cur.execute(f""" CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, "open" float, high float, low float, close float, volume bigint );""")
# 역사적 주식 가격을 Redshift로 로딩하는 태스크 @task def load(schema, table, records): logging.info("로딩 시작") cur = get_Redshift_connection() try: cur.execute("BEGIN;")
# 처음실행된 경우 예외 : 원본 테이블이 없으면 생성 , drop_first= False _create_table(cur, schema, table, False)
# 1. 임시 테이블(tem)을 임시 생성하면서 원본 테이블의 레코드를 복사 cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};") # 2. 임시 테이블로 Yahoo finance API로 읽어온 레코드를 적재 for r in records: sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});" print(sql) cur.execute(sql)
# 3. 원본 테이블 삭제하고 새로 생성 , drop_first= True _create_table(cur, schema, table, True)
# 4. 원본 테이블에 임시 테이블 내용을 복사 cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;") cur.execute("COMMIT;") # cur.execute("END;") except Exception as error: print(error) cur.execute("ROLLBACK;") raise logging.info("로딩 완료")
# DAG를 정의합니다 with DAG( dag_id='UpdateSymbol_v2', start_date=datetime(2023, 5, 30), catchup=False, tags=['API'], schedule='0 10 * * *' # DAG를 매일 오전 10:00에 실행 ) as dag:
# 주식 심볼 "AAPL"에 대한 역사적 가격을 가져오는 태스크 results = get_historical_prices("AAPL") # 가져온 역사적 가격을 Redshift 테이블 "stock_info_v2"로 로딩하는 태스크 load("본인스키마", "stock_info_v2", results) |