airflowdbt란

  •  Apache Airflow와 dbt(Data Build Tool)를 함께 사용하여 데이터 웨어하우스에서 ETL(Extract, Transform, Load) 작업을 수행하기 위한 도구 또는 플러그인

 

mau(월간사용자정보) 코드

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

# Redshift에 연결하는 함수
def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

# SQL을 실행하고 특정 테이블에 데이터를 적재하는 함수
def execSQL(**context):
    schema = context['params']['schema']
    table = context['params']['table']
    select_sql = context['params']['sql']

    # 로그에 정보 출력
    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    # Redshift 연결 및 커서 생성
    cur = get_Redshift_connection()

    # 임시테이블에 내용 있으면 삭제, 임시 테이블 생성 후 CTAS 로 데이터 적재
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};
              CREATE TABLE {schema}.temp_{table} AS {select_sql}"""
    cur.execute(sql)

    # 임시 테이블의 레코드 수 확인
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]

    # 레코드가 없으면 에러 발생
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        # 기존 테이블 삭제 및 임시 테이블을 기존 테이블 이름으로 변경
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};
                  ALTER TABLE {schema}.temp_{table} RENAME TO {table};
                  COMMIT;"""
        logging.info(sql)
        cur.execute(sql)

    except Exception as e:
        # 롤백 시도
        cur.execute("ROLLBACK")
        logging.error('Failed to execute SQL. Completed ROLLBACK!')
        # Airflow 예외 발생
        raise AirflowException("")

# DAG 정의
dag = DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',
    catchup=False
)

# PythonOperator를 사용하여 execSQL 함수를 실행하는 태스크 정의
execsql = PythonOperator(
    task_id='mau_summary',
    python_callable=execSQL,
    params={
        'schema': 'keeyong',
        'table': 'mau_summary',

# mau : 사용자가 해당 월에 방문했을때, 1번만 count
        'sql': """SELECT 
                    TO_CHAR(A.ts, 'YYYY-MM') AS month,
                    COUNT(DISTINCT B.userid) AS mau
                  FROM raw_data.session_timestamp A
                  JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
                  GROUP BY 1;"""
    },
    dag=dag
)

 

 

사용자별 channel 정보

  • 코드

 

  • 추가 아이디어 : CTAS 부분을 별도의 환경설정 파일로 떼어내보자. config 폴더를 만들고, 테이블별로 py 파일을 제작.
    이렇게 하면 비개발자(데이터분석가) 등이 어려움을 덜 느낄 수 있다.

 

NPS summary 테이블

  • 10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산
  • 10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS

  • 각자스키마.nps 테이블 or raw_data.nps 테이블 기준으로 일별 NPS summary 생성
  • 일별 NPS 계산 SQL문
1. `SELECT LEFT(created_at, 10) AS date`: `created_at` 열에서 각 날짜의 처음 10자리를 선택하여 `date`로 지정합니다. 이는 날짜를 추출하는 부분입니다.

2. `ROUND(SUM(CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) AS nps`:
   - `CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END`: 각 점수에 대해 Promoter(9 이상), Detractor(6 이하)를 구분하여 1 또는 -1을 할당합니다.
   - `SUM(...)`: 각 날짜에 대한 총 합계를 계산합니다.
   - `COUNT(1)`: 총 레코드 수를 세어서 각 날짜의 응답자 수를 계산합니다.
   - `::float`: 결과를 실수형으로 형변환합니다.
   - `*100/COUNT(1)`: 비율을 계산하고 100을 곱하여 퍼센트로 변환합니다.
   - `ROUND(..., 2)`: 소수점 이하 두 자리까지 반올림하여 NPS를 계산합니다.

3. `FROM keeyong.nps`: `keeyong` 스키마의 `nps` 테이블에서 데이터를 가져옵니다.

4. `GROUP BY 1`: 날짜별로 그룹화하여 각 날짜에 대한 NPS를 계산합니다.

5. `ORDER BY 1`: 결과를 날짜 순서로 정렬합니다.

따라서 이 쿼리는 각 날짜에 대한 NPS를 계산하고, 이를 날짜별로 그룹화하여 정렬한 결과를 반환합니다.
  • CTAS 부분을 떼어내기(이부분을 learn-airflow/dags/config/nps_summary.py 에 분리해 저장)

+ Recent posts