backfill을 할수 있는 조건

  • 모든 dag가 backfill을 필요로 하지 않는다.(full refresh는 필요없음)
  • 일별 시간별 업데이트 할때만 backfill이 필요하다.
  • 데이터 소스에 그날 바뀌거나 새로생긴 레코드를 구분할수 있는 기능이 있어야함
  • 데이터 크기가 커지면 backfill 기능을 필수적으로 구현


daily incremental dag에서 2018년 7월달 데이터를 다시 읽어와야 한다면

  1. 하루씩 지금까지 실행?
  2. 한번에 여러 날짜를 동시에 실행 , max_active_runs
  3. 예상되는 문제
    1) 데이터를 요구받는 소스측에서 감당못해 속도를 줄일수 있음 
    2) 동시 실행시 충돌이 날수도 있음
  4. backfill 실행전 준비사항
    1) catchup=True
    2) execution_date 사용해서 incremental update가 구현돼있음.
  5. 실행순서
    1) 실행순서는 날짜/시간순 아니고 랜덤.
    2) 날짜순으로 하고 싶으면 
  6. 커맨드라인에서 실행
    airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

목표

  • yahoo finance API를 호출해서 애플 주식 정보 수집( 지난 30일)
  • redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
  • incremental 업데이트 방식으로 매일 하루치의 데이터가 늘어나야함
  • 트랜잭션 형태 구성

간단 프로세스

  1. 임시 테이블(tem)을 임시 생성하면서 원본 테이블의 레코드를 복사(CREATE TEMP TABLE ...AS SELECT)
  2. 임시 테이블로 Yahoo finance API로 읽어온 레코드를 적재
  3. 원본 테이블 삭제하고 새로 생성
  4. 원본 테이블에 임시 테이블 내용을 복사( select distinct*를 사용해서 중복제거)

코드

# 필요한 라이브러리 및 모듈을 가져옵니다
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging

# Redshift 연결을 가져오는 함수
def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()

# 특정 주식 심볼에 대한 역사적 주가를 가져오는 태스크
@task
def get_historical_prices(symbol):
    # yfinance 라이브러리를 사용하여 주식의 역사적 가격을 가져옵니다
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    # 데이터를 Redshift로 로딩하기 위해 변환 및 구조화
    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

# Redshift에서 테이블을 만들거나 삭제하는 함수
def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")

# 역사적 주식 가격을 Redshift로 로딩하는 태스크
@task
def load(schema, table, records):
    logging.info("로딩 시작")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")

        # 처음실행된 경우 예외 : 원본 테이블이 없으면 생성 , drop_first= False
        _create_table(cur, schema, table, False)

        # 1. 임시 테이블(tem)을 임시 생성하면서 원본 테이블의 레코드를 복사
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        # 2. 임시 테이블로 Yahoo finance API로 읽어온 레코드를 적재
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 3. 원본 테이블 삭제하고 새로 생성  , drop_first= True
        _create_table(cur, schema, table, True)

        # 4. 원본 테이블에 임시 테이블 내용을 복사
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("로딩 완료")

# DAG를 정의합니다
with DAG(
    dag_id='UpdateSymbol_v2',
    start_date=datetime(2023, 5, 30),
    catchup=False,
    tags=['API'],
    schedule='0 10 * * *'  # DAG를 매일 오전 10:00에 실행
) as dag:

    # 주식 심볼 "AAPL"에 대한 역사적 가격을 가져오는 태스크
    results = get_historical_prices("AAPL")
    # 가져온 역사적 가격을 Redshift 테이블 "stock_info_v2"로 로딩하는 태스크
    load("본인스키마", "stock_info_v2", results)

 

실행

  • dags 폴더에 진입
    weare@DESKTOP-BE1I4GE:~/airflow-setup$ cd dags

  • 수정할 dag 파일 찾음
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ ls
    Backup_Airflow_Data_to_S3.py  HelloWorld.py                  Learn_TaskGroups.py         NameGenderCSVtoRedshift_v2.py  TestDAG.py                 __pycache__               get_price_GOOG.py
    Build_Summary.py              HelloWorld_v2.py               Learn_TriggerRule.py        NameGenderCSVtoRedshift_v3.py  UpdateSymbol.py            config                    plugins
    Build_Summary_v2.py           Learn_BranchPythonOperator.py  MySQL_to_Redshift.py        NameGenderCSVtoRedshift_v4.py  UpdateSymbol_v2.py         docker-compose.test.yaml  trigger_dags
    Cleanup_Log.py                Learn_Jinja.py                 MySQL_to_Redshift_v2.py     NameGenderCSVtoRedshift_v5.py  Weather_to_Redshift.py     dynamic_dags
    Gsheet_to_Redshift.py         Learn_LatestOnlyOperator.py    NameGenderCSVtoRedshift.py  SQL_to_Sheet.py                Weather_to_Redshift_v2.py  get_price_APPL.py

  • vim 편집기로 UpdateSymbol_v2.py 파일에 스키마, 레드쉬프트의 본인 스키마로 교체
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v2.py
  • 웹 ui(http://localhost:8080/) 에서 정상 실행 확인

+ Recent posts