목표
- api로부터 나라 정보를 추출한다.
- full regresh로 구현할것
- 아래 3개의 정보를 추출해서 redshift에 각자 스키마 밑에 테이블 생성
country ->["name"]["official"]
population -> ["population"]
area -> ["area"] - UTC로 매주 토요일 오전 6시 30분에 실행되게 만들것
코드
import requests from airflow.decorators import DAG, task from datetime import datetime @task def extract_transform(): # 국가 정보를 얻기 위한 API 요청 response = requests.get('https://restcountries.com/v3/all') countries = response.json() # 응답을 처리하고 관련 정보 추출 records = [] for country in countries: name = country['name']['common'] population = country['population'] area = country['area'] records.append([name, population, area]) return records # 데이터베이스로 데이터 로드 results = extract_transform() # DAG 정의 with DAG( dag_id='CountryInfo', start_date=datetime(2023, 5, 30), catchup=False, tags=['API'], schedule='30 6 * * 6' # 0 - 일요일, …, 6 - 토요일 -> 매주 토요일 오전 6시30분에 실행됨 ) as dag: # 데이터 추출 및 변환을 위한 태스크 results = extract_transform() # load 함수는 추출한 데이터를 데이터베이스 테이블에 삽입하는 작업을 수행합니다. load("keeyong", "country_info", results) # SQL 문으로 데이터베이스에 대상 테이블을 만듭니다. create_table_sql = """ CREATE TABLE {schema}.{table} ( name varchar(256) primary key, population int, area float ); |

'airflow(에어플로우)' 카테고리의 다른 글
데이터 웨어하우스에서 primary key (0) | 2024.01.02 |
---|---|
airflow dag(기후정보) 실습 (0) | 2024.01.02 |
airflow.cfg 파일 (0) | 2024.01.02 |
airflow dag(주식정보 수집) 실습2-incremental update (0) | 2024.01.02 |
airflow dag(주식정보 수집) 실습1 -full refresh 방식 (0) | 2024.01.02 |