airflow(에어플로우)

backfill과 airflow

데이터왕 2024. 1. 2. 23:49

incremental update가 실패하면?

  • full refresh를 할때는 새로 지우고 새로만들어서 실패하지 않는다.(backfill 할 필요가없음)
  • 따라서 데이터가 너무 크지 않는 이상은 full refresh를 하는게 좋다.
  • incremental update 시는 backfill의 문제가 생긴다. 그래서 운영/유지보수의 난이도가 올라간다.


backfill

  • backfill의 정의 : 실패한 데이터 파이프라인을 재실행 하는경우 or 읽어온 데이터가 문제있어 다시 읽어와야하는 경우
  • 즉 재실행이 얼마나 용이한 구조인지가 중요한데, 그게 잘 디자인된 툴이 airflow이다.

  • 방법 1 backfill 단순하게 하면 어떻게 될까?

    1)  dag가 이렇게 있다고 가정
    from datetime import datetime, timedelta
    # 지금 시간 기준으로 어제 날짜를 계산
    y = datetime.now() - timedelta(1)
    yesterday = datetime.strftime(y, '%Y-%m-%d')

    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    2) 기존 변수를 지우고, 날짜를 특정날짜( '2023-01-01') 로 하드코딩
    from datetime import datetime, timedelta
    yesterday = '2023-01-01'
    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    3) 문제점 : 코드를 다시 고쳐줘야하고, 날짜 잘못입력가능, 사람이 실수를 할수있는 포인트가 많음

  • 방법 2 airflow를 이용
    1) dag 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
    2) 모든 dag 실행에는 "execution_date"이 지정돼있음
    + execution_date으로 채워야하는 날짜와 시간이 넘어옴
    + execution_date는 실패한 날짜의 데이터 수집 시작 날짜
    3) 이를 바탕으로 데이터를 갱신하게 코드를 작성한다.
    4) backfill이 쉬워진다.

 

start_date와 execution_date

  • start_date는 (dag)시작되는 날짜임 =데이터수집 시작날짜
  • execution_date는 (Task Instance) 시작되는 날짜