airflow(에어플로우)
backfill과 airflow
데이터왕
2024. 1. 2. 23:49
incremental update가 실패하면?
- full refresh를 할때는 새로 지우고 새로만들어서 실패하지 않는다.(backfill 할 필요가없음)
- 따라서 데이터가 너무 크지 않는 이상은 full refresh를 하는게 좋다.
- incremental update 시는 backfill의 문제가 생긴다. 그래서 운영/유지보수의 난이도가 올라간다.
backfill
- backfill의 정의 : 실패한 데이터 파이프라인을 재실행 하는경우 or 읽어온 데이터가 문제있어 다시 읽어와야하는 경우
- 즉 재실행이 얼마나 용이한 구조인지가 중요한데, 그게 잘 디자인된 툴이 airflow이다.
- 방법 1 backfill 단순하게 하면 어떻게 될까?
1) dag가 이렇게 있다고 가정
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
2) 기존 변수를 지우고, 날짜를 특정날짜( '2023-01-01') 로 하드코딩
from datetime import datetime, timedelta
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
3) 문제점 : 코드를 다시 고쳐줘야하고, 날짜 잘못입력가능, 사람이 실수를 할수있는 포인트가 많음 - 방법 2 airflow를 이용
1) dag 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
2) 모든 dag 실행에는 "execution_date"이 지정돼있음
+ execution_date으로 채워야하는 날짜와 시간이 넘어옴
+ execution_date는 실패한 날짜의 데이터 수집 시작 날짜
3) 이를 바탕으로 데이터를 갱신하게 코드를 작성한다.
4) backfill이 쉬워진다.
start_date와 execution_date
- start_date는 (dag)시작되는 날짜임 =데이터수집 시작날짜
- execution_date는 (Task Instance) 시작되는 날짜