airflow(에어플로우)
airflow dag(나라정보) 실습
데이터왕
2024. 1. 2. 16:41
목표
- api로부터 나라 정보를 추출한다.
- full regresh로 구현할것
- 아래 3개의 정보를 추출해서 redshift에 각자 스키마 밑에 테이블 생성
country ->["name"]["official"]
population -> ["population"]
area -> ["area"] - UTC로 매주 토요일 오전 6시 30분에 실행되게 만들것
코드
import requests from airflow.decorators import DAG, task from datetime import datetime @task def extract_transform(): # 국가 정보를 얻기 위한 API 요청 response = requests.get('https://restcountries.com/v3/all') countries = response.json() # 응답을 처리하고 관련 정보 추출 records = [] for country in countries: name = country['name']['common'] population = country['population'] area = country['area'] records.append([name, population, area]) return records # 데이터베이스로 데이터 로드 results = extract_transform() # DAG 정의 with DAG( dag_id='CountryInfo', start_date=datetime(2023, 5, 30), catchup=False, tags=['API'], schedule='30 6 * * 6' # 0 - 일요일, …, 6 - 토요일 -> 매주 토요일 오전 6시30분에 실행됨 ) as dag: # 데이터 추출 및 변환을 위한 태스크 results = extract_transform() # load 함수는 추출한 데이터를 데이터베이스 테이블에 삽입하는 작업을 수행합니다. load("keeyong", "country_info", results) # SQL 문으로 데이터베이스에 대상 테이블을 만듭니다. create_table_sql = """ CREATE TABLE {schema}.{table} ( name varchar(256) primary key, population int, area float ); |