RDD, DataFrame, Dataset (Immutable Distributed Data)
- 2016년에 DataFrame과 Dataset은 하나의 API로 통합됨
- 모두 파티션으로 나뉘어 Spark에서 처리됨
RDD (Resilient Distributed Dataset):
- - **로우레벨 데이터:** RDD는 클러스터 내의 서버에 분산된 로우레벨 데이터를 지칭합니다.
- **스키마 없음:** 레코드별로 존재하지만 스키마가 존재하지 않습니다. 따라서 구조화된 데이터나 비구조화된 데이터를 모두 지원합니다.
DataFrame과 Dataset:
- DataFrame:
- RDD 위에 만들어지는데, 필드 정보를 갖고 있습니다. 이는 테이블 형태로 생각할 수 있습니다.
- 구조화된 데이터를 다룰 때 유용하며, 스키마 정보를 가지고 있습니다.
- PySpark에서도 사용 가능하며, 특히 Python에서는 주로 DataFrame을 활용합니다. - Dataset:
DataFrame과 마찬가지로 RDD 위에 만들어집니다. 하지만 Dataset은 타입 정보가 존재하며, 컴파일 언어에서 사용
- 주로 Scala와 Java에서 활용되며, 컴파일러가 타입을 검사할 수 있는 장점이 있습니다.
- PySpark에서는 주로 DataFrame을 사용하고, 타입 정보가 필요한 경우에는 Dataset을 활용할 수 있습니다.
spark sql 엔진
- Code Analysis (코드 분석):
- **역할:** Spark는 사용자 코드를 분석하여 데이터프레임 또는 RDD의 트랜스포메이션 및 액션을 파악합니다.
- **기능:** 코드 분석 단계에서는 사용자가 작성한 코드를 해석하고, 실행 계획을 수립하기 위한 정보를 추출합니다. - Logical Optimization (Catalyst Optimizer, 논리적 최적화):
- **역할:** Catalyst Optimizer는 논리적 최적화를 수행하여 불필요한 계산을 제거하고 최적의 실행 계획을 도출
- **기능:** 논리적 최적화는 사용자의 쿼리를 내부적으로 최적화하여 처리 효율을 향상시킵니다. 예를 들어, 필요없는 컬럼이나 조건을 제거하고, 필터링을 최적화하는 등의 작업이 이루어집니다. - Physical Planning (물리적 계획):
- **역할:** Catalyst Optimizer에 의해 최적화된 논리적 실행 계획을 물리적 실행 계획으로 변환합니다.
- **기능:** 최적화된 논리적 계획을 어떻게 물리적으로 실행할지에 대한 계획을 수립합니다. 이는 실제 익스큐터에서 수행될 태스크 및 데이터 셔플링 방법 등을 결정합니다. - Code Generation (Project Tungsten):
- **역할:** Project Tungsten은 물리적 실행 계획을 효율적으로 구현하기 위해 코드를 생성합니다.
- **기능:** 물리적 실행 계획에 대한 코드를 생성하고, 코드 실행 시점에서 최적화된 코드를 사용하여 성능을 최대한 끌어올립니다. 코드 생성을 통해 JVM을 효율적으로 활용하고, 불필요한 오버헤드를 최소화합니다.
Spark 데이터 구조 - RDD
- 변경이 불가능한 분산 저장된 데이터:
- RDD는 여러 개의 파티션으로 나누어진 변경이 불가능한 데이터를 나타냅니다.
- 각각의 파티션은 클러스터 내에 분산되어 저장되며, 데이터의 내결함성을 제공합니다.
- RDD는 로우레벨의 함수형 변환을 지원하며, `map`, `filter`, `flatMap` 등의 함수형 프로그래밍 연산을 활용하여 데이터를 조작할 수 있습니다. - 일반 파이썬 데이터의 RDD 변환:
- 파이썬에서는 `parallelize` 함수를 사용하여 일반적인 데이터를 RDD로 변환할 수 있습니다.
- RDD를 사용하면 분산 환경에서 데이터를 효율적으로 처리할 수 있으며, RDD의 불변성과 내결함성을 활용하여 안정적인 분산 처리를 보장합니다.
- 반대로, RDD를 일반적인 파이썬 데이터로 변환하려면 `collect` 함수를 사용할 수 있습니다. 이는 RDD의 모든 데이터를 드라이버 노드로 수집하는 역할을 합니다.
Spark 데이터 구조 - 데이터 프레임:
- 변경이 불가한 분산 저장된 데이터:**
- 데이터 프레임은 RDD와 마찬가지로 변경이 불가능한 분산 저장된 데이터입니다.
- 여러 개의 파티션으로 나누어져 클러스터에 분산 저장되며, 내결함성과 데이터 처리의 안정성을 제공합니다. - 관계형 데이터베이스 테이블과 유사한 구조:**
- RDD와 달리 데이터 프레임은 관계형 데이터베이스 테이블과 유사한 형태로 구성됩니다.
- 행과 열로 이루어진 테이블 형식으로 데이터를 저장하며, 각 컬럼은 데이터의 속성을 나타냅니다.
- 판다스의 데이터 프레임이나 관계형 데이터베이스의 테이블과 유사한 편리한 조작 기능을 제공합니다. - 다양한 데이터 소스 지원:**
- 데이터 프레임은 다양한 데이터 소스로부터 데이터를 읽거나 저장할 수 있습니다.
- HDFS, Hive, 외부 데이터베이스, 기존 RDD 등 다양한 데이터 소스와 통합하여 사용할 수 있습니다. - 다양한 언어 지원:**
- 데이터 프레임은 스칼라, 자바, 파이썬과 같은 다양한 프로그래밍 언어에서 지원됩니다.
- 이로써 사용자는 선호하는 언어를 선택하여 데이터 프레임을 조작하고 분석할 수 있습니다.
Spark Session 생성:
- Spark 프로그램 시작점:
- Spark 프로그램의 시작은 SparkSession을 생성하는 것으로 이루어집니다.
- 각 프로그램은 하나의 SparkSession을 생성하며, 이를 통해 Spark Cluster와의 통신이 이루어집니다.
- SparkSession은 싱글톤 객체로 구현되어 있어, 하나의 프로그램에서 전역적으로 사용됩니다. - Spark 2.0에서 도입:
- Spark 2.0 버전에서 처음으로 도입된 개념으로, 이전에는 SparkContext를 통해 Spark 기능을 사용했습니다.
- SparkSession은 Spark 2.0에서 통합된 인터페이스로 다양한 기능들을 효과적으로 사용할 수 있도록 지원합니다. - 다양한 기능 통합:
- Spark Session을 통해 DataFrame, SQL, Streaming, ML API 등 다양한 Spark 기능에 접근할 수 있습니다.
- config 메소드를 활용하여 다양한 환경 설정이 가능하며, 각종 작업을 효율적으로 수행할 수 있습니다. - RDD와의 관계:
- RDD와 관련된 작업을 수행할 때는 SparkSession 밑의 sparkContext 객체를 사용합니다.
- SparkSession은 DataFrame과 관련된 작업에 최적화되어 있으며, RDD 작업은 sparkContext를 통해 수행됩니다.
Spark 세션 생성 - PySpark 예제
- 위의 코드는 PySpark에서 Spark Session을 생성하는 코드입니다. 각각의 파라미터에 대한 설명은 다음과 같습니다:
- **master("local[*]"):**
- Spark 클러스터의 주소 또는 클러스터 관리자에 대한 URL을 지정합니다.
- "local[*]"는 로컬 환경에서 모든 가능한 코어를 사용하여 실행하도록 지정합니다. 이것은 주로 개발 및 테스트 목적으로 사용됩니다.
- **appName('PySpark Tutorial'):**
- 생성되는 Spark 애플리케이션의 이름을 지정합니다. 이는 Spark 클러스터의 웹 UI에서 식별에 사용됩니다.
- **getOrCreate():**
- SparkSession 객체를 생성하거나 이미 존재하는 경우 기존의 SparkSession을 반환합니다.
- SparkSession은 보통 하나의 애플리케이션에 대해 하나만 생성되어야 하므로, 이미 존재하는지 여부를 체크하고 새로 생성하거나 반환합니다.
이 코드를 통해 SparkSession이 생성되면, 해당 객체를 사용하여 PySpark에서 다양한 기능들을 수행할 수 있습니다.
PySpark에 주요 모듈 및 클래스
- pyspark.sql.SparkSession:**
- Spark 애플리케이션을 시작하기 위한 핵심 진입점인 Spark 세션을 생성하는 클래스입니다. - pyspark.sql.DataFrame:**
- 테이블이나 데이터베이스 테이블과 유사한 분산 데이터프레임을 나타내는 클래스입니다. 구조화된 데이터를 처리하는 데 사용됩니다. - pyspark.sql.Column:**
- DataFrame의 컬럼을 나타내는 클래스로, 특정 컬럼에 대한 작업 및 변환을 수행하는 데 사용됩니다. - pyspark.sql.Row:**
- DataFrame의 레코드를 나타내는 클래스로, 각 레코드는 여러 컬럼의 값으로 구성됩니다. - pyspark.sql.functions:**
- 다양한 내장 함수를 제공하는 모듈로, DataFrame 컬럼에 대한 다양한 연산과 변환을 수행하는 데 사용됩니다. - pyspark.sql.types:**
- 다양한 데이터 타입을 정의하는 모듈로, DataFrame의 스키마를 정의하거나 데이터 타입 변환에 사용됩니다. - pyspark.sql.Window:**
- 윈도우 함수를 정의하고 사용하는 데 사용되는 클래스로, 데이터의 윈도우 기반 분석을 수행하는 데 도움을 줌
이러한 모듈과 클래스는 PySpark를 사용하여 구조화된 데이터를 처리하고 분석하는 데 필수적이며, 다양한 작업 및 변환을 수행할 수 있도록 도와줍니다.
Spark Session을 생성할 때 주요 환경 변수 몇 가지
- executor별 메모리:** `spark.executor.memory` (기본값: 1g)
- 각 익스큐터가 사용할 메모리 양을 설정합니다. - executor별 CPU 수:** `spark.executor.cores` (YARN에서는 기본값 1)
- 각 익스큐터가 사용할 CPU 코어 수를 설정합니다. - driver 메모리:** `spark.driver.memory` (기본값: 1g)
- 드라이버가 사용할 메모리 양을 설정합니다. - Shuffle 후 Partition의 수:** `spark.sql.shuffle.partitions` (기본값: 최대 200)
- 셔플 후 생성되는 파티션의 수를 설정합니다. - 그 외 환경변수들 https://spark.apache.org/docs/latest/configuration.html#application-properties
Spark Session의 환경 설정은 다음 4가지 방법으로 수행할 수 있습니다:
- 환경변수:**
- 환경 변수를 사용하여 Spark Session의 설정을 지정할 수 있습니다. - $SPARK_HOME/conf/spark_defaults.conf:**
- Spark이 사용하는 기본 설정 파일인 `spark_defaults.conf`를 편집하여 설정을 지정할 수 있습니다. - spark-submit 명령의 커맨드라인 파라미터:**
- `spark-submit` 명령을 사용할 때, 명령행에서 직접 설정을 지정할 수 있습니다. - SparkSession 만들 때 지정:**
- Spark Session을 생성할 때 직접 설정을 지정할 수 있습니다. 이는 `SparkConf` 객체를 통해 수행됩니다.
이 중에서 `spark-submit` 명령의 커맨드라인 파라미터와 SparkSession을 만들 때 지정하는 방법은 각각 프로그램 실행 시 동적으로 설정을 변경할 수 있는 방법입니다.
spark 환경설정 방법( sparksession 생성시 일일히 지정)
- from pyspark.sql import SparkSession
# SparkSession은 싱글턴이므로 이미 존재하는 경우 가져오고, 없으면 새로 생성합니다.
spark = SparkSession.builder \
.master("local[*]") \ # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.
.appName('PySpark Tutorial') \ # Spark 애플리케이션의 이름을 지정합니다.
.config("spark.some.config.option1", "some-value") \ # 사용자가 지정한 Spark 환경 설정을 추가합니다.
.config("spark.some.config.option2", "some-value") \
.getOrCreate() # 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.
spark 환경설정 방법( SparkConf 객체에 환경 설정하고 SparkSession에 지정)
- from pyspark.sql import SparkSession
from pyspark import SparkConf
# SparkConf 객체를 생성하고 필요한 설정을 추가합니다.
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial") # Spark 애플리케이션의 이름을 설정합니다.
conf.set("spark.master", "local[*]") # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.
# SparkSession을 생성하고 SparkConf 객체를 전달합니다.
# 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.
spark = SparkSession.builder \
.config(conf=conf) \
.getOrCreate()
전체적인 PySpark 프로세스의 흐름
- Spark 세션 생성:**
- `SparkSession`을 생성하여 Spark 클러스터와 통신합니다. - 입력 데이터 로딩:**
- `SparkSession`을 사용하여 원하는 입력 데이터를 로드합니다. - 데이터 조작 작업:**
- DataFrame API나 Spark SQL을 활용하여 데이터를 조작합니다.
- 판다스와 유사한 방식으로 작업하며, 필요한 경우 새로운 DataFrame을 생성합니다.
- 원하는 결과가 나올 때까지 다양한 작업을 수행합니다. - 최종 결과 저장:**
- 조작한 데이터나 계산된 결과를 최종적으로 저장합니다.
- 다양한 출력 포맷에 따라 적절한 형식으로 결과를 저장할 수 있습니다.
이러한 플로우를 통해 PySpark을 활용하여 데이터 처리 및 분석 작업을 수행할 수 있습니다.
Spark Session에서 지원하는 주요 데이터 소스
- `spark.read`와 `spark.write`를 사용하여 데이터를 로드하고 저장합니다.
- 많이 사용되는 데이터 소스들
HDFS 파일:**
- CSV, JSON, Parquet, ORC, Text, Avro 등과 같은 다양한 파일 형식을 지원합니다.
- 특히, Parquet, ORC, Avro는 고성능 및 압축 효율성을 제공하는데, 이에 대한 자세한 내용은 나중에 설명될 것
- Hive 테이블 - JDBC 관계형 데이터베이스:**
- JDBC를 통해 연결된 다양한 관계형 데이터베이스에서 데이터를 읽거나 쓸 수 있습니다. - 클라우드 기반 데이터 시스템:**
- 다양한 클라우드 플랫폼에서 제공하는 데이터 저장소나 데이터 서비스를 지원합니다. - 스트리밍 시스템:**
- Spark Streaming을 사용하여 실시간 데이터를 처리하거나, 다양한 스트리밍 소스에서 데이터를 읽을 수 있습니다.
이를 통해 Spark은 다양한 데이터 소스와 형식을 효율적으로 처리하고 분석할 수 있습니다.
Spark 개발 환경 옵션
- Local Standalone Spark + Spark Shell
- Python IDE – PyCharm, Visual Studio
- Databricks Cloud – 커뮤니티 에디션을 무료로 사용
- 다른 노트북 – 주피터 노트북, 구글 Colab, 아나콘다 등등
Local Standalone Spark:
- 로컬 스탠드얼론(Spark Standalone)은 Spark을 단일 머신에서 실행하고 테스트하기 위한 클러스터 매니저 옵션입니다. 주로 개발 및 간단한 테스트 목적으로 사용됩니다.
- Spark Cluster Manager 설정:**
- `master`를 `local[n]`으로 설정하여 사용합니다.
- `n`은 사용할 쓰레드 수를 나타내며, 로컬 클러스터 내에서 실행될 스레드 수를 조절합니다.
- 주로 개발이나 간단한 테스트 용도
- 하나의 JVM(Java Virtual Machine)에서 모든 프로세스가 실행됩니다.
- 단일 Driver와 단일 Executor가 실행됩니다.
- 1+ 쓰레드가 Executor안에서 실행됨
- Executor 내에서 생성되는 쓰레드 수는 다음과 같이 조절됩니다:
- `local`: 하나의 쓰레드만 생성됩니다.
- `local[*]`: 컴퓨터의 CPU 수만큼 쓰레드가 생성됩니다.
로컬 스탠드얼론 모드를 사용하면 효율적으로 개발 및 테스트를 수행할 수 있으며, 단일 머신에서 Spark 애플리케이션을 실행하는 간편한 방법을 제공합니다.
구글 Colab에서 Spark 사용
- PySpark + Py4J를 설치
1) 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
2) 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
3) Spark Web UI는 기본적으로는 접근 불가 (ngrok을 통해 억지로 열 수는 있음)
4) Py4J(파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌)
'하둡,spark' 카테고리의 다른 글
spark 데이터프레임 실습1 (0) | 2024.01.23 |
---|---|
spark 기초 실습(colab) (0) | 2024.01.19 |
spark 데이터 처리 (0) | 2024.01.18 |
Spark 프로그램 실행 환경 (0) | 2024.01.18 |
spark 기초 (0) | 2024.01.18 |