disinct 방식


disinct 방식의 허점

  • 모든 데이터가 동일하면 새 데이터로 대체하는 방식인데, 주식데이터는 마감시간이 아니면 종가,거래량은 달라진다.
  • row_number 방식으로 동일 레코드를 처리해야한다.

코드

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint,
    created_date timestamp DEFAULT GETDATE()
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
            INSERT INTO t SELECT * FROM {schema}.{table};"""
        cur.execute(create_t_sql)
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"DELETE FROM {schema}.{table};")
        cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
    FROM t
)
WHERE seq = 1;""")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v3',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("본인id", "stock_info_v3", results)

 

실습파일 다운 후 수정

  • 실습파일 다운
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ gdown --id 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py
    /home/weare/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
      warnings.warn(
    Downloading...
    From (uriginal): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY
    From (redirected): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY&confirm=t&uuid=963be733-82a4-418f-83d2-2530c4d638ab
    To: /home/weare/airflow-setup/dags/UpdateSymbol_v3.py
    100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2.62k/2.62k [00:00<00:00, 17.3MB/s]

  • 실습파일 수정(본인의 id로 대체)
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v3.py

 

정상실행 완료

 

+ Recent posts