원인 : 에어플로우(Airflow)에서 "name '_mysql' is not defined" 오류는 MySQLDB 모듈의 _mysql 부분이 로드되지 않아 발생하는 문제입니다. 이 오류는 주로 MySQLDB 패키지의 라이브러리 로딩이나 환경 변수 설정의 문제로 인해 발생할 수 있습니다.
해결방법 : 오류를 해결하기 위해 LD_PRELOAD 환경 변수를 사용하여 특정 C++ 표준 라이브러리를 명시적으로 로드하는 것으로 해당 모듈을 찾을 수 있게 합니다.
방법1 1) libstdc++.so.6 위치를 찾는다. (airflow)find / -name libstdc++.so.6 /usr/lib/x86_64-linux-gnu/libstdc++.so.6
2) LD_PRELOAD 환경변수에 해당 라이브러리를 추가하여, 프로그램을 실행할 때, 라이브러리를 먼저 로드하게함 (airflow)export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6
방법2 airflow web ui에서 connection 설정 Extra에 {“conn_type”: “mysql”, “client”: “mysql-connector-python”} or {“conn_type”: “mysql”, “client”: “pymysql”} 넣어서 저장해준다.(둘 중에 되는거로)
그 사용자에게 해당 s3 bucket을 읽고 쓸 수 있는 권한을 제공(특정 s3의 권한만 주는게 좋음)
custom policy 내용 아래로 수정
그 사용자의 access key id와 secret access key를 사용(두개 따로 잘 보관해놀것, 재발급X)
airflow에서 s3 연결 설정
ap-northeast-2는 서울을 의미(다른 나라면 바꿀것)
mysql (이미 제작된 실습) 테이블(oltp, production database)
redshift (oltp, data warehouse)에 해당 테이블생성
새롭게 사용할 operator
SqlToS3Operator : MySQL SQL 결과 -> S3 (s3://grepp-data-engineering/{본인ID}-nps) s3://s3_bucket/s3_key
S3ToRedshiftOperator : S3 -> Redshift 테이블 (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps) COPY command is used
airflow를 활용한 전체 실습 과정
full refrash 버전 코드
# 필요한 Airflow 모듈 및 클래스 가져오기 from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.models import Variable
# 필요한 Python 표준 라이브러리 가져오기 from datetime import datetime from datetime import timedelta
# DAG 정의 dag = DAG( dag_id='MySQL_to_Redshift', # DAG의 고유 식별자 start_date=datetime(2022, 8, 24), # DAG 실행 시작일 schedule='0 9 * * *', # DAG의 스케줄 (UTC 기준, 매일 9시에 실행) max_active_runs=1, # 최대 동시 실행 횟수 catchup=False, # 과거 실행 여부 default_args={ 'retries': 1, # 작업 재시도 횟수 'retry_delay': timedelta(minutes=3), # 작업 재시도 간격 } )
# Redshift 테이블 및 S3 버킷 관련 설정 schema = "본인id" # Redshift 스키마 table = "nps" # Redshift 테이블명 s3_bucket = "grepp-data-engineering" # S3 버킷명 s3_key = schema + "-" + table # S3 키 (파일 경로)
# MySQL 데이터를 S3로 이동하는 작업 정의 mysql_to_s3_nps = SqlToS3Operator( task_id='mysql_to_s3_nps', # 작업 고유 식별자 query="SELECT * FROM prod.nps", # MySQL에서 데이터를 가져오는 쿼리 s3_bucket=s3_bucket, # 이동할 S3 버킷 s3_key=s3_key, # 이동할 S3 키 (파일 경로) sql_conn_id="mysql_conn_id", # MySQL 연결에 사용될 연결 ID aws_conn_id="aws_conn_id", # AWS 연결에 사용될 연결 ID verify=False, # SSL 인증서의 검증을 시행할지 여부 replace=True, # S3에 저장된 파일이 이미 존재할 경우 해당 파일을 교체(replace)할지 여부 pd_kwargs={"index": False, "header": False}, #DataFrame을 CSV로 변환할 때 인덱스 및 헤더의 포함 여부 dag=dag # DAG 연결 )
# S3에서 Redshift로 데이터를 이동하는 작업 정의 s3_to_redshift_nps = S3ToRedshiftOperator( task_id='s3_to_redshift_nps', # 작업 고유 식별자 s3_bucket=s3_bucket, # 읽어올 S3 버킷 s3_key=s3_key, # 읽어올 S3 키 (파일 경로) schema=schema, # Redshift 테이블의 스키마 table=table, # Redshift 테이블명 copy_options=['csv'], # COPY 명령에 사용되는 옵션
method='REPLACE', # 데이터 이동 방법 (REPLACE: 기존 데이터 대체) redshift_conn_id="redshift_dev_db", # Redshift 연결에 사용될 연결 ID aws_conn_id="aws_conn_id", # AWS 연결에 사용될 연결 ID dag=dag # DAG 연결 )
# 작업 간의 의존성 정의 mysql_to_s3_nps >> s3_to_redshift_nps
row_number로 구현하는경우 예) daily update 이고, A인 table을 mysql에서 읽어올때, row_number로 구현 1) redshift의 A테이블의 내용을 temp_A로 복사 2) A테이블의 레코드 중 modified의 날짜가 지난일(execution_date)에 해당하는 모든 레코드를 temp_A로 복사 select * from A where date(modified) = date(execution_date) 3) temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 desc 정렬해서, 일렬번호가 1인 것들만 다시 A로 복사
S3ToRedshiftOperator로 구현하는 경우 1) SELECT * FROM A WHERE DATE(modified) = DATE(execution_date) 2) method 파라미터로 “UPSERT”를 지정 3) upsert_keys 파라미터로 Primary key를 지정(앞서 nps 테이블이라면 id필드를 사용)
incremental update 코드
# 필요한 Airflow 모듈 및 클래스 가져오기 from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.models import Variable
# 필요한 Python 표준 라이브러리 가져오기 from datetime import datetime from datetime import timedelta
# DAG 정의 dag = DAG( dag_id='MySQL_to_Redshift_v2', # DAG의 고유 식별자 start_date=datetime(2023, 1, 1), # DAG 실행 시작일 schedule='0 9 * * *', # DAG의 스케줄 (UTC 기준, 매일 9시에 실행) max_active_runs=1, # 최대 동시 실행 횟수 catchup=False, # 과거 실행 여부 default_args={ 'retries': 1, # 작업 재시도 횟수 'retry_delay': timedelta(minutes=3), # 작업 재시도 간격 } )
# Redshift 테이블, S3 버킷, S3 키 관련 설정 schema = "본인id" # Redshift 스키마 table = "nps" # Redshift 테이블명 s3_bucket = "grepp-data-engineering" # S3 버킷명 s3_key = schema + "-" + table # S3 키 (파일 경로) # s3_key = schema + "/" + table # S3 키 (다른 형식의 예시)
# MySQL 쿼리 정의 sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
# MySQL 데이터를 S3로 이동하는 작업 정의 mysql_to_s3_nps = SqlToS3Operator( task_id='mysql_to_s3_nps', # 작업 고유 식별자 query=sql, # MySQL에서 데이터를 가져오는 쿼리 s3_bucket=s3_bucket, # 이동할 S3 버킷 s3_key=s3_key, # 이동할 S3 키 (파일 경로) sql_conn_id="mysql_conn_id", # MySQL 연결에 사용될 연결 ID aws_conn_id="aws_conn_id", # AWS 연결에 사용될 연결 ID verify=False, # SSL 검증 여부 replace=True, # S3 파일 교체 여부 pd_kwargs={"index": False, "header": False}, # pandas DataFrame 설정 dag=dag # DAG 연결 )
# S3에서 Redshift로 데이터를 이동하는 작업 정의 s3_to_redshift_nps = S3ToRedshiftOperator( task_id='s3_to_redshift_nps', # 작업 고유 식별자 s3_bucket=s3_bucket, # 읽어올 S3 버킷 s3_key=s3_key, # 읽어올 S3 키 (파일 경로) schema=schema, # Redshift 테이블의 스키마 table=table, # Redshift 테이블명 copy_options=['csv'], # COPY 명령에 사용되는 옵션 redshift_conn_id="redshift_dev_db", # Redshift 연결에 사용될 연결 ID aws_conn_id="aws_conn_id", # AWS 연결에 사용될 연결 ID
method="UPSERT", # 데이터 이동 방법 (UPSERT: 삽입 또는 갱신) upsert_keys=["id"], # UPSERT 작업에 사용될 키 dag=dag # DAG 연결 )
# 작업 간의 의존성 정의 mysql_to_s3_nps >> s3_to_redshift_nps