The DISTINCT approach


Flaws of the DISTINCT approach

  • DISTINCT only collapses rows whose columns are all identical; until the market closes, a stock's close price and volume keep changing, so intraday loads of the same date produce slightly different rows that DISTINCT never deduplicates.
  • Records with the same date therefore have to be handled with ROW_NUMBER(), keeping only the most recently loaded row per date (see the sketch below).
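
A toy pandas demo of the difference (the figures are made up): full-row deduplication, which is what DISTINCT does, keeps both versions of the same trading day, while ROW_NUMBER-style deduplication by date keeps exactly one.

import pandas as pd

# Two loads of the same trading day: once intraday, once after close.
rows = pd.DataFrame([
    {"date": "2023-05-30", "close": 177.10, "volume": 31_000_000, "created_date": "2023-05-30 14:00"},
    {"date": "2023-05-30", "close": 177.30, "volume": 55_000_000, "created_date": "2023-05-31 10:00"},
])

# DISTINCT-style: both rows survive because not every column matches.
print(len(rows.drop_duplicates()))  # 2

# ROW_NUMBER-style: order rows per date by load time, keep only the newest.
latest = rows.sort_values("created_date").groupby("date").tail(1)
print(len(latest))  # 1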

Code

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticker = yf.Ticker(symbol)
    data = ticker.history()  # with no arguments, returns about the last month of daily prices
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint,
    created_date timestamp DEFAULT GETDATE()  -- load timestamp; lets ROW_NUMBER pick the newest row per date
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # Create the source table if it doesn't exist - only needed the first time the table is created
        _create_table(cur, schema, table, False)
        # Copy the source table into a temp table
        create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
            INSERT INTO t SELECT * FROM {schema}.{table};"""
        cur.execute(create_t_sql)
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # Copy the deduplicated temp table contents back into the source table
        cur.execute(f"DELETE FROM {schema}.{table};")
        cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
    FROM t
)
WHERE seq = 1;""")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v3',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("본인id", "stock_info_v3", results)

 

Download and edit the practice file

  • Download the practice file
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ gdown --id 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py
    /home/weare/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
      warnings.warn(
    Downloading...
    From (uriginal): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY
    From (redirected): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY&confirm=t&uuid=963be733-82a4-418f-83d2-2530c4d638ab
    To: /home/weare/airflow-setup/dags/UpdateSymbol_v3.py
    100%|██████████| 2.62k/2.62k [00:00<00:00, 17.3MB/s]

  • Edit the practice file (replace the schema with your own id)
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v3.py
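
  • As the FutureWarning above says, newer gdown releases take the file ID directly, so the same download works without --id: gdown 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py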

 

Run completed successfully

 

Goals

  • Call the Yahoo Finance API to collect Apple stock data (about the last 30 days; see the sketch after this list)
  • Load the records from step 1 into a table on Redshift
  • Implement the load as a full refresh
  • Wrap the whole load in a transaction
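
For reference, Ticker.history() called with no arguments uses its default period of "1mo", i.e. roughly the last 30 days of daily bars, which is why the code below passes no explicit date range. A minimal stand-alone check:

import yfinance as yf

# history() defaults to period="1mo": about the last month of daily prices.
data = yf.Ticker("AAPL").history()
print(data.index.min(), data.index.max())  # ~one-month window ending today
print(list(data.columns))                  # includes Open, High, Low, Close, Volume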

Code

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticker = yf.Ticker(symbol)
    data = ticker.history()  # defaults to about the last month of daily prices
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        # The table was dropped and recreated above, so inserting every record is a full refresh
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("wearealego", "stock_info", results)

 

Hands-on practice

  1. Edit the yaml file so the required modules are installed every time the containers boot
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls
    README.md  dags  docker-compose.yaml  docs  learn-airflow  logs  plugins

  2. Open the yaml file in the vim editor
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ vim docker-compose.yaml

  3. Edit the contents (the part marked in red in the original screenshot): the basic modules the exercise needs, including yfinance. A sketch of the change follows below.
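    Since the screenshot is not reproduced here, the following is only a sketch of the kind of change, assuming the official Airflow docker-compose layout, where extra pip packages are listed in _PIP_ADDITIONAL_REQUIREMENTS (any packages beyond yfinance are assumptions):

    environment:
      # Installed on every container boot; slow for production, fine for practice.
      _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance pandas requests}
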
  4. Move to the dags folder
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cd dags

  5. Find UpdateSymbol.py, the DAG whose contents we want to change
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ ls
    Backup_Airflow_Data_to_S3.py  HelloWorld.py                  Learn_TaskGroups.py         NameGenderCSVtoRedshift_v2.py  TestDAG.py                 __pycache__               get_price_GOOG.py
    Build_Summary.py              HelloWorld_v2.py               Learn_TriggerRule.py        NameGenderCSVtoRedshift_v3.py  UpdateSymbol.py            config                    plugins
    Build_Summary_v2.py           Learn_BranchPythonOperator.py  MySQL_to_Redshift.py        NameGenderCSVtoRedshift_v4.py  UpdateSymbol_v2.py         docker-compose.test.yaml  trigger_dags
    Cleanup_Log.py                Learn_Jinja.py                 MySQL_to_Redshift_v2.py     NameGenderCSVtoRedshift_v5.py  Weather_to_Redshift.py     dynamic_dags
    Gsheet_to_Redshift.py         Learn_LatestOnlyOperator.py    NameGenderCSVtoRedshift.py  SQL_to_Sheet.py                Weather_to_Redshift_v2.py  get_price_APPL.py

  6. Open UpdateSymbol.py in the vim editor, change the schema to your own id, and save
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol.py
  7. Let's test it from the command line
    1) Check the Docker scheduler container ID
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
    CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                    PORTS                                       NAMES
    13c7de52342d   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-triggerer_1
    8e40ffd79576   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow-setup_airflow-webserver_1
    c57811bc9ba1   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-worker_1
    f14cc34e94be   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   12 minutes ago   Up 12 minutes (healthy)   8080/tcp                                    airflow-setup_airflow-scheduler_1
    48ee152ae746   redis:latest           "docker-entrypoint.s…"   3 days ago       Up 12 minutes (healthy)   6379/tcp                                    airflow-setup_redis_1
    a5e756836619   postgres:13            "docker-entrypoint.s…"   3 days ago       Up 12 minutes (healthy)   5432/tcp                                    airflow-setup_postgres_1

    2) Open a shell in the scheduler container
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec -it f14cc34e94be sh
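
    (The container name shown by docker ps works just as well as the ID: docker exec -it airflow-setup_airflow-scheduler_1 sh)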

    3) List the tasks of the UpdateSymbol DAG
    (airflow)airflow tasks list UpdateSymbol
    /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
    SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')
    get_historical_prices
    load
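
    (The stray SELECT line above is not a task of this DAG; it appears to be stdout printed by another DAG file at import time while Airflow parses the dags folder. The actual tasks are get_historical_prices and load.)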

    4) Test the UpdateSymbol DAG
    (airflow)airflow dags test UpdateSymbol 2023-05-30
    (output truncated)
    [2024-01-01 16:02:30,217] {dagrun.py:673} INFO - DagRun Finished: dag_id=UpdateSymbol, execution_date=2023-05-30T00:00:00+00:00, run_id=manual__2023-05-30T00:00:00+00:00, run_start_date=2023-05-30T00:00:00+00:00, run_end_date=2024-01-01 16:02:30.216874+00:00, run_duration=18720150.216874, state=success, external_trigger=False, run_type=manual, data_interval_start=2023-05-30T00:00:00+00:00, data_interval_end=2023-05-30T10:00:00+00:00, dag_hash=None

Run completed successfully!
