airflow(에어플로우)

airflow dag(나라정보) 실습

데이터왕 2024. 1. 2. 16:41

목표

  • api로부터 나라 정보를 추출한다.
  • full regresh로 구현할것
  • 아래 3개의 정보를 추출해서 redshift에 각자 스키마 밑에 테이블 생성
    country ->["name"]["official"]
    population -> ["population"]
    area -> ["area"]
  • UTC로 매주 토요일 오전 6시 30분에 실행되게 만들것

코드

import requests
from airflow.decorators import DAG, task
from datetime import datetime

@task
def extract_transform():
    # 국가 정보를 얻기 위한 API 요청
    response = requests.get('https://restcountries.com/v3/all')
    countries = response.json()
        
    # 응답을 처리하고 관련 정보 추출
    records = []
    for country in countries:
        name = country['name']['common']
        population = country['population']
        area = country['area']
        records.append([name, population, area])
        
    return records

    # 데이터베이스로 데이터 로드
results = extract_transform()

# DAG 정의
with DAG(
    dag_id='CountryInfo',
    start_date=datetime(2023, 5, 30),
    catchup=False,
    tags=['API'],
    schedule='30 6 * * 6'  # 0 - 일요일, …, 6 - 토요일 -> 매주 토요일 오전 6시30분에 실행됨
) as dag:

    # 데이터 추출 및 변환을 위한 태스크
    results = extract_transform()
    # load 함수는 추출한 데이터를 데이터베이스 테이블에 삽입하는 작업을 수행합니다.
    load("keeyong", "country_info", results)

# SQL 문으로 데이터베이스에 대상 테이블을 만듭니다.
create_table_sql = """
CREATE TABLE {schema}.{table} (
    name varchar(256) primary key,
    population int,
    area float
);