backfill을 할수 있는 조건

  • 모든 dag가 backfill을 필요로 하지 않는다.(full refresh는 필요없음)
  • 일별 시간별 업데이트 할때만 backfill이 필요하다.
  • 데이터 소스에 그날 바뀌거나 새로생긴 레코드를 구분할수 있는 기능이 있어야함
  • 데이터 크기가 커지면 backfill 기능을 필수적으로 구현


daily incremental dag에서 2018년 7월달 데이터를 다시 읽어와야 한다면

  1. 하루씩 지금까지 실행?
  2. 한번에 여러 날짜를 동시에 실행 , max_active_runs
  3. 예상되는 문제
    1) 데이터를 요구받는 소스측에서 감당못해 속도를 줄일수 있음 
    2) 동시 실행시 충돌이 날수도 있음
  4. backfill 실행전 준비사항
    1) catchup=True
    2) execution_date 사용해서 incremental update가 구현돼있음.
  5. 실행순서
    1) 실행순서는 날짜/시간순 아니고 랜덤.
    2) 날짜순으로 하고 싶으면 
  6. 커맨드라인에서 실행
    airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

incremental update가 실패하면?

  • full refresh를 할때는 새로 지우고 새로만들어서 실패하지 않는다.(backfill 할 필요가없음)
  • 따라서 데이터가 너무 크지 않는 이상은 full refresh를 하는게 좋다.
  • incremental update 시는 backfill의 문제가 생긴다. 그래서 운영/유지보수의 난이도가 올라간다.


backfill

  • backfill의 정의 : 실패한 데이터 파이프라인을 재실행 하는경우 or 읽어온 데이터가 문제있어 다시 읽어와야하는 경우
  • 즉 재실행이 얼마나 용이한 구조인지가 중요한데, 그게 잘 디자인된 툴이 airflow이다.

  • 방법 1 backfill 단순하게 하면 어떻게 될까?

    1)  dag가 이렇게 있다고 가정
    from datetime import datetime, timedelta
    # 지금 시간 기준으로 어제 날짜를 계산
    y = datetime.now() - timedelta(1)
    yesterday = datetime.strftime(y, '%Y-%m-%d')

    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    2) 기존 변수를 지우고, 날짜를 특정날짜( '2023-01-01') 로 하드코딩
    from datetime import datetime, timedelta
    yesterday = '2023-01-01'
    # yesterday에 해당하는 데이터를 소스에서 읽어옴
    # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
    sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"

    3) 문제점 : 코드를 다시 고쳐줘야하고, 날짜 잘못입력가능, 사람이 실수를 할수있는 포인트가 많음

  • 방법 2 airflow를 이용
    1) dag 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
    2) 모든 dag 실행에는 "execution_date"이 지정돼있음
    + execution_date으로 채워야하는 날짜와 시간이 넘어옴
    + execution_date는 실패한 날짜의 데이터 수집 시작 날짜
    3) 이를 바탕으로 데이터를 갱신하게 코드를 작성한다.
    4) backfill이 쉬워진다.

 

start_date와 execution_date

  • start_date는 (dag)시작되는 날짜임 =데이터수집 시작날짜
  • execution_date는 (Task Instance) 시작되는 날짜

+ Recent posts