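The DISTINCT approach
Weakness of the DISTINCT approach
- DISTINCT treats two rows as duplicates only when every column is identical. Stock data fetched before the market closes has a close price and volume that keep changing, so rows for the same date are not recognized as duplicates.
- Records for the same date must therefore be deduplicated with ROW_NUMBER, keeping only the most recently loaded row per date.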
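The difference between the two approaches can be sketched locally. The example below is a minimal illustration, not the Redshift code from this post: it uses Python's built-in `sqlite3` (assuming SQLite 3.25 or later for window functions), a simplified table `t`, and made-up prices and volumes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Simplified stand-in for the stock table (hypothetical columns and values).
cur.execute(
    "CREATE TABLE t (date TEXT, close REAL, volume INTEGER, created_date TEXT)"
)
rows = [
    # Fetched before the market closed: close and volume are not final yet.
    ("2023-05-30", 100.0, 1000, "2023-05-30 10:00:00"),
    # Same trading day, fetched again the next day with the final values.
    ("2023-05-30", 101.5, 2000, "2023-05-31 10:00:00"),
    ("2023-05-31", 102.0, 1500, "2023-05-31 10:00:00"),
]
cur.executemany("INSERT INTO t VALUES (?, ?, ?, ?)", rows)

# DISTINCT: the two 2023-05-30 rows differ in close and volume, so both survive.
distinct_rows = cur.execute(
    "SELECT DISTINCT date, close, volume FROM t ORDER BY date"
).fetchall()

# ROW_NUMBER: number the rows per date, newest first, and keep only the first.
latest_rows = cur.execute(
    """
    SELECT date, close, volume FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY date ORDER BY created_date DESC
        ) AS seq
        FROM t
    ) WHERE seq = 1
    ORDER BY date
    """
).fetchall()

print(len(distinct_rows))  # the duplicate date is still present
print(latest_rows)         # one row per date, the most recently loaded one
```

With DISTINCT the 2023-05-30 date still appears twice, while the ROW_NUMBER query returns a single, most recent row for each date.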
Code

    from airflow import DAG
    from airflow.decorators import task
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    from datetime import datetime
    from pandas import Timestamp
    import yfinance as yf
    import pandas as pd
    import logging


    def get_Redshift_connection(autocommit=True):
        hook = PostgresHook(postgres_conn_id='redshift_dev_db')
        conn = hook.get_conn()
        conn.autocommit = autocommit
        return conn.cursor()


    @task
    def get_historical_prices(symbol):
        ticket = yf.Ticker(symbol)
        data = ticket.history()
        records = []

        for index, row in data.iterrows():
            date = index.strftime('%Y-%m-%d %H:%M:%S')
            records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

        return records


    def _create_table(cur, schema, table, drop_first):
        if drop_first:
            cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
    CREATE TABLE IF NOT EXISTS {schema}.{table} (
        date date,
        "open" float,
        high float,
        low float,
        close float,
        volume bigint,
        created_date timestamp DEFAULT GETDATE()
    );""")


    @task
    def load(schema, table, records):
        logging.info("load started")
        cur = get_Redshift_connection()
        try:
            cur.execute("BEGIN;")
            # Create the original table if it does not exist (needed only on the very first run)
            _create_table(cur, schema, table, False)
            # Copy the original table into a temporary table
            create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
                INSERT INTO t SELECT * FROM {schema}.{table};"""
            cur.execute(create_t_sql)
            # Append the newly fetched records; created_date is filled in by its default
            for r in records:
                sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
                print(sql)
                cur.execute(sql)

            # Copy the temporary table back into the original table,
            # keeping only the most recent row for each date
            cur.execute(f"DELETE FROM {schema}.{table};")
            cur.execute(f"""INSERT INTO {schema}.{table}
    SELECT date, "open", high, low, close, volume FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM t
    )
    WHERE seq = 1;""")
            cur.execute("COMMIT;")   # cur.execute("END;")
        except Exception as error:
            print(error)
            cur.execute("ROLLBACK;")
            raise

        logging.info("load done")


    with DAG(
        dag_id = 'UpdateSymbol_v3',
        start_date = datetime(2023,5,30),
        catchup=False,
        tags=['API'],
        schedule = '0 10 * * *'
    ) as dag:

        results = get_historical_prices("AAPL")
        # Replace "your_id" with your own ID (used as the schema name)
        load("your_id", "stock_info_v3", results)
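To see why this load stays correct when the DAG runs every day and refetches overlapping dates, the same flow can be simulated locally. This is a rough sketch under stated assumptions, not the Redshift task itself: it uses `sqlite3` (SQLite 3.25 or later), a hypothetical reduced table `stock_info`, made-up values, and passes `created_date` explicitly instead of relying on `DEFAULT GETDATE()`.

```python
import sqlite3


def load(conn, records, created_date):
    """Copy to a temp table, append new rows, keep the newest row per date."""
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")
        cur.execute(
            """CREATE TABLE IF NOT EXISTS stock_info (
                   date TEXT, close REAL, volume INTEGER, created_date TEXT)"""
        )
        # Temporary copy of the current table contents.
        cur.execute("CREATE TEMP TABLE t AS SELECT * FROM stock_info")
        cur.executemany(
            "INSERT INTO t VALUES (?, ?, ?, ?)",
            [(d, c, v, created_date) for d, c, v in records],
        )
        # Rebuild the target table with one row per date (the latest load wins).
        cur.execute("DELETE FROM stock_info")
        cur.execute(
            """INSERT INTO stock_info
               SELECT date, close, volume, created_date FROM (
                   SELECT *, ROW_NUMBER() OVER (
                       PARTITION BY date ORDER BY created_date DESC
                   ) AS seq
                   FROM t
               ) WHERE seq = 1"""
        )
        cur.execute("DROP TABLE t")  # SQLite keeps temp tables until the connection closes
        cur.execute("COMMIT")
    except Exception:
        cur.execute("ROLLBACK")
        raise


# isolation_level=None lets the explicit BEGIN/COMMIT above control the transaction.
conn = sqlite3.connect(":memory:", isolation_level=None)

# Day 1: 2023-05-30 is fetched before the market closes.
load(conn, [("2023-05-30", 100.0, 1000)], "2023-05-30 10:00:00")
# Day 2: the same date comes back with final values, plus one new date.
load(
    conn,
    [("2023-05-30", 101.5, 2000), ("2023-05-31", 102.0, 1500)],
    "2023-05-31 10:00:00",
)

rows = conn.execute(
    "SELECT date, close, volume FROM stock_info ORDER BY date"
).fetchall()
print(rows)
```

After the second run the table holds one row per date, and the 2023-05-30 row carries the values from the later load.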
Download the practice file, then edit it
- Download the practice file
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ gdown --id 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py
/home/weare/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
warnings.warn(
Downloading...
From (uriginal): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY
From (redirected): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY&confirm=t&uuid=963be733-82a4-418f-83d2-2530c4d638ab
To: /home/weare/airflow-setup/dags/UpdateSymbol_v3.py
100%|████████████████████████████████████████| 2.62k/2.62k [00:00<00:00, 17.3MB/s]
- Edit the practice file (replace the placeholder with your own ID)
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v3.py