Airflow
Airflow DAG (stock data collection) exercise 1: full refresh approach
데이터왕
2024. 1. 2. 01:09
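Goals
- Call the Yahoo Finance API to collect Apple stock data (last 30 days)
- Load the records collected in the first step into a table on Redshift
- Implement the load as a full refresh
- Wrap the load in a transaction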
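The last two goals can be tried out locally before touching Redshift. The following is a minimal sketch that uses Python's built-in sqlite3 as a stand-in database; the `full_refresh` helper and the sample rows are made up for illustration and are not part of the DAG. It drops and recreates the table inside one transaction, so a failed load rolls back to the previous contents.

```python
import sqlite3


def full_refresh(conn, table, records):
    """Replace the whole table with `records` inside a single transaction."""
    cur = conn.cursor()
    try:
        cur.execute("BEGIN")
        cur.execute(f"DROP TABLE IF EXISTS {table}")
        cur.execute(
            f'CREATE TABLE {table} '
            '(date TEXT, "open" REAL, high REAL, low REAL, close REAL, volume INTEGER)'
        )
        cur.executemany(f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?, ?)", records)
        cur.execute("COMMIT")
    except Exception:
        # Undo the DROP/CREATE/INSERT so the previous table contents survive
        cur.execute("ROLLBACK")
        raise


# isolation_level=None turns off sqlite3's implicit transactions,
# so the explicit BEGIN/COMMIT/ROLLBACK above are the only ones issued
conn = sqlite3.connect(":memory:", isolation_level=None)

# First load succeeds (illustrative values, not real prices)
full_refresh(conn, "stock_info", [("2023-05-30", 1.0, 2.0, 0.5, 1.5, 100)])

# Second load fails because the row has too few columns
try:
    full_refresh(conn, "stock_info", [("2023-05-31", 1.0)])
except sqlite3.Error:
    pass

rows = conn.execute("SELECT date, volume FROM stock_info").fetchall()
print(rows)
```

After the failed second load, the table still holds the row from the first load, which is the behavior the transaction is meant to guarantee.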
Code
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
import yfinance as yf
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticker = yf.Ticker(symbol)
    # Called without arguments, history() returns about the last month of daily prices
    data = ticker.history()
    records = []
    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
    return records


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # Drop and recreate the table first, so every run is a full refresh
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")
        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            logging.info(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") would do the same
    except Exception as error:
        logging.error(error)
        cur.execute("ROLLBACK;")
        raise
    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("wearealego", "stock_info", results)
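The load task builds each INSERT statement with an f-string, which works here because every value is a number or a formatted date. One possible alternative is to let the database driver bind the values instead. The sketch below assumes a hypothetical helper named `build_insert` (not part of the DAG above) and uses made-up values; it only builds the statement, so it runs without a database.

```python
def build_insert(schema, table, record):
    """Return (sql, params) for one record, using %s placeholders."""
    placeholders = ", ".join(["%s"] * len(record))
    sql = f"INSERT INTO {schema}.{table} VALUES ({placeholders});"
    return sql, tuple(record)


# Illustrative values only, not real prices
sql, params = build_insert(
    "wearealego", "stock_info",
    ["2023-05-30 00:00:00", 1.0, 2.0, 0.5, 1.5, 100],
)
print(sql)
print(params)
```

With a psycopg2 cursor such as the one returned by get_Redshift_connection(), the pair would be passed as cur.execute(sql, params), so the driver handles quoting of the values. The schema and table names are still interpolated into the string and should come from trusted code, not from user input.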
Hands-on
- Edit the yaml file so the required modules are installed every time the containers boot
weare@DESKTOP-BE1I4GE:~/airflow-setup$ ls
README.md dags docker-compose.yaml docs learn-airflow logs plugins
- Open the yaml file in vim
weare@DESKTOP-BE1I4GE:~/airflow-setup$ vim docker-compose.yaml
- Edit the contents (add the lines marked in red): the basic modules needed, including yfinance, which this exercise requires.
- Move to the dags folder
weare@DESKTOP-BE1I4GE:~/airflow-setup$ cd dags
- Find UpdateSymbol.py, the DAG to modify
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ ls
Backup_Airflow_Data_to_S3.py HelloWorld.py Learn_TaskGroups.py NameGenderCSVtoRedshift_v2.py TestDAG.py __pycache__ get_price_GOOG.py
Build_Summary.py HelloWorld_v2.py Learn_TriggerRule.py NameGenderCSVtoRedshift_v3.py UpdateSymbol.py config plugins
Build_Summary_v2.py Learn_BranchPythonOperator.py MySQL_to_Redshift.py NameGenderCSVtoRedshift_v4.py UpdateSymbol_v2.py docker-compose.test.yaml trigger_dags
Cleanup_Log.py Learn_Jinja.py MySQL_to_Redshift_v2.py NameGenderCSVtoRedshift_v5.py Weather_to_Redshift.py dynamic_dags
Gsheet_to_Redshift.py Learn_LatestOnlyOperator.py NameGenderCSVtoRedshift.py SQL_to_Sheet.py Weather_to_Redshift_v2.py get_price_APPL.py
- Open UpdateSymbol.py in vim, change it to use your own ID, and save
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol.py
- Test it from the command line
1) Find the ID of the scheduler container
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
13c7de52342d apache/airflow:2.5.1 "/usr/bin/dumb-init …" 12 minutes ago Up 12 minutes (healthy) 8080/tcp airflow-setup_airflow-triggerer_1
8e40ffd79576 apache/airflow:2.5.1 "/usr/bin/dumb-init …" 12 minutes ago Up 12 minutes (healthy) 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp airflow-setup_airflow-webserver_1
c57811bc9ba1 apache/airflow:2.5.1 "/usr/bin/dumb-init …" 12 minutes ago Up 12 minutes (healthy) 8080/tcp airflow-setup_airflow-worker_1
f14cc34e94be apache/airflow:2.5.1 "/usr/bin/dumb-init …" 12 minutes ago Up 12 minutes (healthy) 8080/tcp airflow-setup_airflow-scheduler_1
48ee152ae746 redis:latest "docker-entrypoint.s…" 3 days ago Up 12 minutes (healthy) 6379/tcp airflow-setup_redis_1
a5e756836619 postgres:13 "docker-entrypoint.s…" 3 days ago Up 12 minutes (healthy) 5432/tcp airflow-setup_postgres_1
2) Open a shell in the scheduler container
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec -it f14cc34e94be sh
3) List the tasks in the UpdateSymbol DAG
(airflow)airflow tasks list UpdateSymbol
/home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')
get_historical_prices
load
4) Test the UpdateSymbol DAG
(airflow)airflow dags test UpdateSymbol 2023-05-30
(output omitted)
[2024-01-01 16:02:30,217] {dagrun.py:673} INFO - DagRun Finished: dag_id=UpdateSymbol, execution_date=2023-05-30T00:00:00+00:00, run_id=manual__2023-05-30T00:00:00+00:00, run_start_date=2023-05-30T00:00:00+00:00, run_end_date=2024-01-01 16:02:30.216874+00:00, run_duration=18720150.216874, state=success, external_trigger=False, run_type=manual, data_interval_start=2023-05-30T00:00:00+00:00, data_interval_end=2023-05-30T10:00:00+00:00, dag_hash=None
The run completed successfully!
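One detail in the final log line is easy to misread: run_duration=18720150.216874 is not the time the tasks took. In this log, run_start_date equals the date passed to `dags test` (2023-05-30), while run_end_date is the actual wall-clock time of the test, so the reported duration appears to be the gap between the two. A quick check with the values copied from the log:

```python
from datetime import datetime, timezone

# Values copied from the DagRun log line
run_start = datetime(2023, 5, 30, tzinfo=timezone.utc)
run_end = datetime(2024, 1, 1, 16, 2, 30, 216874, tzinfo=timezone.utc)

delta = run_end - run_start
duration = delta.total_seconds()
print(delta.days)   # whole days between the two timestamps
print(duration)     # seconds, to compare with run_duration in the log
```

The result matches the logged run_duration, about 216 days, which is consistent with that reading.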