airflowdbt란
- Apache Airflow와 dbt(Data Build Tool)를 함께 사용하여 데이터 웨어하우스에서 ETL(Extract, Transform, Load) 작업을 수행하기 위한 도구 또는 플러그인
mau(월간사용자정보) 코드
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from datetime import datetime from datetime import timedelta from airflow import AirflowException import requests import logging import psycopg2 from airflow.exceptions import AirflowException # Redshift에 연결하는 함수 def get_Redshift_connection(): hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() # SQL을 실행하고 특정 테이블에 데이터를 적재하는 함수 def execSQL(**context): schema = context['params']['schema'] table = context['params']['table'] select_sql = context['params']['sql'] # 로그에 정보 출력 logging.info(schema) logging.info(table) logging.info(select_sql) # Redshift 연결 및 커서 생성 cur = get_Redshift_connection() # 임시테이블에 내용 있으면 삭제, 임시 테이블 생성 후 CTAS 로 데이터 적재 sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table}; CREATE TABLE {schema}.temp_{table} AS {select_sql}""" cur.execute(sql) # 임시 테이블의 레코드 수 확인 cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""") count = cur.fetchone()[0] # 레코드가 없으면 에러 발생 if count == 0: raise ValueError(f"{schema}.{table} didn't have any record") try: # 기존 테이블 삭제 및 임시 테이블을 기존 테이블 이름으로 변경 sql = f"""DROP TABLE IF EXISTS {schema}.{table}; ALTER TABLE {schema}.temp_{table} RENAME TO {table}; COMMIT;""" logging.info(sql) cur.execute(sql) except Exception as e: # 롤백 시도 cur.execute("ROLLBACK") logging.error('Failed to execute SQL. Completed ROLLBACK!') # Airflow 예외 발생 raise AirflowException("") # DAG 정의 dag = DAG( dag_id="Build_Summary", start_date=datetime(2021, 12, 10), schedule='@once', catchup=False ) # PythonOperator를 사용하여 execSQL 함수를 실행하는 태스크 정의 execsql = PythonOperator( task_id='mau_summary', python_callable=execSQL, params={ 'schema': 'keeyong', 'table': 'mau_summary', # mau : 사용자가 해당 월에 방문했을때, 1번만 count 'sql': """SELECT TO_CHAR(A.ts, 'YYYY-MM') AS month, COUNT(DISTINCT B.userid) AS mau FROM raw_data.session_timestamp A JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid GROUP BY 1;""" }, dag=dag ) |
사용자별 channel 정보
- 코드

- 추가 아이디어 : CTAS 부분을 별도의 환경설정 파일로 떼어내보자. config 폴더를 만들고, 테이블별로 py 파일을 제작.
이렇게 하면 비개발자(데이터분석가) 등이 어려움을 덜 느낄 수 있다.

NPS summary 테이블
- 10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산
- 10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS

- 각자스키마.nps 테이블 or raw_data.nps 테이블 기준으로 일별 NPS summary 생성
- 일별 NPS 계산 SQL문
![]() |
1. `SELECT LEFT(created_at, 10) AS date`: `created_at` 열에서 각 날짜의 처음 10자리를 선택하여 `date`로 지정합니다. 이는 날짜를 추출하는 부분입니다. 2. `ROUND(SUM(CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) AS nps`: - `CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END`: 각 점수에 대해 Promoter(9 이상), Detractor(6 이하)를 구분하여 1 또는 -1을 할당합니다. - `SUM(...)`: 각 날짜에 대한 총 합계를 계산합니다. - `COUNT(1)`: 총 레코드 수를 세어서 각 날짜의 응답자 수를 계산합니다. - `::float`: 결과를 실수형으로 형변환합니다. - `*100/COUNT(1)`: 비율을 계산하고 100을 곱하여 퍼센트로 변환합니다. - `ROUND(..., 2)`: 소수점 이하 두 자리까지 반올림하여 NPS를 계산합니다. 3. `FROM keeyong.nps`: `keeyong` 스키마의 `nps` 테이블에서 데이터를 가져옵니다. 4. `GROUP BY 1`: 날짜별로 그룹화하여 각 날짜에 대한 NPS를 계산합니다. 5. `ORDER BY 1`: 결과를 날짜 순서로 정렬합니다. 따라서 이 쿼리는 각 날짜에 대한 NPS를 계산하고, 이를 날짜별로 그룹화하여 정렬한 결과를 반환합니다. |
- CTAS 부분을 떼어내기(이부분을 learn-airflow/dags/config/nps_summary.py 에 분리해 저장)

'airflow(에어플로우)' 카테고리의 다른 글
[airflow 실습] redshift_summary.py 분석 (0) | 2024.01.05 |
---|---|
airflow variable을 yaml 파일 안에 숨기기 (0) | 2024.01.04 |
airflow 실행 환경 관리 (0) | 2024.01.04 |
Airflow에서 _mysql is not defined 에러 (0) | 2024.01.03 |
airflow에서 backfill 실행 (0) | 2024.01.03 |