primary key uniqueness란?
- Primary key(기본 키)는 데이터베이스 테이블에서 각 행을 고유하게 식별하는 열입니다. Primary key는 해당 테이블의 레코드를 고유하게 식별하는 데 사용되며, 각 행은 오직 하나의 primary key 값을 가집니다.
- Primary key 의 주요 특징:
1) 고유성 (Uniqueness):** 각 행은 고유한 primary key 값을 가져야 합니다. 어떤 두 행도 같은 primary key 값을 가질 수 없습니다.
2) NULL 값 금지 (Non-null):** 기본 키 값은 NULL이 아니어야 합니다. 즉, 해당 열에는 NULL 값이 허용되지 않습니다.
3) 일반적으로 정렬 (Ordering):** 기본 키 값은 일반적으로 테이블 내에서 정렬된 상태를 갖습니다.
Primary key는 테이블 간의 관계를 설정하고 데이터를 효율적으로 관리하는 데 중요한 역할을 합니다. 관계형 데이터베이스에서는 foreign key를 사용하여 다른 테이블과의 관계를 설정할 때 primary key를 활용합니다. Primary key를 사용하면 각 행을 빠르게 찾을 수 있으며 데이터 무결성을 보장할 수 있습니다. - Primary key 의 예
users 테이블에서 email 필드
products 테이블에서 product_id 필드 - 코드에서 사용예
+foreign key는 꼭 적지 않아도 되지만 , 최적화를 도와줌
빅데이터 기반 데이터 웨어하우스에서 primary key
- primary key를 보장해주지 않음-> 데이터 엔지니어가 개입해야하는부분
- 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터 적재에 방해가 된다.
+관계형 데이터베이스에서는 보장해준다. - 단순히 primary key를 선언만 해주는것은 의미가 없음
primary key 유지방법
- CREATE TABLE keeyong.weather_forecast (
date date primary key,
temp float,
min_temp float,
max_temp float,
# 어느 정보가 더 최근 정보인지를 created_date에 기록
created_date timestamp default GETDATE()
); - 날씨 정보기 때문에 최근 정보가 더 신뢰 있음 (어제 일주일 예보vs 오늘 일주일 예보)
겹치는 날짜(date)들이 있다면 created_date를 기준으로 더 최근 정보를 선택
이를 하는데 적합한 sql 문법이 row_number - row_number 이용한 primary key 유지
ROW_NUMBER() OVER (partition by date order by created_date DESC) as seq
1) date 별로 묶고 created_date의 역순으로 일련번호를 만든다
2) 만든 일렬번호 seq 열을 만들어서 넣음
3) 가장최신예보(seq =1) 만 뽑으면 자연스럽게 중복이 제거되고 pk uniqueness를 보장할수 있음
- row_number 방법 실제 제작 순서
1) 임시 테이블 만들고 모든 레코드 복사
2) 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사(중복존재)
3) row_number 를 이용해 최신데이터 구분
4) 원본테이블 기존데이터 삭제하고
5) 임시테이블에서 중복제거된 최신 데이터 가져옴
CREATE TEMP TABLE t AS SELECT * FROM keeyong.weather_forecast; (1)
DELETE FROM keeyong.weather_forecast; (4)
INSERT INTO keeyong.weather_forecast (5)
SELECT date, temp, min_temp, max_temp, created_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq (3)
FROM t )
WHERE seq = 1; - 전체코드
from airflow import DAG from airflow.decorators import task from airflow.models import Variable from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from datetime import timedelta import requests import logging import json def get_Redshift_connection(): # Redshift에 연결하고 커서를 반환합니다. # 기본적으로 autocommit은 False입니다. hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() @task def etl(schema, table, lat, lon, api_key): # OpenWeatherMap API에서 날씨 데이터를 추출하여 Redshift 테이블에 로드하는 ETL 작업 # OpenWeatherMap API에 대한 요청을 생성합니다. url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" response = requests.get(url) data = json.loads(response.text) # 날씨 데이터를 가공하여 Redshift에 입력하기 위한 포맷으로 변환합니다. ret = [] for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"])) cur = get_Redshift_connection() # 원본 테이블이 없다면 생성 create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() );""" logging.info(create_table_sql) # 임시 테이블 생성 create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};""" logging.info(create_t_sql) try: cur.execute(create_table_sql) cur.execute(create_t_sql) #autocommit =false라 써줘야함 cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise # 임시 테이블 데이터 입력 insert_sql = f"INSERT INTO t VALUES " + ",".join(ret) logging.info(insert_sql) try: cur.execute(insert_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise # 기존 테이블 대체 alter_sql = f"""DELETE FROM {schema}.{table}; INSERT INTO {schema}.{table} SELECT date, temp, min_temp, max_temp FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1;""" logging.info(alter_sql) try: cur.execute(alter_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise with DAG( dag_id='Weather_to_Redshift_v2', start_date=datetime(2022, 8, 24), schedule='0 4 * * *', # 적당히 조절 max_active_runs=1, catchup=False, default_args={ 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) as dag: # ETL 작업을 호출하며, OpenWeatherMap API 키는 Airflow Variable로부터 가져옵니다. etl("본인id", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key")) |
- 실습파일 본인 id로 수정(vim 편집기 진입)
weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim Weather_to_Redshift_v2.py
- 실행(정상실행 확인)
'airflow(에어플로우)' 카테고리의 다른 글
airflow에서 primary key 방법2 (0) | 2024.01.03 |
---|---|
backfill과 airflow (0) | 2024.01.02 |
airflow dag(기후정보) 실습 (0) | 2024.01.02 |
airflow dag(나라정보) 실습 (0) | 2024.01.02 |
airflow.cfg 파일 (0) | 2024.01.02 |