FROM apache/airflow:2.5.1

 

USER root
RUN apt-get update \
  && apt-get install -y --no-install-recommends \
         vim \
  && apt-get autoremove -yqq --purge \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*
Dockerfile 내에서 패키지를 설치하고 이미지를 최적화
1. `apt-get update`: 패키지 목록을 업데이트합니다.
2. `apt-get install -y --no-install-recommends vim`: vim 패키지를 설치하고 권장 패키지를 설치하지 않도록 합니다.
3. `apt-get autoremove -yqq --purge`: 불필요한 패키지를 자동으로 제거합니다.
4. `apt-get clean`: apt의 캐시를 지웁니다.
5. `rm -rf /var/lib/apt/lists/*`: 패키지 목록과 관련된 임시 파일을 제거합니다.

USER airflow

# app으로 requirements.txt 복사
COPY requirements.txt /app/requirements.txt
RUN pip3 install -r /app/requirements.txt


그냥하면 되는걸 왜 복사하고 설치할까?

Docker는 각 명령어를 캐싱하여 이미지 빌드 속도를 높이려고 합니다. 만약 COPY requirements.txt /app/requirements.txt 를 먼저 수행하고, 그 후에 소스 코드의 변경이 없는 경우 Docker는 이전 단계를 캐시하고 pip3 install을 다시 실행하지 않습니다. 이로써 이미지 빌드가 더 빨라집니다.

만약 소스 코드가 변경되지 않았고, requirements.txt 파일도 변경되지 않았다면, Docker는 이전에 설치된 패키지를 그대로 사용할 수 있습니다. 따라서 COPY ~ 를 따로 두는 것은 이미지 빌드 성능을 최적화하는 한 방법입니다.


# DAG 파일 복사
COPY dags/ /opt/airflow/dags/

 


airflowdbt란

  •  Apache Airflow와 dbt(Data Build Tool)를 함께 사용하여 데이터 웨어하우스에서 ETL(Extract, Transform, Load) 작업을 수행하기 위한 도구 또는 플러그인

 

mau(월간사용자정보) 코드

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

# Redshift에 연결하는 함수
def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

# SQL을 실행하고 특정 테이블에 데이터를 적재하는 함수
def execSQL(**context):
    schema = context['params']['schema']
    table = context['params']['table']
    select_sql = context['params']['sql']

    # 로그에 정보 출력
    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    # Redshift 연결 및 커서 생성
    cur = get_Redshift_connection()

    # 임시테이블에 내용 있으면 삭제, 임시 테이블 생성 후 CTAS 로 데이터 적재
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};
              CREATE TABLE {schema}.temp_{table} AS {select_sql}"""
    cur.execute(sql)

    # 임시 테이블의 레코드 수 확인
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]

    # 레코드가 없으면 에러 발생
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        # 기존 테이블 삭제 및 임시 테이블을 기존 테이블 이름으로 변경
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};
                  ALTER TABLE {schema}.temp_{table} RENAME TO {table};
                  COMMIT;"""
        logging.info(sql)
        cur.execute(sql)

    except Exception as e:
        # 롤백 시도
        cur.execute("ROLLBACK")
        logging.error('Failed to execute SQL. Completed ROLLBACK!')
        # Airflow 예외 발생
        raise AirflowException("")

# DAG 정의
dag = DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',
    catchup=False
)

# PythonOperator를 사용하여 execSQL 함수를 실행하는 태스크 정의
execsql = PythonOperator(
    task_id='mau_summary',
    python_callable=execSQL,
    params={
        'schema': 'keeyong',
        'table': 'mau_summary',

# mau : 사용자가 해당 월에 방문했을때, 1번만 count
        'sql': """SELECT 
                    TO_CHAR(A.ts, 'YYYY-MM') AS month,
                    COUNT(DISTINCT B.userid) AS mau
                  FROM raw_data.session_timestamp A
                  JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
                  GROUP BY 1;"""
    },
    dag=dag
)

 

 

사용자별 channel 정보

  • 코드

 

  • 추가 아이디어 : CTAS 부분을 별도의 환경설정 파일로 떼어내보자. config 폴더를 만들고, 테이블별로 py 파일을 제작.
    이렇게 하면 비개발자(데이터분석가) 등이 어려움을 덜 느낄 수 있다.

 

NPS summary 테이블

  • 10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산
  • 10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS

  • 각자스키마.nps 테이블 or raw_data.nps 테이블 기준으로 일별 NPS summary 생성
  • 일별 NPS 계산 SQL문
1. `SELECT LEFT(created_at, 10) AS date`: `created_at` 열에서 각 날짜의 처음 10자리를 선택하여 `date`로 지정합니다. 이는 날짜를 추출하는 부분입니다.

2. `ROUND(SUM(CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) AS nps`:
   - `CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END`: 각 점수에 대해 Promoter(9 이상), Detractor(6 이하)를 구분하여 1 또는 -1을 할당합니다.
   - `SUM(...)`: 각 날짜에 대한 총 합계를 계산합니다.
   - `COUNT(1)`: 총 레코드 수를 세어서 각 날짜의 응답자 수를 계산합니다.
   - `::float`: 결과를 실수형으로 형변환합니다.
   - `*100/COUNT(1)`: 비율을 계산하고 100을 곱하여 퍼센트로 변환합니다.
   - `ROUND(..., 2)`: 소수점 이하 두 자리까지 반올림하여 NPS를 계산합니다.

3. `FROM keeyong.nps`: `keeyong` 스키마의 `nps` 테이블에서 데이터를 가져옵니다.

4. `GROUP BY 1`: 날짜별로 그룹화하여 각 날짜에 대한 NPS를 계산합니다.

5. `ORDER BY 1`: 결과를 날짜 순서로 정렬합니다.

따라서 이 쿼리는 각 날짜에 대한 NPS를 계산하고, 이를 날짜별로 그룹화하여 정렬한 결과를 반환합니다.
  • CTAS 부분을 떼어내기(이부분을 learn-airflow/dags/config/nps_summary.py 에 분리해 저장)

# 필요한 모듈과 클래스 임포트
from airflow import DAG
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
import logging
from glob import glob

# DAG에서 사용할 함수: 주어진 디렉토리의 JSON 파일을 읽어들이는 함수
def load_all_jsons_into_list(path_to_json):
    # configs 리스트 초기화
    configs = []
    
    # 주어진 디렉토리에 있는 모든 .py 파일에 대해 반복
    for f_name in glob(path_to_json + '/*.py'):
        # 파일 열기
        with open(f_name) as f:
            # 파일 내용을 텍스트로 읽어들임
            dict_text = f.read()
            
            try:
                # 읽어들인 텍스트를 eval 함수를 사용하여 파이썬 딕셔너리로 변환
                dict = eval(dict_text)
            except Exception as e:
                # 변환 중 에러가 발생하면 로그에 에러 메시지 기록하고 예외 전파
                logging.info(str(e))
                raise
            else:
                # 성공적으로 변환된 딕셔너리를 configs 리스트에 추가
                configs.append(dict)
    
    # 최종적으로 변환된 딕셔너리들을 담고 있는 configs 리스트 반환
    return configs


# 주어진 테이블 이름과 테이블 설정에서 테이블을 찾는 함수
def find(table_name, table_confs):
    # table_confs 리스트를 순회하며 각 테이블 설정 딕셔너리를 확인
    for table in table_confs:
        # 현재 테이블 설정 딕셔너리의 "table" 키 값이 주어진 table_name과 일치하는지 확인
        if table.get("table") == table_name:
            # 일치하는 경우 해당 테이블 설정 딕셔너리를 반환
            return table
    
    # 모든 테이블 설정을 확인한 후에도 일치하는 것이 없으면 None 반환
    return None

# Redshift에서 요약 테이블을 생성하는 함수
def build_summary_table(dag_root_path, dag, tables_load, redshift_conn_id, start_task=None):
    # 로그에 DAG 루트 경로 출력
    logging.info(dag_root_path)
    
    # JSON 형식의 테이블 설정 파일들을 읽어들임
    table_confs = load_all_jsons_into_list(dag_root_path + "/config/")

    # DAG의 시작 작업이 지정되었으면 해당 작업을 기준으로 설정
    if start_task is not None:
        prev_task = start_task
    else:
        prev_task = None

    # tables_load에 지정된 테이블들에 대해 작업 생성 및 DAG에 추가
    for table_name in tables_load:
        # 지정된 테이블명에 해당하는 테이블 설정 찾기
        table = find(table_name, table_confs)
        
        # RedshiftSummaryOperator를 생성하고 설정값 전달
        summarizer = RedshiftSummaryOperator(
            table=table["table"],
            schema=table["schema"],
            redshift_conn_id=redshift_conn_id,
            input_check=table["input_check"],
            main_sql=table["main_sql"],
            output_check=table["output_check"],
            overwrite=table.get("overwrite", True),
            after_sql=table.get("after_sql"),
            pre_sql=table.get("pre_sql"),
            attributes=table.get("attributes", ""),
            dag=dag,
            task_id="analytics"+"__"+table["table"]
        )
        
        # 이전 작업과 현재 작업 간의 의존성 설정
        if prev_task is not None:
            prev_task >> summarizer
        prev_task = summarizer
    
    # 마지막으로 추가된 작업을 반환 (나중에 DAG 작성 시 필요)
    return prev_task

# Redshift에서 SQL을 실행하는 함수
def redshift_sql_function(**context):
    sql=context["params"]["sql"]
    print(sql)
    hook = PostgresHook(postgres_conn_id=context["params"]["redshift_conn_id"])
    hook.run(sql, True)

# Redshift에서 요약 테이블을 생성하는 PythonOperator를 확장한 사용자 정의 연산자 클래스
class RedshiftSummaryOperator(PythonOperator):
    @apply_defaults
    def __init__(self, schema, table, redshift_conn_id, input_check, main_sql, output_check, overwrite, params={}, pre_sql="", after_sql="", attributes="", *args, **kwargs):
        # 사용자가 정의한 RedshiftSummaryOperator 클래스의 초기화 메소드입니다.
        
        # 생성할 요약 테이블의 Redshift 스키마 이름
        self.schema = schema
        
        # 생성할 요약 테이블의 이름
        self.table = table
        
        # Airflow에서 사용하는 Redshift 연결 ID
        self.redshift_conn_id = redshift_conn_id
        
        # 입력 유효성을 검사하기 위한 SQL 쿼리 및 최소 레코드 수로 구성된 목록
        self.input_check = input_check
        
        # 요약 테이블을 생성하기 위한 주요 SQL 쿼리
        self.main_sql = main_sql
        
        # 출력 유효성을 검사하기 위한 SQL 쿼리 및 최소 레코드 수로 구성된 목록
        self.output_check = output_check
        
        # True인 경우 기존 테이블을 덮어쓰고, False인 경우 덮어쓰지 않고 추가합니다.
        self.overwrite = overwrite
        
        # PythonOperator에 전달할 추가 매개변수
        self.params = params
        
        # main_sql 실행 전에 실행할 SQL 쿼리
        self.pre_sql = pre_sql if pre_sql else ""
        
        # main_sql 실행 후에 실행할 SQL 쿼리
        self.after_sql = after_sql.format(schema=self.schema, table=self.table) if after_sql else ""
        
        # 생성할 테이블의 추가 속성
        self.attributes = attributes

        # temp 테이블 생성 및 데이터 적재에 사용될 SQL 문 생성
        if pre_sql:
            self.main_sql = pre_sql
            if not self.main_sql.endswith(";"):
                self.main_sql += ";"
        else:
            self.main_sql = ""
        # 임시테이블이 있으면 삭제, 
        self.main_sql += "DROP TABLE IF EXISTS {schema}.temp_{table};".format(
            schema=self.schema,
            table=self.table
        )
        # CREATE TABLE 문을 만들어서 self.main_sql에 추가합니다.
        self.main_sql += "CREATE TABLE {schema}.temp_{table} {attributes} AS ".format(
            schema=self.schema,
            table=self.table,
            attributes=self.attributes
        ) + self.main_sql

        # 상위 클래스인 PythonOperator를 호출하여 초기화
        # RedshiftSummaryOperator 클래스의 초기화 메소드에서는 두 번의 상위 클래스 초기화(super)가 이루어집니다.

        # 첫 번째 super 호출:
        super(RedshiftSummaryOperator, self).__init__(
            python_callable=redshift_sql_function,  # Python callable로 사용될 함수
            params={
                "sql": self.main_sql,  # 생성한 SQL 문
                "overwrite": self.overwrite,  # 덮어쓰기 여부
                "redshift_conn_id": self.redshift_conn_id  # Redshift 연결 ID
            },
            provide_context=True,  # Airflow 컨텍스트 제공 여부
            *args,
            **kwargs
        )

        # 두 번째 super 호출:
        # after_sql이 정의되어 있다면 해당 값을 사용하고, 그렇지 않으면 빈 문자열("")을 사용합니다.
        if after_sql:
            self.after_sql = after_sql.format(
                schema=self.schema,
                table=self.table
            )
        else:
            self.after_sql = ""

        super(RedshiftSummaryOperator, self).__init__(
            python_callable=redshift_sql_function,  # Python callable로 사용될 함수
            params={
                "sql": main_sql,  # 생성한 SQL 문
                "overwrite": overwrite,  # 덮어쓰기 여부
                "redshift_conn_id": self.redshift_conn_id  # Redshift 연결 ID
            },
            provide_context=True,  # Airflow 컨텍스트 제공 여부
            *args,
            **kwargs
        )


    # temp 테이블과 본 테이블을 스왑하는 함수
    def swap(self):
        # 원본 테이블 삭제
        # 임시테이블 원본테이블 이름으로 바꿈
        # {schema}와 {table}을 사용자가 정의한 값으로 대체하여 
        # SELECT 권한을 부여하는 SQL 문
        sql = """BEGIN;
        DROP TABLE IF EXISTS {schema}.{table} CASCADE;
        ALTER TABLE {schema}.temp_{table} RENAME TO {table};   
        GRANT SELECT ON TABLE {schema}.{table} TO GROUP analytics_users;
        END
        """.format(schema=self.schema,table=self.table)
        self.hook.run(sql, True)

    def execute(self, context):
        """
        RedshiftSummaryOperator의 execute 메소드입니다.

        1. Input_check 먼저 수행
           - input_check는 "sql"과 "count"를 포함하는 딕셔너리의 목록이어야 함
        """
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for item in self.input_check:
            (cnt,) = self.hook.get_first(item["sql"])
            if cnt < item["count"]:
                raise AirflowException(
                    "Input Validation Failed for " + str(item["sql"]))

        """
        2. temp 테이블 생성 및 데이터 적재 수행
        """
        return_value = super(RedshiftSummaryOperator, self).execute(context)

        """
        3. Output_check은 self.output_check 사용
        """
        for item in self.output_check:
            (cnt,) = self.hook.get_first(item["sql"].format(schema=self.schema, table=self.table))
            if item.get("op") == 'eq':
                if int(cnt) != int(item["count"]):
                    raise AirflowException(
                        "Output Validation of 'eq' Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
            else:
                if cnt < item["count"]:
                    raise AirflowException(
                        "Output Validation Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
        
        """
        4. 이제 temp 테이블 이름을 스왑
        """
        self.swap()

        """
        5. after_sql이 정의되어 있다면 실행
        """
        if self.after_sql:
            self.hook.run(self.after_sql, True)

        return return_value

실습파일 다운 및 세팅

  1. 실습파일을 다운로드
    weare@DESKTOP-BE1I4GE:~$ git clone https://github.com/learndataeng/learn-airflow.git
    Cloning into 'learn-airflow'...
    remote: Enumerating objects: 219, done.
    remote: Counting objects: 100% (95/95), done.
    remote: Compressing objects: 100% (93/93), done.
    remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
    Receiving objects: 100% (219/219), 71.07 KiB | 6.46 MiB/s, done.
    Resolving deltas: 100% (106/106), done.

  2. 다운 받은 파일로 이동
    weare@DESKTOP-BE1I4GE:~$ cd learn-airflow

  3. docker-compose.yaml 파일 다운
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100 10493  100 10493    0     0  31703      0 --:--:-- --:--:-- --:--:-- 31796

  4. 필요한 폴더 만듦
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ mkdir -p ./dags ./logs ./plugins

  5. .env 파일을 생성하고, 그 내용으로 AIRFLOW_UID와 AIRFLOW_GID 변수를 설정
    weare@DESKTOP-BE1I4GE:~/learn-airflow$  echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  6. docker-compose.yaml 파일 수정
    5가지를 수정해야하고, 수정내용은 아래와 같다.

    1) environment 에 Airflow에서 사용하는 데이터 디렉토리를 /opt/airflow/data로 설정
     AIRFLOW_VAR_DATA_DIR: /opt/airflow/data


    2) 처음 설치 모듈들 추가 
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance pandas numpy oauth2client gspread}

    3) 볼륨 설정. 호스트의 ${AIRFLOW_PROJ_DIR:-.}/data 경로를 컨테이너 내부의 /opt/airflow/data 경로로 마운트
    - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data

    4) mkdir~ :  /sources/data 폴더 추가

    5) chown~: AIRFLOW_UID에 지정된 사용자 ID를 소유자로 설정하고, 0은 그룹을 root 그룹으로 설정.
    data 폴더에 대해 명령어를 수행

  7. 컨테이너 실행, 맨뒤에 -d 넣으면 서버 실행돼서 우분투 하나더 실행 안해도 됨
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker-compose up -d

  8. docker 데스크탑으로 확인하면 잘 실행됨
  9. http://localhost:8080/ 페이지 가서 variable에 가보면 저장한 변수가 표면적으로 나와있지 않음(보안상 이점)
  10. variable 저장된 data_dir이 저장된 위치 확인.
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker ps
    CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                      PORTS                    NAMES
    73c2ebf8ecec   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     8080/tcp                 learn-airflow_airflow-triggerer_1
    90cb7b11d245   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (unhealthy)   8080/tcp                 learn-airflow_airflow-scheduler_1
    a5bacafc0519   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     8080/tcp                 learn-airflow_airflow-worker_1
    065f9ee9e619   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     0.0.0.0:8080->8080/tcp   learn-airflow_airflow-webserver_1
    cd5a97a54a1c   redis:latest           "docker-entrypoint.s…"   29 minutes ago   Up 12 minutes (healthy)     6379/tcp                 learn-airflow_redis_1
    73ed8fe8f6d1   postgres:13            "docker-entrypoint.s…"   29 minutes ago   Up 12 minutes (healthy)     5432/tcp                 learn-airflow_postgres_1

  11. 웹에서는 안보이지만 variable이 존재함을 확인
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker exec -it 90cb7b11d245 airflow variables get DATA_DIR
    /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: [31mDeprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. [32mTo prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". [36mSet environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message.[0m (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
    /opt/airflow/data

 

실습파일 다운 및 세팅

  1. 실습파일을 다운로드
    weare@DESKTOP-BE1I4GE:~$ git clone https://github.com/learndataeng/learn-airflow.git
    Cloning into 'learn-airflow'...
    remote: Enumerating objects: 219, done.
    remote: Counting objects: 100% (95/95), done.
    remote: Compressing objects: 100% (93/93), done.
    remote: Total 219 (delta 31), reused 2 (delta 0), pack-reused 124
    Receiving objects: 100% (219/219), 71.07 KiB | 6.46 MiB/s, done.
    Resolving deltas: 100% (106/106), done.

  2. 다운 받은 파일로 이동
    weare@DESKTOP-BE1I4GE:~$ cd learn-airflow

  3. docker-compose.yaml 파일 다운
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100 10493  100 10493    0     0  31703      0 --:--:-- --:--:-- --:--:-- 31796

  4. 필요한 폴더 만듦
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ mkdir -p ./dags ./logs ./plugins

  5. .env 파일을 생성하고, 그 내용으로 AIRFLOW_UID와 AIRFLOW_GID 변수를 설정
    weare@DESKTOP-BE1I4GE:~/learn-airflow$  echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  6. docker-compose.yaml 파일 수정
    5가지를 수정해야하고, 수정내용은 아래와 같다.

    1) environment 에 Airflow에서 사용하는 데이터 디렉토리를 /opt/airflow/data로 설정
     AIRFLOW_VAR_DATA_DIR: /opt/airflow/data


    2) 처음 설치 모듈들 추가 
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- yfinance pandas numpy oauth2client gspread}

    3) 볼륨 설정. 호스트의 ${AIRFLOW_PROJ_DIR:-.}/data 경로를 컨테이너 내부의 /opt/airflow/data 경로로 마운트
    - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data

    4) mkdir~ :  /sources/data 폴더 추가

    5) chown~: AIRFLOW_UID에 지정된 사용자 ID를 소유자로 설정하고, 0은 그룹을 root 그룹으로 설정.
    data 폴더에 대해 명령어를 수행

  7. 컨테이너 실행, 맨뒤에 -d 넣으면 서버 실행돼서 우분투 하나더 실행 안해도 됨
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker-compose up -d

  8. docker 데스크탑으로 확인하면 잘 실행됨
  9. 저장된 data_dir이 저장된 위치 확인.
    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker ps
    CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                      PORTS                    NAMES
    73c2ebf8ecec   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     8080/tcp                 learn-airflow_airflow-triggerer_1
    90cb7b11d245   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (unhealthy)   8080/tcp                 learn-airflow_airflow-scheduler_1
    a5bacafc0519   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     8080/tcp                 learn-airflow_airflow-worker_1
    065f9ee9e619   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   10 minutes ago   Up 10 minutes (healthy)     0.0.0.0:8080->8080/tcp   learn-airflow_airflow-webserver_1
    cd5a97a54a1c   redis:latest           "docker-entrypoint.s…"   29 minutes ago   Up 12 minutes (healthy)     6379/tcp                 learn-airflow_redis_1
    73ed8fe8f6d1   postgres:13            "docker-entrypoint.s…"   29 minutes ago   Up 12 minutes (healthy)     5432/tcp                 learn-airflow_postgres_1

    weare@DESKTOP-BE1I4GE:~/learn-airflow$ docker exec -it 90cb7b11d245 airflow variables get DATA_DIR
    /home/airflow/.local/lib/python3.7/site-packages/airflow/models/base.py:49 MovedIn20Warning: [31mDeprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. [32mTo prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". [36mSet environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message.[0m (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
    /opt/airflow/data

airflow 실행환경 관리

  • 기타 환경설정값들(variables, connections 등) 어떻게 관리/배포할까?
    보통 docker-compose.yml 파일에서 관리
  • 어디까지 airflow 이미지로 관리하고 뭘 docker-compose.yml에서 관리할까?

  • 두가지 실행환경 : 에어플로우(Apache Airflow) 이미지, Docker Compose

    1. **에어플로우 이미지:**
       - **개념:** 에어플로우 이미지는 에어플로우 및 관련 의존성, DAGs, 설정 파일 등이 미리 패키징된 Docker 이미지입니다. 이 이미지는 에어플로우를 실행하는 데 필요한 모든 것을 포함합니다.
       - **용도:** 이미지는 여러 환경 (개발, 테스트, 프로덕션)에서 일관된 에어플로우 환경을 유지하고, 배포를 단순화하는 데 사용됩니다.

    2. **Docker Compose:**
       - **개념:** Docker Compose는 여러 컨테이너를 정의하고 관리하는 도구로, YAML 파일에 컨테이너 설정을 기술합니다. 이를 사용하여 여러 서비스를 하나의 프로젝트로 정의하고 실행할 수 있습니다.
       - **용도:** `docker-compose.yml` 파일은 여러 컨테이너를 동시에 실행하고, 서비스 간의 의존성, 네트워킹, 볼륨 마운트 등을 설정하는 데 사용됩니다. 즉, 여러 서비스를 함께 실행하고 관리하는 데 도움이 됩니다.

    **차이점:**
    - **에어플로우 이미지:** 에어플로우와 관련된 모든 것을 패키징한 이미지. 에어플로우 자체, DAGs, 설정 파일, 의존성 등이 이미지에 포함됨.
    - **Docker Compose:** 여러 컨테이너를 정의하고 관리하는 도구. 여러 서비스를 정의하고 이를 함께 실행할 때 사용. 컨테이너의 구성, 네트워킹, 데이터 볼륨 등을 관리.

    일반적으로, 에어플로우 이미지는 에어플로우 환경을 패키징하고, Docker Compose는 여러 서비스를 함께 실행하고 이를 관리하는 데 사용됩니다. `docker-compose.yml` 파일에서는 에어플로우 이미지를 어떻게 사용할지, 각 컨테이너 간의 상호작용을 어떻게 설정할지 등을 정의합니다.

  • 조심할점 : airfow의 dag 스캔패턴의 특성상 dags_folder가 가리키는 폴더를 서브폴더들까지 다 스캔함. dag모듈이 포함된 모든 파이썬 스크립트를 실행해서 사고로 이어진다. 

    해결방법 : .airflowignore로 무시해야할 dag_folder의 폴더나 파일을 지정

    실제예
    1. `project_a`: 이 패턴은 단순히 `project_a`라는 파일 또는 디렉토리를 무시하는 패턴입니다.

    2. `tenant_[\d]`: 이 패턴은 `tenant_` 다음에 숫자(`[\d]`는 숫자를 나타냄)가 따라오는 경우를 의미합니다. 이것은 `tenant_0`, `tenant_1`, `tenant_2` 등과 같은 패턴을 무시하는 데 사용될 수 있습니다.

 

 

 

 

 

"name '_mysql' is not defined"

  • 원인 : 에어플로우(Airflow)에서 "name '_mysql' is not defined" 오류는 MySQLDB 모듈의 _mysql 부분이 로드되지 않아 발생하는 문제입니다. 이 오류는 주로 MySQLDB 패키지의 라이브러리 로딩이나 환경 변수 설정의 문제로 인해 발생할 수 있습니다.
  • 해결방법 : 오류를 해결하기 위해 LD_PRELOAD 환경 변수를 사용하여 특정 C++ 표준 라이브러리를 명시적으로 로드하는 것으로 해당 모듈을 찾을 수 있게 합니다.

  • 방법1
    1) libstdc++.so.6 위치를 찾는다.
    (airflow)find / -name libstdc++.so.6
    /usr/lib/x86_64-linux-gnu/libstdc++.so.6

    2) LD_PRELOAD 환경변수에 해당 라이브러리를 추가하여, 프로그램을 실행할 때, 라이브러리를 먼저 로드하게함
    (airflow)export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6

  • 방법2
    airflow web ui에서 connection 설정 Extra에
    {“conn_type”: “mysql”, “client”: “mysql-connector-python”}
    or {“conn_type”: “mysql”, “client”: “pymysql”} 
    넣어서 저장해준다.(둘 중에 되는거로)

backfill을 할수 있는 조건

  • 모든 dag가 backfill을 필요로 하지 않는다.(full refresh는 필요없음)
  • 일별 시간별 업데이트 할때만 backfill이 필요하다.
  • 데이터 소스에 그날 바뀌거나 새로생긴 레코드를 구분할수 있는 기능이 있어야함
  • 데이터 크기가 커지면 backfill 기능을 필수적으로 구현


daily incremental dag에서 2018년 7월달 데이터를 다시 읽어와야 한다면

  1. 하루씩 지금까지 실행?
  2. 한번에 여러 날짜를 동시에 실행 , max_active_runs
  3. 예상되는 문제
    1) 데이터를 요구받는 소스측에서 감당못해 속도를 줄일수 있음 
    2) 동시 실행시 충돌이 날수도 있음
  4. backfill 실행전 준비사항
    1) catchup=True
    2) execution_date 사용해서 incremental update가 구현돼있음.
  5. 실행순서
    1) 실행순서는 날짜/시간순 아니고 랜덤.
    2) 날짜순으로 하고 싶으면 
  6. 커맨드라인에서 실행
    airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

목표 : mysql 테이블 redshift로 복사하기

  • 첫번째 방법(insert) : mysql -> airflow server(local)->redshift
    +용량이 증가하면 insert commend가 오래걸림
    + 준비할것 : mysql->airflow 연결, airflow-> redshift 권한설정
  • 두번째 방법(copy) : mysql -> airflow server(local)->amazon s3-> redshift
    +준비할것 : mysql->airflow 연결 ,airflow->s3 권한설정 , s3 -> redshift 권한설정

 

s3 버킷,mysql 서버 구현은 따로 진행할것(중요정보라 밝히지 못함)

 

aws 권한설정

  • airflow dag에서 s3 접근(쓰기권한)
    iam user를 만들고 s3 버킷에 대한 읽기/쓰기 권한 설정, access key와 secret key를 사용한다.
  • redshift가 s3 접근(읽기권한)
    redshift에 s3를 접근할 수 있는 역할(role)을 만들고 이를 redshift에 지정

airflow에서 mysql 연결 설정

 

 

MySQLdb 에러발생시 https://allofdater.tistory.com/59 참조

aws s3 접근 방법 개요

  • iam을 사용해 별도의 사용자를 만들고
  • 그 사용자에게 해당 s3 bucket을 읽고 쓸 수 있는 권한을 제공(특정 s3의 권한만 주는게 좋음)
  • custom policy 내용 아래로 수정
  • 그 사용자의 access key id와 secret access key를 사용(두개 따로 잘 보관해놀것, 재발급X)
  • airflow에서 s3 연결 설정

ap-northeast-2는 서울을 의미(다른 나라면 바꿀것)

 

mysql (이미 제작된 실습) 테이블(oltp, production database)

 

redshift (oltp, data warehouse)에 해당 테이블생성

 

새롭게 사용할 operator 

  • SqlToS3Operator : MySQL SQL 결과 -> S3
    (s3://grepp-data-engineering/{본인ID}-nps)
    s3://s3_bucket/s3_key

  • S3ToRedshiftOperator : S3 -> Redshift 테이블
    (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
    COPY command is used

 

airflow를 활용한 전체 실습 과정

 

 

 full refrash 버전 코드

# 필요한 Airflow 모듈 및 클래스 가져오기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

# 필요한 Python 표준 라이브러리 가져오기
from datetime import datetime
from datetime import timedelta

# DAG 정의
dag = DAG(
    dag_id='MySQL_to_Redshift',  # DAG의 고유 식별자
    start_date=datetime(2022, 8, 24),  # DAG 실행 시작일
    schedule='0 9 * * *',  # DAG의 스케줄 (UTC 기준, 매일 9시에 실행)
    max_active_runs=1,  # 최대 동시 실행 횟수
    catchup=False,  # 과거 실행 여부
    default_args={
        'retries': 1,  # 작업 재시도 횟수
        'retry_delay': timedelta(minutes=3),  # 작업 재시도 간격
    }
)

# Redshift 테이블 및 S3 버킷 관련 설정
schema = "본인id"  # Redshift 스키마
table = "nps"  # Redshift 테이블명
s3_bucket = "grepp-data-engineering"  # S3 버킷명
s3_key = schema + "-" + table  # S3 키 (파일 경로)

# MySQL 데이터를 S3로 이동하는 작업 정의
mysql_to_s3_nps = SqlToS3Operator(
    task_id='mysql_to_s3_nps',  # 작업 고유 식별자
    query="SELECT * FROM prod.nps",  # MySQL에서 데이터를 가져오는 쿼리
    s3_bucket=s3_bucket,  # 이동할 S3 버킷
    s3_key=s3_key,  # 이동할 S3 키 (파일 경로)
    sql_conn_id="mysql_conn_id",  # MySQL 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    verify=False,  # SSL 인증서의 검증을 시행할지 여부
    replace=True,  # S3에 저장된 파일이 이미 존재할 경우 해당 파일을 교체(replace)할지 여부
    pd_kwargs={"index": False, "header": False},   # DataFrame을 CSV로 변환할 때 인덱스 및 헤더의 포함 여부
    dag=dag  # DAG 연결
)

# S3에서 Redshift로 데이터를 이동하는 작업 정의
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id='s3_to_redshift_nps',  # 작업 고유 식별자
    s3_bucket=s3_bucket,  # 읽어올 S3 버킷
    s3_key=s3_key,  # 읽어올 S3 키 (파일 경로)
    schema=schema,  # Redshift 테이블의 스키마
    table=table,  # Redshift 테이블명
    copy_options=['csv'],  # COPY 명령에 사용되는 옵션

    method='REPLACE',  # 데이터 이동 방법 (REPLACE: 기존 데이터 대체)
    redshift_conn_id="redshift_dev_db",  # Redshift 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    dag=dag  # DAG 연결
)

# 작업 간의 의존성 정의
mysql_to_s3_nps >> s3_to_redshift_nps

 

incremental update 방식

  • 테이블이 다음을 만족해야함 :  만들거나 생성될때 time이 기록돼야함 ,
    만들때 : created(timestamp) , modified(timestamp)
    수정할때 : modified(timestamp)
    삭제할때 : deleted(boolean) 가 true로 바뀌어야함,  (deleted가 True면 동작)modified(timestamp)

  • row_number로 구현하는경우
    예) daily update 이고, A인 table을 mysql에서 읽어올때, row_number로 구현
    1) redshift의 A테이블의 내용을 temp_A로 복사
    2) A테이블의 레코드 중 modified의 날짜가 지난일(execution_date)에 해당하는 모든 레코드를 temp_A로 복사
    select * from A where date(modified) = date(execution_date)
    3)  temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 desc 정렬해서, 일렬번호가 1인 것들만 다시 A로 복사
  • S3ToRedshiftOperator로 구현하는 경우
    1) SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    2) method 파라미터로 “UPSERT”를 지정
    3) upsert_keys 파라미터로 Primary key를 지정(앞서 nps 테이블이라면 id필드를 사용)

incremental update  코드

# 필요한 Airflow 모듈 및 클래스 가져오기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

# 필요한 Python 표준 라이브러리 가져오기
from datetime import datetime
from datetime import timedelta

# DAG 정의
dag = DAG(
    dag_id='MySQL_to_Redshift_v2',  # DAG의 고유 식별자
    start_date=datetime(2023, 1, 1),  # DAG 실행 시작일
    schedule='0 9 * * *',  # DAG의 스케줄 (UTC 기준, 매일 9시에 실행)
    max_active_runs=1,  # 최대 동시 실행 횟수
    catchup=False,  # 과거 실행 여부
    default_args={
        'retries': 1,  # 작업 재시도 횟수
        'retry_delay': timedelta(minutes=3),  # 작업 재시도 간격
    }
)

# Redshift 테이블, S3 버킷, S3 키 관련 설정
schema = "본인id"  # Redshift 스키마
table = "nps"  # Redshift 테이블명
s3_bucket = "grepp-data-engineering"  # S3 버킷명
s3_key = schema + "-" + table  # S3 키 (파일 경로)
# s3_key = schema + "/" + table  # S3 키 (다른 형식의 예시)

# MySQL 쿼리 정의
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"

# MySQL 데이터를 S3로 이동하는 작업 정의
mysql_to_s3_nps = SqlToS3Operator(
    task_id='mysql_to_s3_nps',  # 작업 고유 식별자
    query=sql,  # MySQL에서 데이터를 가져오는 쿼리
    s3_bucket=s3_bucket,  # 이동할 S3 버킷
    s3_key=s3_key,  # 이동할 S3 키 (파일 경로)
    sql_conn_id="mysql_conn_id",  # MySQL 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    verify=False,  # SSL 검증 여부
    replace=True,  # S3 파일 교체 여부
    pd_kwargs={"index": False, "header": False},  # pandas DataFrame 설정
    dag=dag  # DAG 연결
)

# S3에서 Redshift로 데이터를 이동하는 작업 정의
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id='s3_to_redshift_nps',  # 작업 고유 식별자
    s3_bucket=s3_bucket,  # 읽어올 S3 버킷
    s3_key=s3_key,  # 읽어올 S3 키 (파일 경로)
    schema=schema,  # Redshift 테이블의 스키마
    table=table,  # Redshift 테이블명
    copy_options=['csv'],  # COPY 명령에 사용되는 옵션
    redshift_conn_id="redshift_dev_db",  # Redshift 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID

    method="UPSERT",  # 데이터 이동 방법 (UPSERT: 삽입 또는 갱신)
    upsert_keys=["id"],  # UPSERT 작업에 사용될 키
    dag=dag  # DAG 연결
)

# 작업 간의 의존성 정의
mysql_to_s3_nps >> s3_to_redshift_nps


redshift에게 s3 버켓에 대한 액세스 권한 지정(링크 참조)
https://docs.google.com/document/d/1FArSdUmDWHM9zbgEWtmYSJnxPXDX-LB7HT33AYJlWIA/edit

 

Redshift cluster 설정

 

docs.google.com

 

실습파일 본인 id로 수정(두개다 정상 실행완료)ㅇ

  • weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim MySQL_to_Redshift_v2.py

 

'airflow(에어플로우)' 카테고리의 다른 글

Airflow에서 _mysql is not defined 에러  (0) 2024.01.03
airflow에서 backfill 실행  (0) 2024.01.03
No module named MySQLdb 에러  (0) 2024.01.03
airflow에서 primary key 방법2  (0) 2024.01.03
backfill과 airflow  (0) 2024.01.02

1. 컨테이너 종류 확인
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED        STATUS                  PORTS                                       NAMES
13c7de52342d   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-triggerer_1
8e40ffd79576   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   airflow-setup_airflow-webserver_1
c57811bc9ba1   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-worker_1
f14cc34e94be   apache/airflow:2.5.1   "/usr/bin/dumb-init …"   35 hours ago   Up 19 hours (healthy)   8080/tcp                                    airflow-setup_airflow-scheduler_1
48ee152ae746   redis:latest           "docker-entrypoint.s…"   4 days ago     Up 19 hours (healthy)   6379/tcp                                    airflow-setup_redis_1
a5e756836619   postgres:13            "docker-entrypoint.s…"   4 days ago     Up 19 hours (healthy)   5432/tcp                                    airflow-setup_postgres_1

2. 스케줄러 shell root 권한으로 접속
weare@DESKTOP-BE1I4GE:~/airflow-setup$ docker exec --user root -it f14cc34e94be sh
Get:1 http://deb.debian.org/debian bullseye InRelease [116 kB]
Get:2 http://deb.debian.org/debian-security bullseye-security InRelease [48.4 kB]
Get:3 http://deb.debian.org/debian bullseye-updates InRelease [44.1 kB]
Get:4 http://deb.debian.org/debian bullseye/main amd64 Packages [8062 kB]
Get:5 https://packages.microsoft.com/debian/11/prod bullseye InRelease [3649 B]
Get:6 https://packages.microsoft.com/debian/11/prod bullseye/main amd64 Packages [131 kB]
Get:7 http://deb.debian.org/debian-security bullseye-security/main amd64 Packages [264 kB]
Get:8 http://deb.debian.org/debian bullseye-updates/main amd64 Packages [18.8 kB]
Get:9 https://packages.microsoft.com/debian/11/prod bullseye/main arm64 Packages [27.5 kB]
Get:10 https://packages.microsoft.com/debian/11/prod bullseye/main armhf Packages [24.9 kB]
Get:11 https://packages.microsoft.com/debian/11/prod bullseye/main all Packages [1214 B]
Get:12 https://apt.postgresql.org/pub/repos/apt bullseye-pgdg InRelease [123 kB]
Get:13 https://apt.postgresql.org/pub/repos/apt bullseye-pgdg/main amd64 Packages [306 kB]
Fetched 9170 kB in 5s (1760 kB/s)
Reading package lists... Done


3. MySQL을 사용하는 Apache Airflow에서 필요한 패키지 및 라이브러리를 설치.

 

  • apt-get은 리눅스 시스템에서 패키지 관리를 위해 사용되며, update명령어는 패키지 목록을 최신으로 갱신
    (airflow)sudo apt-get update

  • MySQL 데이터베이스와 상호 작용하는 C 언어 클라이언트 라이브러리를 개발하기 위해 필요한 헤더 파일 및 라이브러리를 포함하는 패키지인 default-libmysqlclient-dev를 설치
    (airflow)sudo apt-get install -y default-libmysqlclient-dev
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    E: Unable to locate package defauld-libmysqlclient-dev

  • gcc는 GNU Compiler Collection의 일부로, C 및 C++ 등의 프로그래밍 언어를 컴파일하는 데 사용되는 컴파일러 설치
    (airflow)sudo apt-get install -y gcc
    Reading package lists... Done
    Building dependency tree... Done
    Reading state information... Done
    gcc is already the newest version (4:10.2.1-1).
    0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.

  • Apache Airflow에서 MySQL을 사용할 수 있게 하는 공식 제공자 패키지 apache-airflow-providers-mysql를 설치. pip3는 파이썬 패키지 설치 도구이며, --ignore-installed 플래그는 이미 설치된 패키지를 무시하고 강제로 새로운 버전 설치
    (airflow)sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
    Collecting apache-airflow-providers-mysql
      Downloading apache_airflow_providers_mysql-5.1.0-py3-none-any.whl.metadata (16 kB)
    Collecting apache-airflow-providers-common-sql>=1.3.1 (from apache-airflow-providers-mysql)
      Downloading apache_airflow_providers_common_sql-1.5.1-py3-none-any.whl.metadata (10 kB)
    Collecting apache-airflow>=2.4.0 (from apache-airflow-providers-mysql)

 

disinct 방식


disinct 방식의 허점

  • 모든 데이터가 동일하면 새 데이터로 대체하는 방식인데, 주식데이터는 마감시간이 아니면 종가,거래량은 달라진다.
  • row_number 방식으로 동일 레코드를 처리해야한다.

코드

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol)
    data = ticket.history()
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records


def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint,
    created_date timestamp DEFAULT GETDATE()
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
        _create_table(cur, schema, table, False)
        # 임시 테이블로 원본 테이블을 복사
        create_t_sql = f"""CREATE TEMP TABLE t (LIKE {schema}.{table} INCLUDING DEFAULTS);
            INSERT INTO t SELECT * FROM {schema}.{table};"""
        cur.execute(create_t_sql)
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # 임시 테이블 내용을 원본 테이블로 복사
        cur.execute(f"DELETE FROM {schema}.{table};")
        cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
    FROM t
)
WHERE seq = 1;""")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol_v3',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *'
) as dag:

    results = get_historical_prices("AAPL")
    load("본인id", "stock_info_v3", results)

 

실습파일 다운 후 수정

  • 실습파일 다운
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ gdown --id 18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY -O UpdateSymbol_v3.py
    /home/weare/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
      warnings.warn(
    Downloading...
    From (uriginal): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY
    From (redirected): https://drive.google.com/uc?id=18NThnGJNLCJkXo1WQx5GzyVHpkqUFvTY&confirm=t&uuid=963be733-82a4-418f-83d2-2530c4d638ab
    To: /home/weare/airflow-setup/dags/UpdateSymbol_v3.py
    100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2.62k/2.62k [00:00<00:00, 17.3MB/s]

  • 실습파일 수정(본인의 id로 대체)
    weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim UpdateSymbol_v3.py

 

정상실행 완료

 

+ Recent posts