하둡 2.0(yarn 1.0) : 분산 컴퓨팅 시스템 

  • 분산 컴퓨팅 시스템은 여러 대의 컴퓨터를 활용하여 데이터 처리 또는 계산 작업을 수행하는 시스템을 의미
  • 하둡 2.0은 분산 컴퓨팅을 지원하는 프레임워크 중 하나로, 그 중요한 기능 중 하나가 YARN(Yet Another Resource Negotiator)이라 불리는 리소스 매니저

하둡 2.0의 구성요소

  1. 리소스 매니저 (Resource Manager): 하둡 2.0에서 중요한 역할을 하는 부분 중 하나입니다. 리소스 매니저는 전체 클러스터에서 사용 가능한 자원을 효율적으로 관리하고, 각각의 작업이 필요로 하는 자원을 할당합니다. 이는 Job Scheduler와 Application Manager로 구성되어 있습니다.

    1) Job Scheduler: 작업을 관리하고 스케줄링하는 역할을 합니다. 여러 작업 중에서 어떤 작업을 언제 실행할지를 결정하며, 리소스 매니저에게 자원을 요청하여 작업을 수행합니다.
    2) Application Manager: 어플리케이션의 생명 주기를 관리합니다. 어플리케이션 실행, 종료, 실패 등의 이벤트를 처리하고 어플리케이션 상태를 유지합니다.

  2. 노드 매니저 (Node Manager): 각 컴퓨터 노드에서 실행되며, 노드의 자원 사용 및 상태를 리소스 매니저에게 보고합니다. 리소스 매니저가 할당한 자원을 기반으로 작업을 수행하는 역할을 합니다.

  3. 컨테이너: 리소스를 격리된 환경에서 실행하기 위한 메커니즘입니다. 앱 마스터와 태스크를 실행하는 데 사용됩니다.
    1) 앱 마스터 (Application Master): 각 어플리케이션마다 할당된 리소스를 관리하는 역할을 합니다. 어플리케이션의 실행 계획을 작성하고, 노드 매니저에게 자원을 할당받아 태스크를 실행합니다.
    2) 태스크: 실제로 수행되어야 하는 작업 단위입니다. 어플리케이션의 로직을 실행하는 부분이며, 컨테이너 내에서 실행됩니다.


YARN의 동작

  1.  **어플리케이션 및 환경 정보 전달:**
       - 사용자가 실행하려는 코드와 관련된 환경 정보를 Resource Manager(RM)에게 전달합니다.
       - RM은 이 정보를 기반으로 실행에 필요한 자원을 할당하게 됩니다.

  2. **파일 복사:**
       - 실행에 필요한 파일들은 HDFS에 해당하는 어플리케이션 ID의 폴더에 미리 복사됩니다.
       - 이는 분산 파일 시스템인 HDFS에 저장된 데이터를 활용하여 데이터의 안정성과 고가용성을 보장합니다.

  3.  **컨테이너 할당:**
       - RM은 NM(Node Manager)으로부터 컨테이너를 받아옵니다.
       - Application Master(AM)은 각각의 어플리케이션에 대해 할당된 프로그램 마스터에 해당합니다.

  4. **AM이 리소스 요구:**
       - AM은 실행에 필요한 리소스를 RM에게 요구합니다.
       - RM은 data locality를 고려하여 리소스(컨테이너)를 할당합니다.
  5. **리소스 할당 및 파일 복사:**
       - 할당된 리소스는 실행에 필요한 파일들과 함께 해당 노드의 HDFS로 복사됩니다.
       - 이때 data locality를 고려하여 데이터를 가지고 있는 노드에 할당되어 I/O 성능을 최적화합니다.
  6.  **컨테이너 내에서 코드 실행:**
       - AM은 할당받은 리소스(컨테이너)를 NM을 통해 실행합니다.
       - 컨테이너 내에서 코드가 실행되며, 복사된 파일들이 활용됩니다.
  7. **태스크 상태 보고:**
       - 각 태스크는 주기적으로 AM에게 상태를 보고합니다(heartbeat).
       - 만약 태스크가 실패하거나 보고가 오랜 시간 없으면, 해당 태스크를 다른 컨테이너로 재실행할 수 있습니다.

하둡 3.0의 특징

  1.  **YARN 2.0의 도입:**
       - 하둡 3.0에서는 YARN (Yet Another Resource Negotiator) 버전이 업그레이드되었습니다. YARN은 분산 컴퓨팅을 위한 자원 관리 시스템으로, 하둡 클러스터에서 다양한 작업들을 효율적으로 실행할 수 있게 합니다.
       - YARN 2.0에서는 프로그램들을 논리적인 그룹으로 나누어서 자원 관리가 가능한데, 이를 '플로우(Flow)'라고 부릅니다. 이로써 데이터 수집과 데이터 서빙 프로세스 등을 분리하여 관리할 수 있게 되었습니다.

  2. **타임라인 서버와 HBase 사용:**
       - YARN 2.1부터는 타임라인 서버에서 HBase를 기본 스토리지로 사용합니다. 타임라인 서버는 YARN 애플리케이션의 실행 이력 정보를 수집하고 검색할 수 있는 기능을 제공합니다.
       - 이를 통해 클러스터에서 실행되는 애플리케이션들에 대한 이력과 메트릭 데이터를 효과적으로 저장하고 조회할 수 있습니다.

  3. **다양한 파일 시스템 지원:**
       - 하둡 3.0에서는 Hadoop Distributed File System (HDFS)의 내임노드가 다수의 스탠바이 내임노드를 지원합니다. 이것은 내임노드의 고가용성을 향상시키고, 클러스터의 안정성을 높입니다.
       - 또한, HDFS뿐만 아니라 다양한 파일 시스템을 지원합니다. S3, Azure Storage 외에도 Azure Data Lake Storage 등을 통합적으로 지원하여 유연성을 제공합니다.

'하둡,spark' 카테고리의 다른 글

하둡 설치 +맵리듀스(wordcount) 실습  (0) 2024.01.18
MapReduce(맵리듀스) 프로그래밍  (0) 2024.01.17
하둡 기초  (0) 2024.01.15
빅데이터 기초  (0) 2024.01.15
하둡의 등장과, spark로의 흐름  (0) 2024.01.15

하둡 등장배경

  1. The Google File System 논문 에서 분산파일 시스템에 관한 내용
  2. MapReduce: Simplified Data Processing on Large Cluster 에서 분산컴퓨팅 시스템 제시
  3. Nutch라는 오픈소스 검색엔진의 하부 프로젝트로 구체적인 시작
  4. 하둡의 등장:  2006년에 아파치 톱레벨 별개 프로젝트로 떨어져나옴

하둡이란?

1. 하둡은 커뮤디티 하드웨어를 활용한 대규모 데이터를 분산 저장하고 처리하는 오픈 소스 소프트웨어 플랫폼.

2. 다수의 노드로 구성된 클러스터 시스템이 마치 하나의 거대한 컴퓨터처럼 동작
3. Hadoop Distributed File System (HDFS)를 통해 데이터를 분산 저장하며, MapReduce를 사용하여 분산 컴퓨팅을 수행.
4. 비용 효율적인 하드웨어를 활용하여 안정적으로 데이터를 저장하고, 병렬 처리를 통해 대용량 데이터셋을 효과적처리.
5. 다양한 산업 분야에서 대용량 데이터 처리 및 분석에 활용

 

하둡의 발전

  • 하둡 1.0은 HDFS위에 MapReduce라는 분산컴퓨팅 시스템이 도는 구조
  • MapReduce 위에서 pig, hive, presto 등 다양한 컴퓨팅 언어들이 만들어짐

  • 하둡 2.0 :  하둡은 YARN이란 이름의 분산처리 시스템위에서 동작하는 애플리케이션이 됨
  • Spark은 YARN위에서 애플리케이션 레이어로 실행

 

HDFS-분산파일 시스템

  • 데이터를 블록단위로 나눠 저장(블록 크기는 128MB)
  • 이 블록들은 Fault tolerance를 보장하기 위해 각각 3군데에 중복으로 저장됨
    + Fault tolerance : 시스템이 하나 이상의 컴포넌트나 서비스의 장애 또는 실패에 대해 견딜 수 있는 능력
  • 하둡 2.0에서는 내임노드의 이중화가 지원되며, Active와 Standby 두 노드가 존재
    + Active에 문제가 생기면 Standby가 내임노트 역할을 함
  • 두 노드 사이에는 공유된 edit log가 존재하며, 이를 통해 지속적인 데이터 일관성을 유지
  • 또한, Secondary 내임노드도 여전히 존재하여 데이터의 신뢰성과 안정성을 제공

 

MapReduce : 분산 컴퓨팅 시스템

  • 하둡 1.0 버전에서는 하나의 잡 트래커(Job Tracker)와 다수의 태스크 트래커(Task Tracker)로 구성되어 있다.
  • 잡 트래커는 작업을 분할하고 다수의 태스크 트래커에게 할당. 각 태스크 트래커에서는 병렬로 작업을 처리.
  • 이 시점의 하둡은 MapReduce만을 지원하며, 제너럴한(일반적인) 시스템이 아니었습니다.
  • 다시 말해, 하둡 1.0은 주로 대용량 데이터 처리를 위한 특화된 환경으로 사용되었으며, 일반적인 용도의 다양한 작업을 수행하는 데에는 적합하지 않았습니다.

'하둡,spark' 카테고리의 다른 글

하둡 설치 +맵리듀스(wordcount) 실습  (0) 2024.01.18
MapReduce(맵리듀스) 프로그래밍  (0) 2024.01.17
YARN  (0) 2024.01.16
빅데이터 기초  (0) 2024.01.15
하둡의 등장과, spark로의 흐름  (0) 2024.01.15

빅데이터

  • 정의1 : 서버 한대로 처리할 수 없는 규모의 데이터
  • 분산환경에서는 scale-up(사양증가)를 하다가  scale-out을 하는게 일반적.
    + 분산환경 : 여러 컴퓨터 또는 서버가 네트워크를 통해 연결되어 작업을 협력적으로 수행하는 환경

 

  • 정의2 : 기존의 소프트웨어(오라클,mysql)로는 처리할 수 없는 규모의 데이터
  • 오라클,mysql,pandas로 처리할 데이터가 너무크면  spark를 사용
  • 정의3 : 4V (Volume, Velocity, Variety, Varecity)
    Volume: 데이터의 크기가 대용량?
    Velocity: 데이터의 처리 속도가 중요?
    Variety: 구조화/비구조화 데이터 둘다?
    Veracity: 데이터의 품질이 좋은지?

빅데이터 예

  • 빅데이터(디바이스데이터) 예 : 모바일디바이스, 스마트tv, 각종센서 데이터, 네트워킹디바이스
  • 빅데이터(웹) 예 : 웹페이지(위키 및 기타사 이트), 사용자행동정보(검색어, 클릭정보)

빅데이터처리 특징

  • 큰 데이터를 손실없이 보관할 방법 및 스토리지가 필요
    방법1. 비구조화 데이터를 보관 : 이런 비구조화 데이터의 등장으로 SQL 만으로 불가능해진다.
    방법2. 큰 데이터 저장이 가능한 분산 파일 시스템이 필요
  • 처리 시간이 오래걸림
    방법1. 병렬 처리가 가능한 분산 컴퓨팅 시스템이 필요.
    방법2. 비구조화 데이터를 처리할수 있는 방법을 활용

 

대용량 분산 시스템이란?

  • 분산 환경 : 1대 혹은 그 이상의 서버로 구성됨
    +분산 파일 시스템과 분산 컴퓨팅 시스템을 포함하는 개념
  • Fault Tolerance : 소수의 서버가 고장나도 동작해야함
  • scale out 가능성 : 확장이 용이해야한다.

 

결론 : 하둡이 필요

'하둡,spark' 카테고리의 다른 글

하둡 설치 +맵리듀스(wordcount) 실습  (0) 2024.01.18
MapReduce(맵리듀스) 프로그래밍  (0) 2024.01.17
YARN  (0) 2024.01.16
하둡 기초  (0) 2024.01.15
하둡의 등장과, spark로의 흐름  (0) 2024.01.15

하둡과 spark로 흐름 요약

  1. 대용량 데이터 처리를 위한 오픈소스 기술인 하둡이 등장하면서 데이터 처리 방식이 혁신되었습니다.
  2. 하둡은 분산 파일 시스템과 분산 컴퓨팅 시스템으로 구성되어 있으며, 핵심 구성 요소로는 HDFS와 맵리듀스/YARN이 있습니다.
  3.  맵리듀스 프로그래밍은 제약이 많아 데이터 처리가 어려웠고, 이로 인해 SQL이 다시 주목받게 되었습니다.
    +맵리듀스의 제약 : 병렬 처리를 위해 데이터를 키-값 쌍으로 변환해야 한다. 이로 인해 복잡한 작업을 수행하기가 어려워지고, 비즈니스 로직을 표현하는 데에도 한계가 있다. 또한 맵리듀스는 반복적인 계산을 표현하기에는 적합하지 않아, 반복이 많은 작업에서 효율성이 떨어진다.

  4.  SQL은 데이터 처리를 보다 직관적으로 수행할 수 있도록 도와주는 장점을 가지고 있습니다.
  5.  최근에는 Spark가 대세로 떠오르고 있는데, 이는 대용량 데이터의 분산 컴퓨팅을 위한 기술로 평가받고 있습니다.
  6.  Spark는 Pandas와 Scikit Learn을 강화시킨 것으로 볼 수 있어, 강력한 성능을 자랑합니다.
  7. 뿐만 아니라, SQL 쿼리를 지원하며 스트림 데이터 처리와 그래프 처리도 가능합니다.
  8. Spark은 데이터 처리에 있어 다양한 작업에 효과적으로 대응할 수 있는 다재다능한 기술로 인정받고 있습니다.
  9. 이는 기존의 맵리듀스 프로그래밍의 한계를 극복하고 보다 효율적인 데이터 처리를 가능케 합니다.
  10. 따라서 Spark는 현대의 데이터 처리 환경에서 중요한 위치를 차지하고 있습니다.

'하둡,spark' 카테고리의 다른 글

하둡 설치 +맵리듀스(wordcount) 실습  (0) 2024.01.18
MapReduce(맵리듀스) 프로그래밍  (0) 2024.01.17
YARN  (0) 2024.01.16
하둡 기초  (0) 2024.01.15
빅데이터 기초  (0) 2024.01.15

[core]
# Airflow 파이프라인이 존재하는 폴더입니다. 대부분 코드 저장소의 하위 폴더일 것입니다.
# 이 경로는 절대 경로여야 합니다.
dags_folder = /opt/airflow/dags

# 호스트 이름을 해결하는 데 사용되는 호출 가능한 경로를 제공합니다.
# 형식은 "package.function"입니다.
# 예: 기본값 "airflow.utils.net.getfqdn"은 patched 된
# 버전의 socket.getfqdn()의 결과를 나타냅니다 - https://github.com/python/cpython/issues/49254.
#
# 함수에서 인수가 필요하지 않아야 합니다.
# 호스트 이름으로 IP 주소를 사용하려면 값으로 ``airflow.utils.net.get_host_ip_address``를 사용합니다.
hostname_callable = airflow.utils.net.getfqdn

# 날짜 및 시간이 Naive 한 경우의 기본 타임존
# utc (기본값), system 또는 IANA 타임존 문자열(예: Europe/Amsterdam)이 될 수 있습니다.
default_timezone = utc

# Airflow가 사용할 실행자 클래스를 정의합니다. 선택할 수 있는 옵션으로
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` 또는 사용자 정의 실행자를 사용하는 경우
# 클래스의 전체 import 경로가 있습니다.
executor = SequentialExecutor

# 이것은 Airflow에서 스케줄러 당 동시에 실행되는 최대 작업 인스턴스 수를 정의합니다.
# 일반적으로 이 값은 귀하의 클러스터의 스케줄러 수와 곱한 값이
# 메타데이터 데이터베이스에서 실행 중인 상태의 최대 작업 인스턴스 수입니다.
parallelism = 32

# 각 DAG에서 동시에 실행되는 최대 작업 인스턴스 수입니다. DAG의 모든 DAG 실행에 대한
# 실행 중인 작업 수를 계산하려면 DAG 수준에서 ``max_active_tasks``로 설정합니다.
#
# 이것이 유용한 예시 시나리오는 초기 시작 날짜가 있는 새로운 DAG가 클러스터에서
# 모든 실행기 슬롯을 도둑 맞지 않도록 하는 경우입니다.
max_active_tasks_per_dag = 16

# DAG 생성시 기본적으로 일시 중지됩니까
dags_are_paused_at_creation = True

# DAG 당 최대 활성 DAG 실행 수입니다. 스케줄러는 제한에 도달하면 더 이상 DAG 실행을
# 생성하지 않습니다. DAG 수준에서 ``max_active_runs``로 설정할 수 있으며
# 기본값은 ``max_active_runs_per_dag``로 설정됩니다.
max_active_runs_per_dag = 16

# Python 프로세스를 시작하기 위해 multiprocessing 모듈을 통해 사용할 메서드의 이름입니다.
# 이는 직접 Python 문서에서 사용 가능한 옵션과 대응합니다:
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_start_method.
# ``fork``를 mp_start_method로 설정하는 예: mp_start_method = fork
# mp_start_method =

# Airflow와 함께 제공되는 DAG 예제를로드해야 하는지 여부를 정의합니다.
# 시작하는 데 좋지만 프로덕션 환경에서는이를 ``False``로 설정하는 것이 좋습니다.
load_examples = True

# Airflow 플러그인이 있는 폴더의 경로
plugins_folder = /opt/airflow/plugins

# 작업을 부모 프로세스의 포크 ("False", 더 빠른 옵션) 또는 새로운 Python 프로세스로 실행해야 합니까 ("True" 느립니다,
# 그러나 플러그인 변경 사항이 즉시 작업에 반영됨)
execute_tasks_new_python_interpreter = False

# 연결 비밀번호를 데이터베이스에 저장하는 데 사용되는 비밀 키
fernet_key = 

# DAG 파일 가져 오기 시간 제한
dagbag_import_timeout = 30.0

# DAG백에서 DAG 파일 가져오기 오류의 UI에 traceback를 표시해야 하는지 여부
dagbag_import_error_tracebacks = True

# Traceback가 표시되는 경우 traceback에서 표시할 항목 수
dagbag_import_error_traceback_depth = 2

# DagFileProcessor를 타임 아웃하기 전에 DAG 파일을 처리하는 데 걸리는 시간
dag_file_processor_timeout = 50

# 서브 프로세스에서 작업 인스턴스를 실행하는 데 사용할 클래스입니다.
# StandardTaskRunner, CgroupTaskRunner 또는 사용자 정의 작업 러너를 사용하는 경우
# 클래스의 전체 import 경로가 있습니다.
task_runner = StandardTaskRunner

# 설정된 경우 ``run_as_user`` 인수가없는 작업은이 사용자로 실행됩니다.
# 작업을 실행할 때 Airflow를 실행하는 sudo 사용자를 비슷하게 사용하여 작업을 실행 중인 경우
# 사용하여 사용자 실행 중에 권한을 해제 할 수 있습니다.
default_impersonation =

# 사용할 보안 모듈 (예 : kerberos)
security =

# 단위 테스트 모드를 켤 것인가 (런타임 시 여러 구성 옵션을 테스트 값으로 덮어 씁니다)
unit_test_mode = False

# XCom에 대한 피클링을 활성화할 것인가 (이것은 불안전하며 RCE exploits을 허용합니다).
enable_xcom_pickling = False

# 역직렬화 중에 가져올 수있는 클래스입니다. 이는 다중 행 값으로 구문 분석됩니다.
# 각 항목은 정규 표현식으로 구문 분석됩니다. Python 내장 클래스 (예 : dict)은 항상 허용됩니다.
allowed_deserialization_classes = airflow\..*

# 작업이 강제로 종료될 때, SIGTERM을 수신 한 후 정리 할 수 있는 초 단위 시간
killed_task_cleanup_time = 60

# params를 dag_run.conf로 override할 것인가. ``airflow dags backfill -c`` 또는
# ``airflow dags trigger -c``를 통해 몇 가지 키-값 쌍을 전달하면 params의 기존 값이 override됩니다.
dag_run_conf_overrides_params = True

# DAG를 검색할 때 ``DAG`` 및 ``airflow``라는 문자열이 포함되지 않은 파일을 무시합니다.
dag_discovery_safe_mode = True

# DAG 디렉터리의 ".airflowignore" 파일에서 사용되는 패턴 구문. 유효한 값은
# ``regexp`` 또는 ``glob``입니다.
dag_ignore_file_syntax = regexp

# 각 작업이 기본적으로 갖게 될 다시 시도 횟수입니다. DAG 또는 작업 수준에서 재정의 될 수 있습니다.
default_task_retries = 0

# 각 작업이 기본적으로 다시 시도 사이에 대기 할 시간 (초)입니다. DAG 또는 작업 수준에서 재정의 될 수 있습니다.
default_task_retry_delay = 300

# 작업의 유효한 총 우선 순위 가중치를위한 가중치 메서드
default_task_weight_rule = downstream

# 연산자의 기본 작업 실행 제한 시간 값입니다. 초로 timedelta에 전달 할 예정인 정수 값을
# 지정하십시오. 지정되지 않으면 값은 기본적으로 None으로 간주되어 연산자는 기본적으로 타임 아웃되지 않습니다.
default_task_execution_timeout =

# 직렬화 된 DAG를 업데이트하는 것이 최소 간격보다 빨라서는 안됩니다. 데이터베이스 쓰기 속도를 줄이기 위해
min_serialized_dag_update_interval = 30

# True 인 경우 직렬화 된 DAG는 DB에 쓰기 전에 압축됩니다.
# 참고 : 이렇게하면 DAG 종속성보기가 비활성화됩니다.
compress_serialized_dags = False

# 직렬화 된 DAG를 검색하는 것이 최소 간격보다 빨라서는 안됩니다.
# 이 구성은 Webserver에서 DAG가 업데이트되는 시간을 제어합니다
min_serialized_dag_fetch_interval = 10

# 각 작업 인스턴스 필드 (템플릿 필드)당 저장할 수있는 최대 수
# 데이터베이스에 저장된 각 작업 인스턴스에 대한 모든 템플릿 필드입니다.
# 이 숫자를 작게 유지하면 오래된 작업에 대한 ``Rendered`` 탭을 볼 때 오류가 발생할 수 있습니다.
max_num_rendered_ti_fields_per_task = 30

# 각 dagrun에서 정의 된 SLA에 대한 체크 여부
check_slas = True

# 연산자 결과를 저장하고 해결하는 데 사용될 사용자 정의 XCom 클래스의 경로
# 예 : xcom_backend = path.to.CustomXCom
xcom_backend = airflow.models.xcom.BaseXCom

# 기본적으로 Airflow 플러그인은 게으르게로드됩니다 (필요할 때만로드됨). 그것을 ``False``로 설정하십시오,
# 플러그인이 'airflow'가 cli를 통해 호출되거나 모듈에서로드 될 때마다 플러그인을로드하려면.
lazy_load_plugins = True

# 기본적으로 Airflow 공급 업체는 게으르게 발견됩니다 (필요할 때만 발견 및 가져오기).
# False로 설정하면 CLI 또는 모듈에서 'airflow'가 호출 될 때마다 공급 업체를 발견하려면 False로 설정하십시오.
lazy_discover_providers = True

# 민감한 변수 또는 연결 추가 json 키를 UI 및 작업 로그에서 숨길지 여부
#
# (연결 비밀번호는 항상 로그에서 숨겨집니다)
hide_sensitive_var_conn_fields = True

# 변수 이름 또는 연결의 extra JSON에서 숨길 추가 민감한 키의 쉼표로 구분 된 목록
sensitive_var_conn_names =

# ``default_pool`` Task Slot 카운트. 이 설정은 기존 배포에서 ``default_pool``이 이미 생성된 경우에만 영향을 미칩니다.
# 기존 배포에서는 사용자가 웹서버, API 또는 CLI를 사용하여 슬롯 수를 변경할 수 있습니다.
default_pool_task_slot_count = 128

# XCom이 작업 매핑을 트리거하기 위해 푸시 할 수있는 최대 목록/사전 길이입니다.
# 푸시 된 목록/사전의 길이가이 값보다 크면 XCom을 푸시하는 작업이 자동으로 실패하여 스케줄러가 지연되지 않도록합니다.
max_map_length = 1024

# 데몬 모드에서 실행될 때 프로세스에 사용할 기본 umask입니다 (스케줄러, 워커 등).
#
# 이것은 새로 생성 된 파일의 파일 권한 비트의 초기 값을 결정하는 파일 생성 모드 마스크를 제어합니다.
#
# 이 값은 8 진수 정수로 취급됩니다.
daemon_umask = 0o077

# 데이터 세트 관리자로 사용할 클래스입니다.
# 예: dataset_manager_class = airflow.datasets.manager.DatasetManager
# dataset_manager_class =

# 데이터 세트 관리자에 제공할 kwargs입니다.
# 예: dataset_manager_kwargs = {"some_param": "some_value"}
# dataset_manager_kwargs =

[database]
# SqlAlchemy 메타데이터 데이터베이스에 대한 연결 문자열입니다.
# SqlAlchemy는 여러 다른 데이터베이스 엔진을 지원합니다.
# 자세한 정보는 여기에서 확인하십시오:
http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
sql_alchemy_conn = sqlite:////opt/airflow/airflow.db

# SQLAlchemy의 create_engine에 전달되는 추가 엔진별 키워드 인수입니다.
# 예: sql_alchemy_engine_args = {"arg1": True}
# sql_alchemy_engine_args =

# 데이터베이스의 인코딩입니다.
sql_engine_encoding = utf-8

# ``dag_id``, ``task_id``, ``key``, ``external_executor_id`` 열의 인코딩이 다른 경우
# 이 collation이 사용됩니다.
# 기본적으로이 collation은 데이터베이스 collation과 동일하지만 ``mysql`` 및 ``mariadb``의 경우
# 기본값은 인덱스 키의 최대 크기가 허용 된 인덱스 크기를 초과하지 않도록하기위한 ``utf8mb3_bin``입니다.
# (https://github.com/apache/airflow/pull/17603#issuecomment-901121618 참조)
# sql_engine_collation_for_ids =

# SqlAlchemy가 데이터베이스 연결을 풀어야 하는지 여부입니다.
sql_alchemy_pool_enabled = True

# SqlAlchemy 풀 크기는 풀의 최대 데이터베이스 연결 수입니다.
# 0은 제한이 없음을 나타냅니다.
sql_alchemy_pool_size = 5

# 풀 크기로 체크 아웃 된 연결 수가 pool_size로 설정된 크기에 도달하면
# 추가 연결이이 제한까지 반환됩니다.
# 이러한 추가 연결이 풀로 반환되면 연결이 끊기고 폐기됩니다.
# 따라서 풀이 허용하는 동시 연결 수의 총계는
# pool_size + max_overflow이며
# 풀이 허용하는 "sleeping" 연결 수의 총계는 pool_size입니다.
# max_overflow는 ``-1``로 설정할 수 있습니다.
# overflow 제한이없음을 나타내려면;
# 동시 연결의 총 수에 대한 제한이 없게됩니다. 기본값은 ``10``입니다.
sql_alchemy_max_overflow = 10

# SqlAlchemy 풀 재활용은 연결이
# 풀에서 무효화되기 전에 풀에서 놀 수있는 초수입니다.
# 이 구성은 sqlite에는 적용되지 않습니다. DB 연결 수가 초과 된 경우
# 낮은 구성 값은 시스템이 더 빨리 복구 할 수 있게합니다.
sql_alchemy_pool_recycle = 1800

# 연결 풀 체크 아웃 시작시 연결마다 연결을 확인할지 여부입니다.
# 일반적으로 이것은 "SELECT 1"과 같은 간단한 문입니다.
# 자세한 내용은 여기에서 확인하십시오:
https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
sql_alchemy_pool_pre_ping = True

# 메타데이터 데이터베이스에 사용할 스키마입니다.
# SqlAlchemy는 여러 개의 스키마 개념을 가진 데이터베이스를 지원합니다.
sql_alchemy_schema =

# Airflow가 함께 제공하는 기본 연결을로드할지 여부입니다. 시작하는 데는 좋지만
# 생산 환경에서는 이것을 ``False``로 설정하는 것이 좋습니다.
load_default_connections = True

# DB 운영 오류의 경우 코드를 재시도해야하는 횟수입니다.
# 원하지 않는 상태를 초래할 수 있으므로 모든 트랜잭션이 다시 시도되지는 않습니다.
# 현재 ``DagFileProcessor.process_file``에서만 사용되어 ``dagbag.sync_to_db``를 다시 시도합니다.
max_db_retries = 3

[logging]
# Airflow가 로그 파일을 저장해야하는 폴더입니다.
# 이 경로는 절대 경로 여야합니다.
# 기본값으로 설정되어 있다고 가정하는 몇 가지 구성이 있습니다.
# 이를 재정의하려면 dag_processor_manager_log_location 및
# dag_processor_manager_log_location 설정도 업데이트해야 할 수 있습니다.
base_log_folder = /opt/airflow/logs

# Airflow는 AWS S3, Google Cloud Storage 또는 Elastic Search에 원격으로 로그를 저장할 수 있습니다.
# 원격 로깅을 활성화하려면이 값을 True로 설정하십시오.
remote_logging = False

# 사용자는 저장소에 액세스 권한을 제공하는 Airflow 연결 ID를 제공해야합니다.
# 원격 로깅 서비스에 따라이는 로그 읽기 전용으로 사용될 수 있습니다.
remote_log_conn_id =

# Google Credential JSON 파일의 경로입니다. 생략하면
# 'Application Default Credentials`에 따라 권한 부여됩니다.
google_key_path =

# 원격 로깅의 로그를 저장하는 스토리지 버킷 URL입니다.
# S3 버킷은 "s3 ://"로 시작해야합니다.
# Cloudwatch 로그 그룹은 "cloudwatch ://"로 시작해야합니다.
# GCS 버킷은 "gs ://"로 시작해야합니다.
# WASB 버킷은 올바른 핸들러를 Airflow가 올바르게 선택하는 데 도움이 되기 위해 "wasb"로 시작해야합니다.
# Stackdriver 로그는 "stackdriver ://"로 시작해야합니다.
remote_base_log_folder =

# S3에 저장된 로그에 대해 서버 측 암호화 사용
encrypt_s3_logs = False

# 로깅 레벨.
#
# 지원되는 값 : ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
logging_level = INFO

# Celery의 로깅 레벨입니다. 설정되지 않은 경우 logging_level의 값을 사용합니다.
#
# 지원되는 값 : ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
celery_logging_level =

# Flask-appbuilder UI의 로깅 레벨입니다.
#
# 지원되는 값 : ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``, ``DEBUG``.
fab_logging_level = WARNING

# 로깅 클래스
# 로깅 구성을 지정할 클래스입니다.
# 이 클래스는 파이썬 클래스 경로에 있어야합니다.
# 예 : logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =

# 콘솔에서 Colored 로그를 사용/사용하지 않을지 여부 플래그
# 컨트롤하는 터미널이 TTY 인 경우 로그를 색상으로 지정합니다.
colored_console_log = True

# Colored 로그가 활성화된 경우 로그의 형식
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter

# 로그 라인의 형식
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Airflow 워커를 시작하면 Airflow는 로컬 로그 파일을 Airflow 메인 웹 서버에 제공하기 위해 작은 웹 서버
# 서브 프로세스를 시작합니다. 그런 다음 메인 웹 서버가이 로그를 연결하고 페이지를 작성하여 사용자에게 보냅니다.
# 이는 로그를 제공하는 데 사용됩니다. 사용되지 않은 상태 여야하며 메인 웹 서버에서 볼 수 있어야합니다.
worker_log_server_port = 8793


[metrics]

# StatsD (https://github.com/etsy/statsd) 통합 설정입니다.
# StatsD로 메트릭을 전송하도록 설정합니다.
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

# StatsD로 모든 사용 가능한 메트릭을 보내지 않으려면,
# 메트릭 이름의 앞부분이 목록의 요소로 시작되는 메트릭 만 보내도록 허용 목록을 구성할 수 있습니다.
# (예 : "scheduler,executor,dagrun")
statsd_allow_list =

# StatsD stat 이름을 유효성 검사하고 필요한 경우 stat 이름을 변경한 다음
# 변환 된 stat 이름을 반환하는 함수입니다.
#
# 함수는 다음 서명을 가져야합니다.
# def func_name(stat_name: str) -> str:
stat_name_handler =

# Airflow 메트릭을 보내기 위해 datadog 통합을 활성화하려면 True로 설정합니다.
statsd_datadog_enabled = False

# 모든 메트릭에 추가되는 datadog 태그 목록입니다 (예 : key1:value1, key2:value2)
statsd_datadog_tags =

# 사용자 정의 StatsD 클라이언트를 활용하려면 해당 모듈 경로를 설정하십시오.
# 참고 : Airflow가이를 가져와야하므로 모듈 경로는 PYTHONPATH에 존재해야합니다.
# statsd_custom_client_path =

[secrets]
# 사용할 비밀번호 백엔드의 전체 클래스 이름 (환경 변수 및 메타 스토어 전에 나옴)
# 예 : backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend =

# backend_kwargs 매개 변수는 사전으로로드되어 secrets backend 클래스의 __init__에 전달됩니다.
# 사용중인 비밀번호 백엔드의 설명서를 참조하십시오. JSON을 예상합니다.
# AWS Systems Manager ParameterStore의 경우 다음과 같습니다.
# ``{"connections_prefix": "/airflow/connections", "profile_name": "default"}``
backend_kwargs =

[cli]
# cli가 API에 어떻게 액세스해야합니까. LocalClient는 데이터베이스를 직접 사용하며
# json_client는 웹 서버에서 실행되는 api를 사용합니다.
api_client = airflow.api.client.local_client

# web_server_url_prefix를 설정하면 여기에 추가하지 않으면 안됩니다. 예:
# ``endpoint_url = http://localhost:8080/myroot``
# 그러면 api가 다음과 같아집니다. ``http://localhost:8080/myroot/api/experimental/...``
endpoint_url = http://localhost:8080

[debug]
# ``DebugExecutor``만 사용됩니다. ``True``로 설정하면 DAG가 처음에 실패합니다.
# 실패한 작업이 있습니다. 디버깅 목적으로 유용합니다.
fail_fast = False

[api]
# 사용이 중단 된 실험적 API를 활성화합니다.이 API에는 액세스 제어가 없습니다.
# 인증 된 사용자에게 완전한 액세스 권한이 있습니다.
#
# .. 경고 ::
#
#   이 `Experimental REST API <https://airflow.readthedocs.io/en/latest/rest-api-ref.html>`__는
#   2.0 버전부터 사용 중단되었습니다. 대신 사용을 고려하십시오
#   `안정된 REST API <https://airflow.readthedocs.io/en/latest/stable-rest-api-ref.html>`__.
#   이주에 대한 자세한 내용은
#   `RELEASE_NOTES.rst <https://github.com/apache/airflow/blob/main/RELEASE_NOTES.rst>`_
enable_experimental_api = False

# API 사용자의 인증을 위한 auth 백엔드 사용자 목록입니다. 참조
https://airflow.apache.org/docs/apache-airflow/stable/security/api.html에 가능한 값.
# ("airflow.api.auth.backend.default"는 역사적인 이유로 모든 요청을 허용합니다)
auth_backends = airflow.api.auth.backend.session

# API 요청의 최대 페이지 제한을 설정하는 데 사용됩니다
maximum_page_limit = 100

# limit이 0 인 경우 제로에서 기본 페이지 제한을 설정하는 데 사용됩니다. 기본 제한
# 100이 OpenApi 명세에 설정되어 있습니다. 그러나이 특정 기본 제한
# 제한이 API 요청에서 제로 (0)으로 설정된 경우에만 작동합니다.
# 제한이 제공되지 않으면 OpenApi 명세 기본값을 사용합니다.
fallback_page_limit = 100

# JWT 토큰 자격 증명을 위해 사용되는 대상입니다. 클라이언트 및 서버 측에서이 값을 일치시켜야합니다. 비어 있으면 청중이 테스트되지 않습니다.
# 예 : google_oauth2_audience = project-id-random-value.apps.googleusercontent.com
google_oauth2_audience =

# Google Cloud Service Account 키 파일 (JSON)의 경로입니다. 생략하면 권한이
# `the Application Default Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__을
# 기반으로합니다.
# 예 : google_key_path = /files/service-account-json
google_key_path =

# 이는 프리플라이트 요청에 대한 응답으로 사용되는 헤더입니다.이 헤더는
# Access-Control-Request-Headers 헤더입니다.
access_control_allow_headers =

# 리소스에 액세스 할 때 허용되는 방법을 지정합니다.
access_control_allow_methods =

# 응답이 주어진 원본의 요청 코드와 공유 될 수 있는지 나타냅니다.
# 여러 URL을 공백으로 구분합니다.
access_control_allow_origins =

[lineage]
# 사용할 lineage 백엔드입니다.
backend =

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[operators]
# 각 새로운 operator에 할당되는 기본 소유자입니다.
# 명시적으로 제공되지 않았거나 default_args를 통해 전달되지 않은 경우에만 해당됩니다.
default_owner = airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0

# 작업에 할당되고 worker가 청취하는 기본 대기열입니다.
default_queue = default

# 추가/사용되지 않은 인수(args, kwargs)를 BaseOperator에 전달할 수 있는지 여부입니다.
# False로 설정하면 예외가 throw되며 그렇지 않으면 콘솔 메시지 만 표시됩니다.
allow_illegal_arguments = False

[hive]
# HiveOperator 작업에 대한 기본 mapreduce 대기열
default_hive_mapred_queue =

# HiveOperator에서 mapred_job_name에 대한 템플릿, 다음 명명 된 매개 변수를 지원합니다.
# hostname, dag_id, task_id, execution_date
# mapred_job_name_template =

[webserver]
# 웹 사이트의 기본 URL로 airflow는 도메인 또는
# 사용 중인 cname이 무엇인지 추측할 수 없습니다.이는 자동화 된 이메일에서 사용됩니다.
# airflow는 링크를 올바른 웹 서버로 지정하는 데 도움이됩니다.
base_url = http://localhost:8080

# UI에서 모든 날짜를 표시하는 데 사용되는 기본 시간대입니다. UTC, system 또는
# IANA 시간대 문자열 (예 : Europe/Amsterdam). 비워 두면
# core/default_timezone의 기본값이 사용됩니다
# 예 : default_ui_timezone = America/New_York
default_ui_timezone = UTC

# 웹 서버를 시작할 때 지정된 IP
web_server_host = 0.0.0.0

# 웹 서버를 실행할 포트
web_server_port = 8080

# 웹 서버의 SSL 인증서 및 키의 경로. 두 개를 모두 제공하면 SSL이 활성화됩니다. 웹 서버 포트가 변경되지 않습니다.
web_server_ssl_cert =

# 웹 서버의 SSL 인증서 및 키의 경로. 두 개를 모두 제공하면 SSL이 활성화됩니다. 웹 서버 포트가 변경되지 않습니다.
web_server_ssl_key =

# Web 세션 데이터를 저장하는 데 사용되는 백엔드 유형. 'database' 또는 'securecookie'가 될 수 있습니다.
# 예 : session_backend = securecookie
session_backend = database

# 웹 서버가 응답하지 않는 gunicorn 마스터를 종료하기 전에 기다리는 시간 (초)
web_server_master_timeout = 120

# 워커가 응답하기 전에 gunicorn 웹 서버가 대기하는 시간 (초)
web_server_worker_timeout = 120

# 한 번에 새로 고칠 워커의 수입니다. 0으로 설정하면 worker 새로 고침이
# 비활성화됩니다. 0이 아닌 경우 airflow는 주기적으로 webserver worker를 새로 고쳐
# 새로운 worker를 기동하고 이전 worker를 종료합니다.
worker_refresh_batch_size = 1

# 워커 일괄 새로 고침 전에 대기할 시간 (초)
worker_refresh_interval = 6000

# True로 설정하면 Airflow는 plugins_folder 디렉토리의 파일을 추적합니다. 변경 사항이 감지되면
# gunicorn을 다시로드합니다.
reload_on_plugin_change = False

# Flask 앱을 실행하는 데 사용되는 비밀 키입니다. 가능한 한 무작위 여야합니다. 그러나
# webserver를 실행하는 데 사용되는 모든 머신에서 동일한 ``secret_key``를 사용하십시오.
# 그렇지 않으면 하나의 머신에서 "CSRF 세션 토큰이 없음"오류가 발생합니다.
# 웹 서버 키는 또한 로그가 검색 될 때 Celery 워커에게 요청을 승인하는 데 사용됩니다.
# 비밀 키를 사용한 토큰의 유효 시간은 짧지만 - 실행되는 모든 머신의 시간이 동기화되어 있는지 확인하십시오 (예 : ntpd를 사용)
# 그렇지 않으면 로그에 액세스 할 때 "금지됨"오류가 발생할 수 있습니다.
secret_key = CxzlypgGveoM2Ygt6Je1aA==

# Gunicorn 웹 서버를 실행하는 데 사용되는 워커 수
workers = 4

# Gunicorn이 사용해야하는 worker 클래스입니다.
# sync (기본값), eventlet, gevent와 같은 선택 사항이 있습니다. gevent를 사용할 때
# 가능하면 "1"로 _AIRFLOW_PATCH_GEVENT 환경 변수를 설정하여 가능한 한 빨리 gevent 패치가되도록합니다.
worker_class = sync

# Gunicorn 웹 서버의 로그 파일. '-'는 stderr로 로그를 남깁니다.
access_logfile = -

# Gunicorn 웹 서버의 로그 파일. '-'는 stderr로 로그를 남깁니다.
error_logfile = -


# Gunicorn 웹 서버의 액세스 로그 형식입니다.
# 기본 형식은 %%(h)s %%(l)s %%(u)s %%(t)s "%%(r)s" %%(s)s %%(b)s "%%(f)s" "%%(a)s"입니다.
# 문서 - https://docs.gunicorn.org/en/stable/settings.html#access-log-format
access_logformat =

# 웹 서버에서 설정 파일을 노출합니다. "non-sensitive-only"로 설정하면 모든 값을 표시합니다
# 보안에 영향을주는 것을 제외한. "True"는 모든 값을 표시합니다. "False"는
# 구성을 완전히 숨깁니다.
expose_config = False

# 웹 서버에서 호스트 이름을 노출합니다.
expose_hostname = True

# 웹 서버에서 스택 추적을 노출합니다.
expose_stacktrace = False

# 기본 DAG보기. 유효한 값은: ``grid``, ``graph``, ``duration``, ``gantt``, ``landing_times``
dag_default_view = grid

# 기본 DAG 방향. 유효한 값은:
# ``LR`` (왼쪽->오른쪽), ``TB`` (상단->하단), ``RL`` (오른쪽->왼쪽), ``BT`` (하단->상단)
dag_orientation = LR

# 초기 핸드 셰이크를 가져 오는 동안 웹 서버가 기다리는 시간 (초)
# 다른 워커 기계에서 로그를 가져올 때
log_fetch_timeout_sec = 5

# 다음 로그 가져 오기 전에 대기할 시간 (초).
log_fetch_delay_sec = 2

# 자동 tailing을 사용하려면 페이지 하단에서의 거리.
log_auto_tailing_offset = 30

# 자동 tailing 로그 표시의 애니메이션 속도.
log_animation_speed = 1000

# 기본적으로 웹에서 일시 중지된 DAG를 표시합니다. 이것을 숨기려면 뒤집으세요.
# 기본 DAG를 숨기려면 hide_paused_dags_by_default를 True로 설정하십시오.
hide_paused_dags_by_default = False

# UI의 모든 목록보기에서 일관된 페이지 크기
page_size = 100

# 탐색 표의 색상 정의
navbar_color = #fff

# UI에 표시 할 기본 DAG 실행
default_dag_run_display_number = 25

# Reverse proxy에 대한 werkzeug ``ProxyFix`` 미들웨어를 활성화
enable_proxy_fix = False

# ``X-Forwarded-For``에 대한 신뢰할 값을 지정합니다.
# 자세한 내용은 https://werkzeug.palletsprojects.com/en/0.16.x/middleware/proxy_fix/를 참조하십시오.
proxy_fix_x_for = 1

# ``X-Forwarded-Proto``에 대한 신뢰할 값을 지정합니다.
proxy_fix_x_proto = 1

# ``X-Forwarded-Host``에 대한 신뢰할 값을 지정합니다.
proxy_fix_x_host = 1

# ``X-Forwarded-Port``에 대한 신뢰할 값을 지정합니다.
proxy_fix_x_port = 1

# ``X-Forwarded-Prefix``에 대한 신뢰할 값을 지정합니다.
proxy_fix_x_prefix = 1

# 세션 쿠키에 안전한 플래그 설정
cookie_secure = False

# 세션 쿠키에 samesite 정책 설정
cookie_samesite = Lax

# DAG 코드 및 작업 인스턴스 로그보기에서 wrap 토글의 기본 설정.
default_wrap = False

# UI를 프레임에서 렌더링 할 수 있도록 허용
x_frame_enabled = True

# 익명 사용자 활동을 분석 도구로 전송
# google_analytics, segment 또는 metarouter 중에서 선택
# analytics_tool =

# 분석 도구의 계정 고유 ID
# analytics_id =

# "최근 작업" 통계는 설정한 경우 오래 된 DagRun에 대해 표시됩니다.
show_recent_stats_for_completed_runs = True

# 웹 서버 시작시 FAB 권한 및 보안 관리자 역할 동기화
update_fab_perms = True

# UI 쿠키 수명 (분). 사용자는 비활동 시
# ``session_lifetime_minutes`` 후에 UI에서 로그 아웃됩니다
session_lifetime_minutes = 43200

# DAG 개요 페이지 및 모든 페이지의 사이트 제목을위한 사용자 정의 페이지 제목 설정
# instance_name =

# DAG 개요 페이지의 사용자 정의 페이지 제목이 마크 업 언어를 포함하는지 여부
instance_name_has_markup = False

# 자동 새로 고침이 켜져 있을 때 그래프 또는 그리드 보기에서 DAG 데이터가 자동으로 새로 고침되는 데
# 걸리는 시간 (초)
auto_refresh_interval = 3

# 공개적으로 볼 수있는 배포에 대한 경고를 표시하는 부울
warn_deployment_exposure = True

# DAG 감사보기에서 제외 할 보기 이벤트의 쉼표로 구분 된 문자열.
# 여기에 전달 된 사항 이외의 다른 이벤트가 추가됩니다.
# 데이터베이스의 감사 로그에는이 매개 변수가 영향을주지 않습니다.
# 예를 들어 다음과 같습니다. audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data
audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data

# DAG 감사보기에 포함 할 보기 이벤트의 쉼표로 구분 된 문자열.
# 전달 된 경우이 이벤트 만 DAG 감사보기를 채 웁니다.
# 데이터베이스의 감사 로그에는이 매개 변수가 영향을주지 않습니다.
# 예: audit_view_included_events = dagrun_cleared,failed
# audit_view_included_events =


[email]

# 전자 메일 백엔드 구성 및
# 재시도 또는 실패시 전자 메일 알림을
# 보낼지 여부
# 사용할 전자 메일 백엔드
email_backend = airflow.utils.email.send_email_smtp

# 사용할 전자 메일 연결
email_conn_id = smtp_default

# 작업이 재시도 될 때 전자 메일 알림을 보낼지 여부
default_email_on_retry = True

# 작업 실패시 전자 메일 알림을 보낼지 여부
default_email_on_failure = True

# 전자 메일 주제에 사용될 템플릿으로 사용 될 파일 (Jinja2를 사용하여 렌더링 됨).
# 설정되지 않은 경우 Airflow는 기본 템플릿을 사용합니다.
# 예: subject_template = /path/to/my_subject_template_file
# subject_template =

# 전자 메일 내용에 사용될 템플릿으로 사용 될 파일 (Jinja2를 사용하여 렌더링 됨).
# 설정되지 않은 경우 Airflow는 기본 템플릿을 사용합니다.
# 예: html_content_template = /path/to/my_html_content_template_file
# html_content_template =

# 발신자 주소로 사용될 전자 메일 주소입니다.
# 이것은 원시 이메일이나 형식 "Sender Name <sender@email.com>"의 완전한 주소 일 수 있습니다.
# 예: from_email = Airflow <airflow@example.com>
# from_email =

[smtp]

# Airflow가 재시도, 실패시 이메일을 보내려면
# airflow.utils.email.send_email_smtp 함수를 구성하려면
# 여기에 smtp 서버를 구성해야합니다
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
# 예: smtp_user = airflow
# smtp_user =
# 예: smtp_password = airflow
# smtp_password =
smtp_port = 25
smtp_mail_from = airflow@example.com
smtp_timeout = 30
smtp_retry_limit = 5

[sentry]

# Sentry (https://docs.sentry.io) 통합. 여기에는
# Python 플랫폼을 기반으로하는 추가 구성 옵션을 제공 할 수 있습니다. 참조:
https://docs.sentry.io/error-reporting/configuration/?platform=python.
# 지원되지 않는 옵션 : "통합", "in_app_include", "in_app_exclude",
# "ignore_errors", "before_breadcrumb", "transport"。
# Sentry에 오류 보고를 사용하려면
sentry_on = false
sentry_dsn =

[local_kubernetes_executor]

# 이 섹션은 위의 ``[core]`` 섹션에서
# ``LocalKubernetesExecutor``를 사용하는 경우에만 적용됩니다
# ``KubernetesExecutor``로 작업을 보낼 때 정의됩니다.
# 작업의 대기열 값이 ``kubernetes_queue`` (기본값 ``kubernetes``)인 경우
# 작업은 ``KubernetesExecutor``를 통해 실행되고,
# 그렇지 않으면 ``LocalExecutor``를 통해 실행됩니다.
kubernetes_queue = kubernetes

[celery_kubernetes_executor]

# 이 섹션은 위의 ``[core]`` 섹션에서
# ``CeleryKubernetesExecutor``를 사용하는 경우에만 적용됩니다
# ``KubernetesExecutor``로 작업을 보낼 때 정의됩니다.
# 작업의 대기열 값이 ``kubernetes_queue`` (기본값 ``kubernetes``)인 경우
# 작업은 ``KubernetesExecutor``를 통해 실행되고,
# 그렇지 않으면 ``CeleryExecutor``를 통해 실행됩니다.
kubernetes_queue = kubernetes

[celery]

# 이 섹션은 위의 ``[core]`` 섹션에서 CeleryExecutor를
# 사용하는 경우에만 해당됩니다
# Celery에서 사용 될 앱 이름
celery_app_name = airflow.executors.celery_executor

# ``airflow celery worker`` 명령으로 worker를 시작할 때 사용 될 동시성
# 이는 worker가 가져올 작업 인스턴스의 수를 정의하므로
# 작업의 리소스를 기준으로 worker를 크게 설정하고
# 작업의 성격에 맞게 크기를 맞추십시오
worker_concurrency = 16

# CeleryExecutor를 사용하여 worker를 시작할 때 사용되는 최대 및 최소 동시성
# 항상 최소 프로세스를 유지하지만 필요한 경우 최대로 늘어납니다. 값이
# max_concurrency,min_concurrency
# 자동 스케일 옵션이 사용 가능한 경우 worker_concurrency는 무시됩니다.
http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
# 예: worker_autoscale = 16,12
# worker_autoscale =

# 성능을 향상시키는 데 사용되는 작업을 미리 가져 오는 데 사용됩니다.
# worker_prefetch_multiplier를 사용하면 성능이 향상될 수 있습니다.
# 프로세스 수에 worker_prefetch_multiplier를 곱한 것이 됩니다.
# worker에 의해 미리 가져 올 작업의 수입니다.
# 1보다 큰 값은 여러 worker가 미리 가져 올 작업을 가져 와서
# 기존 작업이 길게 실행되는 동안 차단 될 수 있음을 의미합니다.
# 다른 worker에 의해 이미 가져 간 차단 된 작업
https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
worker_prefetch_multiplier = 1

# 원격 제어가 활성화되어 있는지 여부를 지정합니다.
# 브로커로 Amazon SQS를 사용하는 경우 Celery는 많은 ``.*reply-celery-pidbox`` 큐를 만듭니다.
# false로 설정하면 이를 방지할 수 있습니다. 그러나 이를 비활성화하면 Flower가 작동하지 않습니다.
worker_enable_remote_control = true

# Celery 브로커 URL입니다. Celery는 RabbitMQ, Redis를 지원하며 실험적으로
# sqlalchemy 데이터베이스. 자세한 내용은 Celery 설명서를 참조하십시오.
broker_url = redis://redis:6379/0

# Celery result_backend입니다. 작업이 완료되면 메타 데이터를 업데이트해야합니다.
# 작업의 상태는 이를 사용하여 스케줄러가 작업의 상태를 업데이트합니다.
# 데이터베이스의 사용을 강력히 권장합니다.
# 지정되지 않은 경우 sql_alchemy_conn 및 db+ scheme 접두사가 사용됩니다
http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
# 예: result_backend = db+postgresql://postgres:airflow@postgres/airflow
# result_backend =

# Celery Flower는 Celery를위한 멋진 UI입니다. Airflow에는 시작하는 바로 가기가 있습니다.
# it ``airflow celery flower``. 이는 Celery Flower가 실행되는 IP를 정의합니다
flower_host = 0.0.0.0

# Flower의 루트 URL
# 예: flower_url_prefix = /flower
flower_url_prefix =

# Celery Flower가 실행되는 포트를 정의합니다.
flower_port = 5555

# Basic 인증을 사용하여 Flower를 보안하는 방법
# 쉼표로 구분 된 user:password 쌍을 허용합니다.
# 예: flower_basic_auth = user1:password1,user2:password2
flower_basic_auth =

# CeleryExecutor가 작업 상태를 동기화하는 데 사용하는 프로세스 수입니다.
# 0은 max(1, 코어 수 - 1) 프로세스를 사용하도록합니다.
sync_parallelism = 0

# Celery 구성 옵션을 가져 오는 데 사용되는 가져 오기 경로
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =

# Celery Pool 구현.
# 선택 사항으로 ``prefork`` (기본값), ``eventlet``, ``gevent`` 또는 ``solo``.
# 참조 :
https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency
https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html
pool = prefork


# Celery 작업이 ``send_task_to_executor`` 또는
# ``fetch_celery_task_state`` 작업이 만료되기 전에 대기 할 초 단위 수입니다.
operation_timeout = 1.0

# Celery 작업은 작업이 worker에 의해 실행될 때 상태를 'started'로보고합니다.
# 이는 Airflow에서 실행 중인 작업을 추적하고 Scheduler가 다시 시작되면
# 또는 HA 모드에서 실행되면 이전 SchedulerJob에서 시작한 고아 작업을 채택 할 수 있습니다.
task_track_started = True

# 채택 된 작업이 큐에 대기하고 있으면 몇 초 후에 자동으로 소멸 될 것으로 예상하는 시간,
# 자동으로 예약됩니다. 이 설정은 ``stalled_task_timeout``과 똑같은 일을하지만
# 특정하게 채택된 작업에만 적용됩니다. 0으로 설정하면 ``stalled_task_timeout`` 설정도
# 채택 된 작업에 적용됩니다.
task_adoption_timeout = 600

# 큐에 대기하는 작업이 멈춘 것으로 가정되어 자동으로 예약 될 시간 (초).
# 대신 채택 된 작업은 지정된 경우 ``task_adoption_timeout`` 설정을 사용합니다.
# 0으로 설정하면 작업이 멈추는 것을 자동으로 지우는 것이 비활성화됩니다.
stalled_task_timeout = 0

# ``AirflowTaskTimeout`` 오류로 실패할 때 중개 브로커에 작업 메시지를 게시하기 위한
# 최대 재시도 횟수. 작업을 실패로 표시하고 포기하기 전에
# task_publish_max_retries보다 큰 작업을 수행합니다.
task_publish_max_retries = 3

# Worker 초기화 검사를 통해 메타 데이터 데이터베이스 연결을 유효성 검사 할 것인지 여부
worker_precheck = False

[celery_broker_transport_options]

# 이 섹션은 기본 celery 브로커 전송에 전달할 수있는 옵션을 지정하는 데 사용됩니다.
http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options
# visibility_timeout는 작업이 다른 worker로 메시지를 다시 전달하기 전에
# worker에서 작업을 확인하는 시간 (초)을 정의합니다.
# 가장 긴 ETA의 시간과 일치하도록 가시성 시간을 늘리십시오.
# Redis 및 SQS celery 브로커 전용입니다.
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
# 예 : visibility_timeout = 21600
# visibility_timeout =

[dask]

# 이 섹션은 위의 [core] 섹션에서 DaskExecutor를 사용하는 경우에만 해당됩니다.
# Dask 클러스터의 스케줄러의 IP 주소 및 포트입니다.
cluster_address = 127.0.0.1:8786

# 보안 된 Dask 스케줄러에 액세스하는 데 사용되는 TLS/ SSL 설정입니다.
tls_ca =
tls_cert =
tls_key =

[scheduler]
# 작업 인스턴스는 외부 종료 신호를 듣습니다 (CLI 또는 UI에서 작업을 지울 때),
# 이는 그들이 얼마나 자주 청취해야하는지를 정의합니다 (초 단위).
job_heartbeat_sec = 5

# 스케줄러는 계속해서 새 작업을 트리거하려고 시도합니다 (더 자세한 정보는
# docs의 scheduler 섹션을 참조하십시오). 이것은 정의합니다
# 스케줄러가 얼마나 자주 실행되어야하는지 (초 단위).
scheduler_heartbeat_sec = 5

# 각 DAG 파일을 예약하려고 시도하는 횟수입니다.
# -1은 무제한 횟수를 나타냅니다.
num_runs = -1

# 스케줄러가 루프 간격에서 잠들어 있을 때 제어하지만,
# 루프에서 할 일이 없으면 즉시 다음 루프를 시작합니다.
scheduler_idle_sleep_time = 1

# DAG 파일이 구문 분석 될 초 단위 수입니다. DAG 파일은 매번
# ``min_file_process_interval`` 초 간격으로 구문 분석됩니다. DAG에 대한 업데이트는 이후에
# 이 간격. 이 숫자를 낮게 유지하면 CPU 사용량이 증가합니다.
min_file_process_interval = 30

# 얼마나 자주 (초) 불필요한 DAG를 비활성화하려면 확인할지 여부를 정의합니다.
# 예상 파일에서 더 이상 존재하지 않는 DAG입니다) 및 더 이상
# 참조되지 않은 데이터 집합 및 고아로 표시되어야합니다.
parsing_cleanup_interval = 60

# 얼마나 자주 (초) DAG 디렉토리에서 새 파일을 스캔 할지 정의합니다. 기본값은 5 분입니다.
dag_dir_list_interval = 300

# 로그에 통계를 인쇄하는 데 얼마나 자주 (초) 사용할지 정의합니다.
# 0으로 설정하면 통계를 인쇄하지 않습니다.
print_stats_interval = 30

# 풀 사용량 통계를 StatsD에 (statsd_on이 활성화 된 경우) 보낼 얼마나 자주 (초) 사용할지 정의합니다.
pool_metrics_interval = 5.0

# 마지막 스케줄러 심장이 스케줄러_health_check_threshold보다
# (초) 전에 발생 한 경우 스케줄러가 건강하지 않은 것으로 간주됩니다.
# 이것은 "/health" 엔드 포인트의 건강 검사에서 사용됩니다.
scheduler_health_check_threshold = 30

# 스케줄러를 시작하면 airflow는 작은 웹 서버를 시작하여
# 이게 True로 설정되면 subprocess에 건강 확인을 제공합니다
enable_health_check = False

# 스케줄러를 시작하면 airflow는 작은 웹 서버를 시작하여
# 이 포트에서 건강 검사를 제공합니다.
scheduler_health_check_server_port = 8974

# 얼마나 자주 (초) 스케줄러가 고아 작업 및 SchedulerJobs를 확인해야하는지 정의합니다.
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler

# 로컬 작업 작업은 주기적으로 DB에 심장을 뛰어야합니다. 작업이
# 이 많은 시간 동안 뛰지 않은 경우 스케줄러는 관련된 작업 인스턴스를 실패로 표시하고
# 작업을 다시 예약합니다.
scheduler_zombie_task_threshold = 300

# 얼마나 자주 (초) 스케줄러가 좀비 작업을 확인해야하는지 정의합니다.
zombie_detection_interval = 10.0

#이 값을 False로 설정하여 스케줄러 캐치업을 비활성화합니다.
# 기본 동작은 변경되지 않으며
# 명령 줄 백필이 여전히 작동하지만 스케줄러
# 이 값이 False이면 스케줄러는 스케줄러 캐치업을하지 않습니다.
# 그러나 DAG 정의에서 DAG 단위로 설정할 수 있습니다
# (catchup)
catchup_by_default = True

# 이 값을 True로 설정하면 작업의 첫 번째 작업 인스턴스
# depends_on_past 설정을 무시합니다. 작업 인스턴스는 고려됩니다.
# 새로 추가 된 작업이 예약되려면 여기에 지정된 실행 날짜가 없습니다.
# 예를 들어 실행 일 이전의 작업 인스턴스가 없습니다.
# 새로 추가 된 작업을 예약하려면 수동으로 표시 할 필요가 없습니다.
ignore_first_depends_on_past_by_default = True

# 이는 스케줄링 메인 루프의 쿼리 배치 크기를 변경합니다.
# 이것이 너무 높으면 SQL 쿼리 성능이 쿼리 예측문의 복잡성 및/또는 과도한 잠금에 영향을 받을 수 있습니다.
# 게다가 데이터베이스에 대한 최대 허용 쿼리 길이에 도달 할 수 있습니다.
# 이를 위해 제한을 없애려면 0으로 설정하십시오 (권장되지 않음)
max_tis_per_query = 512


# 스케줄러가 관련 쿼리에서 ``SELECT ... FOR UPDATE``를 발행해야합니까.
# 이 값이 False로 설정되면 한 번에 하나 이상의 스케줄러를 실행하면 안됩니다.
use_row_level_locking = True

# 스케줄러 루프 당 만들어질 DAGRuns의 최대 수입니다.
max_dagruns_to_create_per_loop = 10

# 작업을 예약하고 큐잉 할 때 스케줄러가 검토 할 (잠금 할) DagRuns의 수입니다.
max_dagruns_per_loop_to_schedule = 20

# 작업의 "미니 스케줄러"를 수행하여 동일한 DAG의 더 많은 작업을 예약하도록
# 작업 감독 프로세스를 사용해야합니까? 여기에 둔다면 같은 DAG의 작업은 더 빨리 실행되지만,
# 어떤 상황에서는 다른 DAG를 허기킬 수 있습니다.
schedule_after_task_execution = True

# DAG를 구문 분석하는 데 병렬로 실행 할 수있는 스케줄러 프로세스의 수를 정의합니다.
# 이는 몇 개의 프로세스가 실행될 것인지 정의합니다.
parsing_processes = 2

# ``modified_time``, ``random_seeded_by_host`` 및 ``alphabetical`` 중 하나입니다.
# 스케줄러는 DAG 파일을 나열하고 정렬하여 구문 분석 순서를 결정합니다.
#
# * ``modified_time``: 파일의 수정 시간으로 정렬합니다. 이것은 대규모로 사용됩니다.
#   최근 수정된 DAG를 먼저 구문 분석합니다.
# * ``random_seeded_by_host``: 여러 스케줄러 간에 무작위로 정렬하지만 동일한 순서로 정렬합니다.
#   동일한 호스트에서. 각 스케줄러가 다르게 구문 분석 할 수있는 HA 모드에서 유용합니다.
# * ``alphabetical``: 파일 이름으로 정렬
file_parsing_sort_mode = modified_time

# DAG 프로세서가 독립 프로세스로 실행되는지 아니면 스케줄러 작업의 서브 프로세스인지 여부
# 입니다.
standalone_dag_processor = False

# Only applicable if `[scheduler]standalone_dag_processor` is true and  callbacks are stored
# in database. Contains maximum number of callbacks that are fetched during a single loop.
max_callbacks_per_loop = 20

# Only applicable if `[scheduler]standalone_dag_processor` is true.
# Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
dag_stale_not_seen_duration = 600

# 스케줄러가 cron 간격을 사용하지 않도록하려면이 값을 False로 설정합니다.
# DAG가 웹 UI 또는 trigger_dag에서 수동으로 제출되면 여전히 실행됩니다.
use_job_schedule = True

# 미래의 실행 날짜에 대한 외부 트리거 된 DagRuns을 허용
# DAG에서 schedule_interval이 None으로 설정되어 있을 때만 효과가 있습니다.
allow_trigger_in_future = False

# 아직 실행되지 않은 만료 된 트리거 요청을 확인하는 데 얼마나 자주 확인 할지 정의합니다.
trigger_timeout_check_interval = 15

[triggerer]
# 기본적으로 하나의 Triggerer가 한 번에 실행 할 트리거 수입니다.
default_capacity = 1000

[kerberos]
ccache = /tmp/airflow_krb5_ccache

# fqdn과 함께 확장됩니다
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab

# 티켓을 전달할 수있게하는 것을 비활성화하려면 허용하십시오.
forwardable = True

# 토큰에서 소스 IP를 제거하는 것을 허용하십시오. 토큰을 사용할 때 유용합니다.
# NAT 된 Docker 호스트 뒤에서
include_ip = True

[elasticsearch]
# Elasticsearch 호스트
host =

# 로그의 주어진 작업 로그를 쿼리하는 데 사용되는 로그_id의 형식
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

# 작업의 로그 스트림을 표시하기위한 마크
end_of_log_mark = end_of_log

# 로그_id를 사용하여 쿼리하는 데 사용되는 elasticsearch 프론트 엔드의 정규 URL
# 위의 인수를 사용하여 log_id 템플릿에서 log_id를 구성합니다.
# 참고 : scheme은 제공되지 않은 경우 기본적으로 https로 설정됩니다.
# 예 : frontend = http://localhost:5601/app/kibana#/discover?_a=(columns:!(message),query:(language:kuery,query:'log_id: "{log_id}"'),sort:!(log.offset,asc))
frontend =

# 작업 로그를 worker의 stdout에 쓰지 않고 기본 파일에 쓰도록하려면
write_stdout = False

# 기본 로그 형식터 대신 로그 라인을 JSON으로 쓰려면
json_format = False

# JSON 출력이 활성화 된 경우 로그에 첨부 할 로그 필드
json_fields = asctime, filename, lineno, levelname, message

# 호스트 이름이 저장된 필드 (보통 ``host`` 또는 ``host.name``)입니다.
host_field = host

# offset이 저장된 필드 (보통 ``offset`` 또는 ``log.offset``)입니다.
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes_executor]
# KubernetesExecutor 워커의 기초가되는 YAML pod 파일의 경로입니다.
pod_template_file =

# Worker가 실행 할 Kubernetes Image의 저장소
worker_container_repository =

# Worker가 실행 할 Kubernetes Image의 태그
worker_container_tag =

# airflow workers가 생성 될 Kubernetes 네임 스페이스입니다. 기본값은 ``default``입니다.
namespace = default

# True 인 경우 종료시 모든 worker pod가 삭제됩니다.
delete_worker_pods = True

# False 인 경우 (delete_worker_pods가 True 인 경우),
# 실패한 worker pod는 삭제되지 않으므로 사용자가 조사 할 수 있습니다.
# 작업 자체가 실패 한 경우에만 worker pod의 제거를 방지합니다.
# 작업이 실패한 경우에만 worker pod의 제거를 방지합니다.
delete_worker_pods_on_failure = False

# 스케줄러 루프 당 Kubernetes Worker Pod 생성 호출 수입니다.
# 현재 기본값 "1"은 heartbeat당 하나의 pod 만 시작합니다.
# 사용자는 성능을 높이기 위해이 번호를 증가시킬 것을 강력히 권장합니다.
worker_pods_creation_batch_size = 1

# 사용자가 여러 네임 스페이스에서 pod를 시작하도록 허용합니다.
# 스케줄러를위한 클러스터 역할을 만들어야합니다.
multi_namespace_mode = False

# 쿠버네티스에서 실행중인 pod에서 연결하기위해 kubernetes가 pod에 제공하는 서비스 계정을 사용합니다.
# 쿠버네티스에서 실행 중인 pod에서 실행되고있는 프로세스에서 호출되면 예외가 발생합니다.
in_cluster = True

# in_cluster=False로 실행중인 경우 기본적인 cluster_context 또는 config_file을 변경하려면
# 옵션을 Kubernetes 클라이언트에 전달하십시오. ``kubectl``과 같은 기본 동작을 사용합니다.
# cluster_context =
#
# # in_cluster=False로 실행중인 경우 기본적인 cluster_context 또는 config_file 옵션을 변경하려면
# # Kubernetes Executor에 제공됩니다. 지정하지 않으면 기본 동작을 사용합니다.
# config_file =
#
# # Kubernetes 클라이언트 core_v1_api 메서드를 호출하는 동안 전달 할 키워드 매개 변수
# # Kubernetes Executor에서 제공 된 단일 줄 형식 JSON 사전 문자열로 제공됩니다.
# # 지원되는 매개 변수 목록은 모든 core_v1_apis에 대해 유사하므로
# # 모든 api에 대한 단일 구성 변수입니다. 참조 :
# # https://raw.githubusercontent.com/kubernetes-client/python/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/api/core_v1_api.py
# kube_client_request_args =
#
# # 사용자가 Kubernetes Executor를 사용할 때 'delete_namespaced_pod' kubernetes client에서 전달하는
# # ``core_v1_api`` 메서드의 ``v1DeleteOptions`` 클래스에 정의된 옵션 중 일부를 포함 할 수 있습니다.
# # 여기에서 찾을 수 있습니다.
# # https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
# # 예 : delete_option_kwargs = {"grace_period_seconds": 10}
# delete_option_kwargs =
#
# # TCP keepalive 메커니즘을 활성화합니다. 이는 Kubernetes API 요청이 무기한 대기하는 것을 방지합니다.
# # 불안전한 연결이 시간 초과 된 서비스 (예 : 클라우드 로드 밸런서 또는 방화벽)에 연결되어 있을 때
# # 해당 옵션이 활성화되어 있으면 무기한 대기하는 것을 방지합니다.
# enable_tcp_keepalive = True
#
# # 'enable_tcp_keepalive' 옵션이 활성화 된 경우 TCP가 유휴 상태에 머무르는 시간 (초)입니다.
# tcp_keep_idle = 120
#
# # 'enable_tcp_keepalive' 옵션이 활성화 된 경우 Kubernetes API가 keepalive 프로브에 응답하지 않으면
# # TCP는 'tcp_keep_intvl' 초 후에 프로브를 다시 전송합니다.
# tcp_keep_intvl = 30
#
# # 'enable_tcp_keepalive' 옵션이 활성화 된 경우 Kubernetes API가 keepalive 프로브에 응답하지 않으면
# # 연결이 중단 될 때까지 'tcp_keep_cnt 번'의 프로브를 TCP가 다시 전송합니다.
# tcp_keep_cnt = 6
#
# # SSL 증명서의 Kubernetes python 클라이언트를 확인할지 여부를 결정하는
# # false로 설정하면 Kubernetes Executor에서 SSL 증명서를 확인하지 않습니다.
# verify_ssl = True
#
# # worker가 대기 상태로 있을 수있는 시간 (초)
worker_pods_pending_timeout = 300
#
# # 기다리는 worker가 시간 초과를 초과했는지 확인하는 빈도 (초)
worker_pods_pending_timeout_check_interval = 120
#
# # 큐에 있고 pod가없는 상태에서 대기 중인 작업 인스턴스를 확인하는 빈도 (초)
worker_pods_queued_check_interval = 60
#
# # 각 확인 간격에서 시간 초과 위반을 확인할 대기 중인 pod 수입니다.
# ``multi_namespace_mode``를 사용하는 경우이 값을 더 높게 설정 할 수 있습니다.
worker_pods_pending_timeout_batch_size = 100

[sensors]
# Sensor의 기본 시간 제한은 기본적으로 7 일입니다 (7 * 24 * 60 * 60).
default_timeout = 604800

# CeleryExecutor와 Redis, PostgreSQL을 사용하여 기본적인 Airflow 클러스터 구성입니다.
# 이 구성은 환경 변수 또는 .env 파일을 사용한 기본 구성을 지원합니다.
# 다음 변수가 지원됩니다:
#
# AIRFLOW_IMAGE_NAME           - Airflow를 실행하는 데 사용되는 Docker 이미지 이름.
#                                기본값: apache/airflow:2.5.1
# AIRFLOW_UID                  - Airflow 컨테이너의 사용자 ID
#                                기본값: 50000
# AIRFLOW_PROJ_DIR             - 모든 파일이 볼륨을 설정할 기본 경로.
#                                기본값: .
# 이러한 구성은 대부분의 경우 독립적인 테스트/시험 모드에서 Airflow를 실행할 때 유용합니다.
#
# _AIRFLOW_WWW_USER_USERNAME   - 관리자 계정의 사용자 이름 (요청시).
#                                기본값: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - 관리자 계정의 비밀번호 (요청시).
#                                기본값: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - 모든 컨테이너를 시작할 때 추가할 추가 PIP 요구 사항.
#                                기본값: ''

---
version: '3'
x-airflow-common:
  &airflow-common

#requirements.txt 파일로 사용자 지정 종속성을 추가하거나 제공자 패키지를 업그레이드하려면

# Dockerfile을 추가해 확장 이미지를 사용할 수 있습니다.
# 'image: ${AIR~' 부분을  주석 처리 하고 docker-compose.yaml 파일이 있는 디렉토리에 Dockerfile을 생성
# 그런 다음 아래 "build" 라인의 주석 처리를 해제, 이후 이미지를 빌드하려면 `docker-compose build`를 실행

# 이미지를 설정하는 부분으로, AIRFLOW_IMAGE_NAME이 설정되어 있으면 해당 값을 사용하고, 
      # 설정되어 있지 않으면 기본값인 apache/airflow:2.5.1을 사용
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
    # build: .
  environment:
    &airflow-common-env
    #Airflow의 실행 모드(Executor)를 설정하는 부분입니다. CeleryExecutor는 Airflow이 작업을 분산하고
    #여러 워커에서 병렬로 실행하기 위해 Celery를 사용하는 실행 모드
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
        
    #Airflow이 데이터베이스에 연결하기 위한 SQLAlchemy 연결 문자열
    #PostgreSQL 데이터베이스를 사용하며, psycopg2라는 Python 드라이버를 통해 연결
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
                
    # Airflow 2.3 이전의 버전과의 하위 호환성을 위한 것으로, Airflow의 코어 모듈이 
    # 데이터베이스에 연결하는 데 사용하는 SQLAlchemy 연결 문자열
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    
    # CeleryExecutor를 사용할 때 Celery 작업 결과를 저장하는 백엔드를 지정
    # CeleryExecutor를 사용하는 경우 작업의 실행 및 결과 추적
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    
    # Redis 서버가 redis로 호스트되고 포트 6379를 사용하며, 브로커 데이터베이스는 0으로 설정
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    
    # Fernet은 암호화와 관련된 토큰 생성 및 검증을 담당하는 대칭 키 알고리즘
    AIRFLOW__CORE__FERNET_KEY: ''
    
    # DAG가 생성될 때 자동으로 일시 중지되는지 여부
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
        
    # 예제 dag들을 실행할지 여부
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
        
# Airflow API의 인증 백엔드를 지정합니다. 여기서는 기본적으로, basic_auth 및 session
# basic_auth : HTTP 기반의 사용자 이름 및 비밀번호를 사용한 인증/session: 세션을 사용하여 사용자를 인증
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    
    # 추가적으로 설치해야 하는 Python 패키지의 목록
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
        
     # 볼륨으로 폴더들을 매핑  
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
        
      # AIRFLOW_UID의 값을 사용하며, 해당 값이 설정되어 있지 않으면 기본값으로 50000을 사용  
  user: "${AIRFLOW_UID:-50000}:0"
    # redis,postgres가 healthy할때 컨테이너 시작
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
        # PostgreSQL 서버가 준비 상태인지 확인합니다. -U airflow는 airflow 사용자로 접속하는 것을 의미
      test: ["CMD", "pg_isready", "-U", "airflow"]
        # 건강 상태를 확인하는 주기를 5초로 설정
      interval: 5s
        # 실패할 경우 최대 5번 재시도
      retries: 5
        # 서비스가 종료될 경우 자동으로 재시작하도록 설정
    restart: always
        # redis 서비스는 Redis 이미지를 사용하여 6379번 포트로 노출하며, 주기적으로 
        # redis-cli ping 명령을 사용하여 건강 상태를 확인하고, 문제가 발생할 경우 자동으로 재시작
  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

    # Airflow 웹 서버를 8080 포트로 노출하고 주기적으로 건강 상태를 확인하며, 
    #문제가 발생할 경우 자동으로 재시작합니다. 또한, airflow-webserver 서비스는 
    #airflow-init 서비스가 성공적으로 실행된 후에만 시작됨
  airflow-webserver:
    # 앵커에서 정의된 환경 변수를 가져옴
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
        # 서버가 오류 응답을 반환하면 curl 명령이 실패하도록 만듬
        # 건강 상태를 확인하기 위한 Airflow URL
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
        # 스케줄러 작업의 건강 상태를 확인하는 Airflow 명령입니다. $${HOSTNAME}는 환경 변수를 나타냅니다. 
        # 스케줄러 작업이 정상적으로 수행되고 있는지 확인하는 용도로 사용됨
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        # 명령이 쉘에서 실행되어야 함
        - "CMD-SHELL"
        # Celery Worker가 실행 중이고 정상적으로 동작하는지 확인하는 Celery 명령입니다. 
        # Airflow의 작업들을 비동기적으로 처리하기 위해 Celery라는 분산 작업 큐 시스템을 이용
        # $${HOSTNAME}는 환경 변수를 나타냅니다. Celery Worker의 건강 상태를 확인하는데 사용됨
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
        # DUMB_INIT_SETSID를 0으로 설정하면, dumb-init이 프로세스를 세션 리더로 만들지 않습니다.
        # 세션 리더로 만들지 않는 것은 몇 가지 동작 관련 이슈를 방지하고자 하는이유는.
        # 안정성을 고려한 선택입니다. celery workers의 올바른 웜 셧다운을 처리하는데 도움됩니다.
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
        #CMD-SHELL 테스트 명령어로, Airflow의 작업(triggerer job)이 정상적으로 동작하는지 확인
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
        # 컨테이너가 시작될 때 Bash 셸을 실행
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        # 현재 Airflow 버전 확인
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        
        # 최소 Airflow 버전 확인
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        
        # 현재 버전이 최소 버전보다 낮으면 오류 출력
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        
        # AIRFLOW_UID가 설정되지 않았을 때 경고 출력
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        
        # 시스템 자원 확인 (메모리, CPU, 디스크)
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        
        # 메모리 최소사양보다 부족할 경우 경고 출력
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        # CPU 최소사양보다 부족할 경우 경고 출력
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        # 디스크 최소사양보다 부족할 경우 경고 출력
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        
        # 시스템 자원 부족 시 경고 출력
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        
        # Airflow 폴더 및 파일 권한 설정
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        
        #Docker 컨테이너 내에서 실행되는 Airflow의 버전을 확인
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
        # db 업그레이드, 계정 생성후, id/pw 설정
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

 

  # airflow-cli은  Apache Airflow의 커맨드 라인 인터페이스(Command Line Interface, CLI)를 나타냅니다. Airflow CLI를
  # 사용하면 Airflow에서 제공하는 다양한 기능을 명령어를 통해 수행

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    # Bash 셸에서 문자열을 실행할 때 사용, Airflow CLI를 실행
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
    # flower는 Celery 작업 큐를 시각적으로 모니터링하고 관리하기 위한 도구
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
# Docker Compose 파일에서 볼륨을 설정하는 부분
volumes:
  postgres-db-volume:

FROM apache/airflow:2.5.1

 

USER root
RUN apt-get update \
  && apt-get install -y --no-install-recommends \
         vim \
  && apt-get autoremove -yqq --purge \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*
Dockerfile 내에서 패키지를 설치하고 이미지를 최적화
1. `apt-get update`: 패키지 목록을 업데이트합니다.
2. `apt-get install -y --no-install-recommends vim`: vim 패키지를 설치하고 권장 패키지를 설치하지 않도록 합니다.
3. `apt-get autoremove -yqq --purge`: 불필요한 패키지를 자동으로 제거합니다.
4. `apt-get clean`: apt의 캐시를 지웁니다.
5. `rm -rf /var/lib/apt/lists/*`: 패키지 목록과 관련된 임시 파일을 제거합니다.

USER airflow

# app으로 requirements.txt 복사
COPY requirements.txt /app/requirements.txt
RUN pip3 install -r /app/requirements.txt


그냥하면 되는걸 왜 복사하고 설치할까?

Docker는 각 명령어를 캐싱하여 이미지 빌드 속도를 높이려고 합니다. 만약 COPY requirements.txt /app/requirements.txt 를 먼저 수행하고, 그 후에 소스 코드의 변경이 없는 경우 Docker는 이전 단계를 캐시하고 pip3 install을 다시 실행하지 않습니다. 이로써 이미지 빌드가 더 빨라집니다.

만약 소스 코드가 변경되지 않았고, requirements.txt 파일도 변경되지 않았다면, Docker는 이전에 설치된 패키지를 그대로 사용할 수 있습니다. 따라서 COPY ~ 를 따로 두는 것은 이미지 빌드 성능을 최적화하는 한 방법입니다.


# DAG 파일 복사
COPY dags/ /opt/airflow/dags/

 

실리콘밸리 개발자 한기용님의 강의를 듣고 요약

 

 

+aws clound.자격증 추천할만함  

주니어 : 가능성을 중점으로 
시니어 : 경험이 더 중요, 레퍼런스 체크

 

데이터 직군 내에서 전환

  • 데이터 분석가, 데이터엔지니어, 데이터사이언티스트
  • 회사내에서 직무 전환이 수월하고 일반적

일반적인 인터뷰(면접) 프로세스

  • 이력서 작성
  • 면접 or 코딩테스트 
  • 레퍼런스체크
  • 협상

이력서

  • 자기소개(아주 짧게) : 지원자의 강점, 차별성. 회사와 지원하는 업무에 맞게, 경력자면 비지니스 임팩트(매출상승등)
  • 경력 : 최신순으로, 어떤 역할 결과를 수치로 기술
  • 기술/자격증 : 카테고리화 해서 눈에 들어오게, 협업툴(git,slack 등)
  • 첫페이지가 제일 중요. 흥미를 끌어야함
  • 스킬셋 한눈에 잘보이게 정리하기
  • 프로젝트 경험은 최대한 이해하기 쉽게
    예) google worksheet -Redshift API 구축
  • 많은 내용 보다는 간결하고 눈에 들어오게

행동양식 질문들

  • 협업,주인의식, 성과
  • 답변은 STAR 방식(상황,업무,액션,결과)으로 할것
  • 예) 팀 프로젝트를 하면서 팀원과 의견이 다를 경우 어떻게 해결했는지?, 본인 만의 학습방법이 있다면? , 
    가장 최근에 배운 새로운 지식이 있다면 무엇인지?, 우리 회사에 왜 관심이 있나요?, 지금까지 해본 일중에 재밌었던일?(뭐에 중점을 두는지 알 수 있다)

기술질문들

  • 코딩 테스트 (python,sql,pandas ..)
  • 특정 기술에 대한 질문(airflow, spark,...)
  • 이력서에 프로젝트를 보고 꼬리물고 질문
  • 작은 숙제를 주고 나중에 같이 리뷰하는 경우도 있음
  • 조심할 부분 : 바로 답을 하려고 하지 말고, 문제를 제대로 이해한건지 질문(문제정의를 잘하는 사람이라는 인상)
  • 효율적 방법이 생각나지 않는다면, 간단한 솔루션으로 일단 시작하고, 완성하고 질문
  • 생각이 나지 않는다면? 솔직하게 도와달라고 요청한다(소통능력, 간절함)
  • 테스트 케이스 만들 것
  • 나한테 설명하면서 풀어보기
  • 예) 두 개의 리스트에서 공통적으로 존재하는 값들을 출력하는 함수를 구현

질문을 역으로 하는 지원자

  • 미리 회사를 검색해서 무엇을 하는 곳인지 찾아볼 것
  • 면접관이 자기 소개를 할때 잘 듣고 그에 대해 짦게 질문
  • 흐름에 맞게 자연스러운 질문이 중요
  • 사용하는 기술스텍에 대한 질문
  • 내가 처음으로 하게될 일
  • 회사의 수익구조

취업전략

  1. 탐색 : 기술스텍으로 검색 해보는거도 좋음(지원 범위가 넓어짐)
  2. 이력서 : 무조건 많이내기, 조건 절반이상 충족시 지원할것
  3. 면접 : 무조건 많이 보기,
  4. 결정 : 너무 많은 준비보단 빠르게 시작하는게 좋을수 있음, 사람이 어떤지 볼 것, 성장성있는회사,

매니저 선택기준  : 배울점이 많은 사람인가?, 명확한 결정을 내리고 피드백을 잘 주는가?, 회사에서 힘이있는가?

좋은 매니저라면 : 매니저가는 회사 따라가기, 그사람이라면 어떻게 했을지 생각해볼것

 

커리어전환의 시기 : 지난 한달 , 일년동안 뭘 배웠는가?, 새로운 기회가 온다면?, 가진걸 버릴수 있는지?


airflowdbt란

  •  Apache Airflow와 dbt(Data Build Tool)를 함께 사용하여 데이터 웨어하우스에서 ETL(Extract, Transform, Load) 작업을 수행하기 위한 도구 또는 플러그인

 

mau(월간사용자정보) 코드

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

# Redshift에 연결하는 함수
def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

# SQL을 실행하고 특정 테이블에 데이터를 적재하는 함수
def execSQL(**context):
    schema = context['params']['schema']
    table = context['params']['table']
    select_sql = context['params']['sql']

    # 로그에 정보 출력
    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)

    # Redshift 연결 및 커서 생성
    cur = get_Redshift_connection()

    # 임시테이블에 내용 있으면 삭제, 임시 테이블 생성 후 CTAS 로 데이터 적재
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};
              CREATE TABLE {schema}.temp_{table} AS {select_sql}"""
    cur.execute(sql)

    # 임시 테이블의 레코드 수 확인
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]

    # 레코드가 없으면 에러 발생
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        # 기존 테이블 삭제 및 임시 테이블을 기존 테이블 이름으로 변경
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};
                  ALTER TABLE {schema}.temp_{table} RENAME TO {table};
                  COMMIT;"""
        logging.info(sql)
        cur.execute(sql)

    except Exception as e:
        # 롤백 시도
        cur.execute("ROLLBACK")
        logging.error('Failed to execute SQL. Completed ROLLBACK!')
        # Airflow 예외 발생
        raise AirflowException("")

# DAG 정의
dag = DAG(
    dag_id="Build_Summary",
    start_date=datetime(2021, 12, 10),
    schedule='@once',
    catchup=False
)

# PythonOperator를 사용하여 execSQL 함수를 실행하는 태스크 정의
execsql = PythonOperator(
    task_id='mau_summary',
    python_callable=execSQL,
    params={
        'schema': 'keeyong',
        'table': 'mau_summary',

# mau : 사용자가 해당 월에 방문했을때, 1번만 count
        'sql': """SELECT 
                    TO_CHAR(A.ts, 'YYYY-MM') AS month,
                    COUNT(DISTINCT B.userid) AS mau
                  FROM raw_data.session_timestamp A
                  JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
                  GROUP BY 1;"""
    },
    dag=dag
)

 

 

사용자별 channel 정보

  • 코드

 

  • 추가 아이디어 : CTAS 부분을 별도의 환경설정 파일로 떼어내보자. config 폴더를 만들고, 테이블별로 py 파일을 제작.
    이렇게 하면 비개발자(데이터분석가) 등이 어려움을 덜 느낄 수 있다.

 

NPS summary 테이블

  • 10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산
  • 10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS

  • 각자스키마.nps 테이블 or raw_data.nps 테이블 기준으로 일별 NPS summary 생성
  • 일별 NPS 계산 SQL문
1. `SELECT LEFT(created_at, 10) AS date`: `created_at` 열에서 각 날짜의 처음 10자리를 선택하여 `date`로 지정합니다. 이는 날짜를 추출하는 부분입니다.

2. `ROUND(SUM(CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) AS nps`:
   - `CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END`: 각 점수에 대해 Promoter(9 이상), Detractor(6 이하)를 구분하여 1 또는 -1을 할당합니다.
   - `SUM(...)`: 각 날짜에 대한 총 합계를 계산합니다.
   - `COUNT(1)`: 총 레코드 수를 세어서 각 날짜의 응답자 수를 계산합니다.
   - `::float`: 결과를 실수형으로 형변환합니다.
   - `*100/COUNT(1)`: 비율을 계산하고 100을 곱하여 퍼센트로 변환합니다.
   - `ROUND(..., 2)`: 소수점 이하 두 자리까지 반올림하여 NPS를 계산합니다.

3. `FROM keeyong.nps`: `keeyong` 스키마의 `nps` 테이블에서 데이터를 가져옵니다.

4. `GROUP BY 1`: 날짜별로 그룹화하여 각 날짜에 대한 NPS를 계산합니다.

5. `ORDER BY 1`: 결과를 날짜 순서로 정렬합니다.

따라서 이 쿼리는 각 날짜에 대한 NPS를 계산하고, 이를 날짜별로 그룹화하여 정렬한 결과를 반환합니다.
  • CTAS 부분을 떼어내기(이부분을 learn-airflow/dags/config/nps_summary.py 에 분리해 저장)

# 필요한 모듈과 클래스 임포트
from airflow import DAG
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
import logging
from glob import glob

# DAG에서 사용할 함수: 주어진 디렉토리의 JSON 파일을 읽어들이는 함수
def load_all_jsons_into_list(path_to_json):
    # configs 리스트 초기화
    configs = []
    
    # 주어진 디렉토리에 있는 모든 .py 파일에 대해 반복
    for f_name in glob(path_to_json + '/*.py'):
        # 파일 열기
        with open(f_name) as f:
            # 파일 내용을 텍스트로 읽어들임
            dict_text = f.read()
            
            try:
                # 읽어들인 텍스트를 eval 함수를 사용하여 파이썬 딕셔너리로 변환
                dict = eval(dict_text)
            except Exception as e:
                # 변환 중 에러가 발생하면 로그에 에러 메시지 기록하고 예외 전파
                logging.info(str(e))
                raise
            else:
                # 성공적으로 변환된 딕셔너리를 configs 리스트에 추가
                configs.append(dict)
    
    # 최종적으로 변환된 딕셔너리들을 담고 있는 configs 리스트 반환
    return configs


# 주어진 테이블 이름과 테이블 설정에서 테이블을 찾는 함수
def find(table_name, table_confs):
    # table_confs 리스트를 순회하며 각 테이블 설정 딕셔너리를 확인
    for table in table_confs:
        # 현재 테이블 설정 딕셔너리의 "table" 키 값이 주어진 table_name과 일치하는지 확인
        if table.get("table") == table_name:
            # 일치하는 경우 해당 테이블 설정 딕셔너리를 반환
            return table
    
    # 모든 테이블 설정을 확인한 후에도 일치하는 것이 없으면 None 반환
    return None

# Redshift에서 요약 테이블을 생성하는 함수
def build_summary_table(dag_root_path, dag, tables_load, redshift_conn_id, start_task=None):
    # 로그에 DAG 루트 경로 출력
    logging.info(dag_root_path)
    
    # JSON 형식의 테이블 설정 파일들을 읽어들임
    table_confs = load_all_jsons_into_list(dag_root_path + "/config/")

    # DAG의 시작 작업이 지정되었으면 해당 작업을 기준으로 설정
    if start_task is not None:
        prev_task = start_task
    else:
        prev_task = None

    # tables_load에 지정된 테이블들에 대해 작업 생성 및 DAG에 추가
    for table_name in tables_load:
        # 지정된 테이블명에 해당하는 테이블 설정 찾기
        table = find(table_name, table_confs)
        
        # RedshiftSummaryOperator를 생성하고 설정값 전달
        summarizer = RedshiftSummaryOperator(
            table=table["table"],
            schema=table["schema"],
            redshift_conn_id=redshift_conn_id,
            input_check=table["input_check"],
            main_sql=table["main_sql"],
            output_check=table["output_check"],
            overwrite=table.get("overwrite", True),
            after_sql=table.get("after_sql"),
            pre_sql=table.get("pre_sql"),
            attributes=table.get("attributes", ""),
            dag=dag,
            task_id="analytics"+"__"+table["table"]
        )
        
        # 이전 작업과 현재 작업 간의 의존성 설정
        if prev_task is not None:
            prev_task >> summarizer
        prev_task = summarizer
    
    # 마지막으로 추가된 작업을 반환 (나중에 DAG 작성 시 필요)
    return prev_task

# Redshift에서 SQL을 실행하는 함수
def redshift_sql_function(**context):
    sql=context["params"]["sql"]
    print(sql)
    hook = PostgresHook(postgres_conn_id=context["params"]["redshift_conn_id"])
    hook.run(sql, True)

# Redshift에서 요약 테이블을 생성하는 PythonOperator를 확장한 사용자 정의 연산자 클래스
class RedshiftSummaryOperator(PythonOperator):
    @apply_defaults
    def __init__(self, schema, table, redshift_conn_id, input_check, main_sql, output_check, overwrite, params={}, pre_sql="", after_sql="", attributes="", *args, **kwargs):
        # 사용자가 정의한 RedshiftSummaryOperator 클래스의 초기화 메소드입니다.
        
        # 생성할 요약 테이블의 Redshift 스키마 이름
        self.schema = schema
        
        # 생성할 요약 테이블의 이름
        self.table = table
        
        # Airflow에서 사용하는 Redshift 연결 ID
        self.redshift_conn_id = redshift_conn_id
        
        # 입력 유효성을 검사하기 위한 SQL 쿼리 및 최소 레코드 수로 구성된 목록
        self.input_check = input_check
        
        # 요약 테이블을 생성하기 위한 주요 SQL 쿼리
        self.main_sql = main_sql
        
        # 출력 유효성을 검사하기 위한 SQL 쿼리 및 최소 레코드 수로 구성된 목록
        self.output_check = output_check
        
        # True인 경우 기존 테이블을 덮어쓰고, False인 경우 덮어쓰지 않고 추가합니다.
        self.overwrite = overwrite
        
        # PythonOperator에 전달할 추가 매개변수
        self.params = params
        
        # main_sql 실행 전에 실행할 SQL 쿼리
        self.pre_sql = pre_sql if pre_sql else ""
        
        # main_sql 실행 후에 실행할 SQL 쿼리
        self.after_sql = after_sql.format(schema=self.schema, table=self.table) if after_sql else ""
        
        # 생성할 테이블의 추가 속성
        self.attributes = attributes

        # temp 테이블 생성 및 데이터 적재에 사용될 SQL 문 생성
        if pre_sql:
            self.main_sql = pre_sql
            if not self.main_sql.endswith(";"):
                self.main_sql += ";"
        else:
            self.main_sql = ""
        # 임시테이블이 있으면 삭제, 
        self.main_sql += "DROP TABLE IF EXISTS {schema}.temp_{table};".format(
            schema=self.schema,
            table=self.table
        )
        # CREATE TABLE 문을 만들어서 self.main_sql에 추가합니다.
        self.main_sql += "CREATE TABLE {schema}.temp_{table} {attributes} AS ".format(
            schema=self.schema,
            table=self.table,
            attributes=self.attributes
        ) + self.main_sql

        # 상위 클래스인 PythonOperator를 호출하여 초기화
        # RedshiftSummaryOperator 클래스의 초기화 메소드에서는 두 번의 상위 클래스 초기화(super)가 이루어집니다.

        # 첫 번째 super 호출:
        super(RedshiftSummaryOperator, self).__init__(
            python_callable=redshift_sql_function,  # Python callable로 사용될 함수
            params={
                "sql": self.main_sql,  # 생성한 SQL 문
                "overwrite": self.overwrite,  # 덮어쓰기 여부
                "redshift_conn_id": self.redshift_conn_id  # Redshift 연결 ID
            },
            provide_context=True,  # Airflow 컨텍스트 제공 여부
            *args,
            **kwargs
        )

        # 두 번째 super 호출:
        # after_sql이 정의되어 있다면 해당 값을 사용하고, 그렇지 않으면 빈 문자열("")을 사용합니다.
        if after_sql:
            self.after_sql = after_sql.format(
                schema=self.schema,
                table=self.table
            )
        else:
            self.after_sql = ""

        super(RedshiftSummaryOperator, self).__init__(
            python_callable=redshift_sql_function,  # Python callable로 사용될 함수
            params={
                "sql": main_sql,  # 생성한 SQL 문
                "overwrite": overwrite,  # 덮어쓰기 여부
                "redshift_conn_id": self.redshift_conn_id  # Redshift 연결 ID
            },
            provide_context=True,  # Airflow 컨텍스트 제공 여부
            *args,
            **kwargs
        )


    # temp 테이블과 본 테이블을 스왑하는 함수
    def swap(self):
        # 원본 테이블 삭제
        # 임시테이블 원본테이블 이름으로 바꿈
        # {schema}와 {table}을 사용자가 정의한 값으로 대체하여 
        # SELECT 권한을 부여하는 SQL 문
        sql = """BEGIN;
        DROP TABLE IF EXISTS {schema}.{table} CASCADE;
        ALTER TABLE {schema}.temp_{table} RENAME TO {table};   
        GRANT SELECT ON TABLE {schema}.{table} TO GROUP analytics_users;
        END
        """.format(schema=self.schema,table=self.table)
        self.hook.run(sql, True)

    def execute(self, context):
        """
        RedshiftSummaryOperator의 execute 메소드입니다.

        1. Input_check 먼저 수행
           - input_check는 "sql"과 "count"를 포함하는 딕셔너리의 목록이어야 함
        """
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for item in self.input_check:
            (cnt,) = self.hook.get_first(item["sql"])
            if cnt < item["count"]:
                raise AirflowException(
                    "Input Validation Failed for " + str(item["sql"]))

        """
        2. temp 테이블 생성 및 데이터 적재 수행
        """
        return_value = super(RedshiftSummaryOperator, self).execute(context)

        """
        3. Output_check은 self.output_check 사용
        """
        for item in self.output_check:
            (cnt,) = self.hook.get_first(item["sql"].format(schema=self.schema, table=self.table))
            if item.get("op") == 'eq':
                if int(cnt) != int(item["count"]):
                    raise AirflowException(
                        "Output Validation of 'eq' Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
            else:
                if cnt < item["count"]:
                    raise AirflowException(
                        "Output Validation Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
        
        """
        4. 이제 temp 테이블 이름을 스왑
        """
        self.swap()

        """
        5. after_sql이 정의되어 있다면 실행
        """
        if self.after_sql:
            self.hook.run(self.after_sql, True)

        return return_value

+ Recent posts