incremental update가 실패하면?
- full refresh를 할때는 새로 지우고 새로만들어서 실패하지 않는다.(backfill 할 필요가없음)
- 따라서 데이터가 너무 크지 않는 이상은 full refresh를 하는게 좋다.
- incremental update 시는 backfill의 문제가 생긴다. 그래서 운영/유지보수의 난이도가 올라간다.
backfill
- backfill의 정의 : 실패한 데이터 파이프라인을 재실행 하는경우 or 읽어온 데이터가 문제있어 다시 읽어와야하는 경우
- 즉 재실행이 얼마나 용이한 구조인지가 중요한데, 그게 잘 디자인된 툴이 airflow이다.
- 방법 1 backfill 단순하게 하면 어떻게 될까?
1) dag가 이렇게 있다고 가정
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
2) 기존 변수를 지우고, 날짜를 특정날짜( '2023-01-01') 로 하드코딩
from datetime import datetime, timedelta
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
3) 문제점 : 코드를 다시 고쳐줘야하고, 날짜 잘못입력가능, 사람이 실수를 할수있는 포인트가 많음 - 방법 2 airflow를 이용
1) dag 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
2) 모든 dag 실행에는 "execution_date"이 지정돼있음
+ execution_date으로 채워야하는 날짜와 시간이 넘어옴
+ execution_date는 실패한 날짜의 데이터 수집 시작 날짜
3) 이를 바탕으로 데이터를 갱신하게 코드를 작성한다.
4) backfill이 쉬워진다.
start_date와 execution_date
- start_date는 (dag)시작되는 날짜임 =데이터수집 시작날짜
- execution_date는 (Task Instance) 시작되는 날짜
'airflow(에어플로우)' 카테고리의 다른 글
No module named MySQLdb 에러 (0) | 2024.01.03 |
---|---|
airflow에서 primary key 방법2 (0) | 2024.01.03 |
데이터 웨어하우스에서 primary key (0) | 2024.01.02 |
airflow dag(기후정보) 실습 (0) | 2024.01.02 |
airflow dag(나라정보) 실습 (0) | 2024.01.02 |