"name '_mysql' is not defined"

  • 원인 : 에어플로우(Airflow)에서 "name '_mysql' is not defined" 오류는 MySQLDB 모듈의 _mysql 부분이 로드되지 않아 발생하는 문제입니다. 이 오류는 주로 MySQLDB 패키지의 라이브러리 로딩이나 환경 변수 설정의 문제로 인해 발생할 수 있습니다.
  • 해결방법 : 오류를 해결하기 위해 LD_PRELOAD 환경 변수를 사용하여 특정 C++ 표준 라이브러리를 명시적으로 로드하는 것으로 해당 모듈을 찾을 수 있게 합니다.

  • 방법1
    1) libstdc++.so.6 위치를 찾는다.
    (airflow)find / -name libstdc++.so.6
    /usr/lib/x86_64-linux-gnu/libstdc++.so.6

    2) LD_PRELOAD 환경변수에 해당 라이브러리를 추가하여, 프로그램을 실행할 때, 라이브러리를 먼저 로드하게함
    (airflow)export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6

  • 방법2
    airflow web ui에서 connection 설정 Extra에
    {“conn_type”: “mysql”, “client”: “mysql-connector-python”}
    or {“conn_type”: “mysql”, “client”: “pymysql”} 
    넣어서 저장해준다.(둘 중에 되는거로)

backfill을 할수 있는 조건

  • 모든 dag가 backfill을 필요로 하지 않는다.(full refresh는 필요없음)
  • 일별 시간별 업데이트 할때만 backfill이 필요하다.
  • 데이터 소스에 그날 바뀌거나 새로생긴 레코드를 구분할수 있는 기능이 있어야함
  • 데이터 크기가 커지면 backfill 기능을 필수적으로 구현


daily incremental dag에서 2018년 7월달 데이터를 다시 읽어와야 한다면

  1. 하루씩 지금까지 실행?
  2. 한번에 여러 날짜를 동시에 실행 , max_active_runs
  3. 예상되는 문제
    1) 데이터를 요구받는 소스측에서 감당못해 속도를 줄일수 있음 
    2) 동시 실행시 충돌이 날수도 있음
  4. backfill 실행전 준비사항
    1) catchup=True
    2) execution_date 사용해서 incremental update가 구현돼있음.
  5. 실행순서
    1) 실행순서는 날짜/시간순 아니고 랜덤.
    2) 날짜순으로 하고 싶으면 
  6. 커맨드라인에서 실행
    airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

목표 : mysql 테이블 redshift로 복사하기

  • 첫번째 방법(insert) : mysql -> airflow server(local)->redshift
    +용량이 증가하면 insert commend가 오래걸림
    + 준비할것 : mysql->airflow 연결, airflow-> redshift 권한설정
  • 두번째 방법(copy) : mysql -> airflow server(local)->amazon s3-> redshift
    +준비할것 : mysql->airflow 연결 ,airflow->s3 권한설정 , s3 -> redshift 권한설정

 

s3 버킷,mysql 서버 구현은 따로 진행할것(중요정보라 밝히지 못함)

 

aws 권한설정

  • airflow dag에서 s3 접근(쓰기권한)
    iam user를 만들고 s3 버킷에 대한 읽기/쓰기 권한 설정, access key와 secret key를 사용한다.
  • redshift가 s3 접근(읽기권한)
    redshift에 s3를 접근할 수 있는 역할(role)을 만들고 이를 redshift에 지정

airflow에서 mysql 연결 설정

 

 

MySQLdb 에러발생시 https://allofdater.tistory.com/59 참조

aws s3 접근 방법 개요

  • iam을 사용해 별도의 사용자를 만들고
  • 그 사용자에게 해당 s3 bucket을 읽고 쓸 수 있는 권한을 제공(특정 s3의 권한만 주는게 좋음)
  • custom policy 내용 아래로 수정
  • 그 사용자의 access key id와 secret access key를 사용(두개 따로 잘 보관해놀것, 재발급X)
  • airflow에서 s3 연결 설정

ap-northeast-2는 서울을 의미(다른 나라면 바꿀것)

 

mysql (이미 제작된 실습) 테이블(oltp, production database)

 

redshift (oltp, data warehouse)에 해당 테이블생성

 

새롭게 사용할 operator 

  • SqlToS3Operator : MySQL SQL 결과 -> S3
    (s3://grepp-data-engineering/{본인ID}-nps)
    s3://s3_bucket/s3_key

  • S3ToRedshiftOperator : S3 -> Redshift 테이블
    (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
    COPY command is used

 

airflow를 활용한 전체 실습 과정

 

 

 full refrash 버전 코드

# 필요한 Airflow 모듈 및 클래스 가져오기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

# 필요한 Python 표준 라이브러리 가져오기
from datetime import datetime
from datetime import timedelta

# DAG 정의
dag = DAG(
    dag_id='MySQL_to_Redshift',  # DAG의 고유 식별자
    start_date=datetime(2022, 8, 24),  # DAG 실행 시작일
    schedule='0 9 * * *',  # DAG의 스케줄 (UTC 기준, 매일 9시에 실행)
    max_active_runs=1,  # 최대 동시 실행 횟수
    catchup=False,  # 과거 실행 여부
    default_args={
        'retries': 1,  # 작업 재시도 횟수
        'retry_delay': timedelta(minutes=3),  # 작업 재시도 간격
    }
)

# Redshift 테이블 및 S3 버킷 관련 설정
schema = "본인id"  # Redshift 스키마
table = "nps"  # Redshift 테이블명
s3_bucket = "grepp-data-engineering"  # S3 버킷명
s3_key = schema + "-" + table  # S3 키 (파일 경로)

# MySQL 데이터를 S3로 이동하는 작업 정의
mysql_to_s3_nps = SqlToS3Operator(
    task_id='mysql_to_s3_nps',  # 작업 고유 식별자
    query="SELECT * FROM prod.nps",  # MySQL에서 데이터를 가져오는 쿼리
    s3_bucket=s3_bucket,  # 이동할 S3 버킷
    s3_key=s3_key,  # 이동할 S3 키 (파일 경로)
    sql_conn_id="mysql_conn_id",  # MySQL 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    verify=False,  # SSL 인증서의 검증을 시행할지 여부
    replace=True,  # S3에 저장된 파일이 이미 존재할 경우 해당 파일을 교체(replace)할지 여부
    pd_kwargs={"index": False, "header": False},   # DataFrame을 CSV로 변환할 때 인덱스 및 헤더의 포함 여부
    dag=dag  # DAG 연결
)

# S3에서 Redshift로 데이터를 이동하는 작업 정의
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id='s3_to_redshift_nps',  # 작업 고유 식별자
    s3_bucket=s3_bucket,  # 읽어올 S3 버킷
    s3_key=s3_key,  # 읽어올 S3 키 (파일 경로)
    schema=schema,  # Redshift 테이블의 스키마
    table=table,  # Redshift 테이블명
    copy_options=['csv'],  # COPY 명령에 사용되는 옵션

    method='REPLACE',  # 데이터 이동 방법 (REPLACE: 기존 데이터 대체)
    redshift_conn_id="redshift_dev_db",  # Redshift 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    dag=dag  # DAG 연결
)

# 작업 간의 의존성 정의
mysql_to_s3_nps >> s3_to_redshift_nps

 

incremental update 방식

  • 테이블이 다음을 만족해야함 :  만들거나 생성될때 time이 기록돼야함 ,
    만들때 : created(timestamp) , modified(timestamp)
    수정할때 : modified(timestamp)
    삭제할때 : deleted(boolean) 가 true로 바뀌어야함,  (deleted가 True면 동작)modified(timestamp)

  • row_number로 구현하는경우
    예) daily update 이고, A인 table을 mysql에서 읽어올때, row_number로 구현
    1) redshift의 A테이블의 내용을 temp_A로 복사
    2) A테이블의 레코드 중 modified의 날짜가 지난일(execution_date)에 해당하는 모든 레코드를 temp_A로 복사
    select * from A where date(modified) = date(execution_date)
    3)  temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 desc 정렬해서, 일렬번호가 1인 것들만 다시 A로 복사
  • S3ToRedshiftOperator로 구현하는 경우
    1) SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
    2) method 파라미터로 “UPSERT”를 지정
    3) upsert_keys 파라미터로 Primary key를 지정(앞서 nps 테이블이라면 id필드를 사용)

incremental update  코드

# 필요한 Airflow 모듈 및 클래스 가져오기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

# 필요한 Python 표준 라이브러리 가져오기
from datetime import datetime
from datetime import timedelta

# DAG 정의
dag = DAG(
    dag_id='MySQL_to_Redshift_v2',  # DAG의 고유 식별자
    start_date=datetime(2023, 1, 1),  # DAG 실행 시작일
    schedule='0 9 * * *',  # DAG의 스케줄 (UTC 기준, 매일 9시에 실행)
    max_active_runs=1,  # 최대 동시 실행 횟수
    catchup=False,  # 과거 실행 여부
    default_args={
        'retries': 1,  # 작업 재시도 횟수
        'retry_delay': timedelta(minutes=3),  # 작업 재시도 간격
    }
)

# Redshift 테이블, S3 버킷, S3 키 관련 설정
schema = "본인id"  # Redshift 스키마
table = "nps"  # Redshift 테이블명
s3_bucket = "grepp-data-engineering"  # S3 버킷명
s3_key = schema + "-" + table  # S3 키 (파일 경로)
# s3_key = schema + "/" + table  # S3 키 (다른 형식의 예시)

# MySQL 쿼리 정의
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"

# MySQL 데이터를 S3로 이동하는 작업 정의
mysql_to_s3_nps = SqlToS3Operator(
    task_id='mysql_to_s3_nps',  # 작업 고유 식별자
    query=sql,  # MySQL에서 데이터를 가져오는 쿼리
    s3_bucket=s3_bucket,  # 이동할 S3 버킷
    s3_key=s3_key,  # 이동할 S3 키 (파일 경로)
    sql_conn_id="mysql_conn_id",  # MySQL 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID
    verify=False,  # SSL 검증 여부
    replace=True,  # S3 파일 교체 여부
    pd_kwargs={"index": False, "header": False},  # pandas DataFrame 설정
    dag=dag  # DAG 연결
)

# S3에서 Redshift로 데이터를 이동하는 작업 정의
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id='s3_to_redshift_nps',  # 작업 고유 식별자
    s3_bucket=s3_bucket,  # 읽어올 S3 버킷
    s3_key=s3_key,  # 읽어올 S3 키 (파일 경로)
    schema=schema,  # Redshift 테이블의 스키마
    table=table,  # Redshift 테이블명
    copy_options=['csv'],  # COPY 명령에 사용되는 옵션
    redshift_conn_id="redshift_dev_db",  # Redshift 연결에 사용될 연결 ID
    aws_conn_id="aws_conn_id",  # AWS 연결에 사용될 연결 ID

    method="UPSERT",  # 데이터 이동 방법 (UPSERT: 삽입 또는 갱신)
    upsert_keys=["id"],  # UPSERT 작업에 사용될 키
    dag=dag  # DAG 연결
)

# 작업 간의 의존성 정의
mysql_to_s3_nps >> s3_to_redshift_nps


redshift에게 s3 버켓에 대한 액세스 권한 지정(링크 참조)
https://docs.google.com/document/d/1FArSdUmDWHM9zbgEWtmYSJnxPXDX-LB7HT33AYJlWIA/edit

 

Redshift cluster 설정

 

docs.google.com

 

실습파일 본인 id로 수정(두개다 정상 실행완료)ㅇ

  • weare@DESKTOP-BE1I4GE:~/airflow-setup/dags$ vim MySQL_to_Redshift_v2.py

 

'airflow(에어플로우)' 카테고리의 다른 글

Airflow에서 _mysql is not defined 에러  (0) 2024.01.03
airflow에서 backfill 실행  (0) 2024.01.03
No module named MySQLdb 에러  (0) 2024.01.03
airflow에서 primary key 방법2  (0) 2024.01.03
backfill과 airflow  (0) 2024.01.02

+ Recent posts