1. 컨테이너 종류 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED        STATUS                  PORTS                                       NAMES
13c7de52342d   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-triggerer_1
8e40ffd79576   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow-setup_airflow-webserver_1
c57811bc9ba1   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-worker_1
f14cc34e94be   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-scheduler_1
48ee152ae746   redis:latest           "docker-entrypoint.s…"   4 days ago     Up 19 hours (healthy)   6379/tcp                                    airflow-setup_redis_1
a5e756836619   postgres:13            "docker-entrypoint.s…"   4 days ago     Up 19 hours (healthy)   5432/tcp                                    airflow-setup_postgres_1

2. 스케줄러 shell root 권한으로 접속
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec --user root -it f14cc34e94be sh
Get:1 http://deb.debian.org/debian bullseye InRelease [116 kB]
Get:2 http://deb.debian.org/debian-security bullseye-security InRelease [48.4 kB]
Get:3 http://deb.debian.org/debian bullseye-updates InRelease [44.1 kB]
Get:4 http://deb.debian.org/debian bullseye/main amd64 Packages [8062 kB]
Get:5 https://packages.microsoft.com/debian/11/prod bullseye InRelease [3649 B]
Get:6 https://packages.microsoft.com/debian/11/prod bullseye/main amd64 Packages [131 kB]
Get:7 http://deb.debian.org/debian-security bullseye-security/main amd64 Packages [264 kB]
Get:8 http://deb.debian.org/debian bullseye-updates/main amd64 Packages [18.8 kB]
Get:9 https://packages.microsoft.com/debian/11/prod bullseye/main arm64 Packages [27.5 kB]
Get:10 https://packages.microsoft.com/debian/11/prod bullseye/main armhf Packages [24.9 kB]
Get:11 https://packages.microsoft.com/debian/11/prod bullseye/main all Packages [1214 B]
Get:12 https://apt.postgresql.org/pub/repos/apt bullseye-pgdg InRelease [123 kB]
Get:13 https://apt.postgresql.org/pub/repos/apt bullseye-pgdg/main amd64 Packages [306 kB]
Fetched 9170 kB in 5s (1760 kB/s)
Reading package lists... Done


3. MySQL을 사용하는 Apache Airflow에서 필요한 패키지 및 라이브러리를 설치.

 

  • apt-get은 리눅스 시스템에서 패키지 관리를 위해 사용되며, update명령어는 패키지 목록을 최신으로 갱신
    (airflow)sudo apt-get update

  • MySQL 데이터베이스와 상호 작용하는 C 언어 클라이언트 라이브러리를 개발하기 위해 필요한 헤더 파일 및 라이브러리를 포함하는 패키지인 default-libmysqlclient-dev를 설치
    (airflow)sudo apt-get install -y default-libmysqlclient-dev
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    E: Unable to locate package defauld-libmysqlclient-dev

  • gcc는 GNU Compiler Collection의 일부로, C 및 C++ 등의 프로그래밍 언어를 컴파일하는 데 사용되는 컴파일러 설치
    (airflow)sudo apt-get install -y gcc
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    gcc is already the newest version (4:10.2.1-1).
    0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.

  • Apache Airflow에서 MySQL을 사용할 수 있게 하는 공식 제공자 패키지 apache-airflow-providers-mysql를 설치. pip3는 파이썬 패키지 설치 도구이며, --ignore-installed 플래그는 이미 설치된 패키지를 무시하고 강제로 새로운 버전 설치
    (airflow)sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
    Collecting apache-airflow-providers-mysql
      Downloading apache_airflow_providers_mysql-5.1.0-py3-none-any.whl.metadata (16 kB)
    Collecting apache-airflow-providers-common-sql>=1.3.1 (from apache-airflow-providers-mysql)
      Downloading apache_airflow_providers_common_sql-1.5.1-py3-none-any.whl.metadata (10 kB)
    Collecting apache-airflow>=2.4.0 (from apache-airflow-providers-mysql)

 

disinct 방식


disinct 방식의 허점

  • 모든 데이터가 동일하면 새 데이터로 대체하는 방식인데, 주식데이터는 마감시간이 아니면 종가,거래량은 달라진다.
  • row_number 방식으로 동일 레코드를 처리해야한다.

코드

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint,
    created_date timestamp DEFAULT GETDATE()
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
            INSERT INTO t SELECT * FROM {schema}.{table};"""
        cur.execute(create_t_sql)
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"DELETE FROM {schema}.{table};")
        cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
    FROM t
)
WHERE seq = 1;""")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v3',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("본인id", "stock_info_v3", results)

 

실습파일 다운 후 수정

  • 실습파일 다운
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ gdown --id 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py
    /home/weare/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
      warnings.warn(
    Downloading...
    From (uriginal): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY
    From (redirected): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY&confirm=t&uuid=963be733-82a4-418f-83d2-2530c4d638ab
    To: /home/weare/airflow-setup/dags/UpdateSymbol_v3.py
    100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2.62k/2.62k [00:00<00:00, 17.3MB/s]

  • 실습파일 수정(본인의 id로 대체)
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v3.py

 

정상실행 완료

 

incremental update가 실패하면?

  • full refresh를 할때는 새로 지우고 새로만들어서 실패하지 않는다.(backfill 할 필요가없음)
  • 따라서 데이터가 너무 크지 않는 이상은 full refresh를 하는게 좋다.
  • incremental update 시는 backfill의 문제가 생긴다. 그래서 운영/유지보수의 난이도가 올라간다.


backfill

  • backfill의 정의 : 실패한 데이터 파이프라인을 재실행 하는경우 or 읽어온 데이터가 문제있어 다시 읽어와야하는 경우
  • 즉 재실행이 얼마나 용이한 구조인지가 중요한데, 그게 잘 디자인된 툴이 airflow이다.

  • 방법 1 backfill 단순하게 하면 어떻게 될까?

    1)  dag가 이렇게 있다고 가정
    from datetime import datetime, timedelta
    # 지금 시간 기준으로 어제 날짜를 계산
    y = datetime.now() - timedelta(1)
    yesterday = datetime.strftime(y, '%Y-%m-%d')

    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    2) 기존 변수를 지우고, 날짜를 특정날짜( '2023-01-01') 로 하드코딩
    from datetime import datetime, timedelta
    yesterday = '2023-01-01'
    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    3) 문제점 : 코드를 다시 고쳐줘야하고, 날짜 잘못입력가능, 사람이 실수를 할수있는 포인트가 많음

  • 방법 2 airflow를 이용
    1) dag 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
    2) 모든 dag 실행에는 "execution_date"이 지정돼있음
    + execution_date으로 채워야하는 날짜와 시간이 넘어옴
    + execution_date는 실패한 날짜의 데이터 수집 시작 날짜
    3) 이를 바탕으로 데이터를 갱신하게 코드를 작성한다.
    4) backfill이 쉬워진다.

 

start_date와 execution_date

  • start_date는 (dag)시작되는 날짜임 =데이터수집 시작날짜
  • execution_date는 (Task Instance) 시작되는 날짜

+ Recent posts