RDD, DataFrame, Dataset (Immutable Distributed Data)

  • 2016년에 DataFrame과 Dataset은 하나의 API로 통합됨
  • 모두 파티션으로 나뉘어 Spark에서 처리됨



RDD (Resilient Distributed Dataset):

  • - **로우레벨 데이터:** RDD는 클러스터 내의 서버에 분산된 로우레벨 데이터를 지칭합니다.
      - **스키마 없음:** 레코드별로 존재하지만 스키마가 존재하지 않습니다. 따라서 구조화된 데이터나 비구조화된 데이터를 모두 지원합니다.

DataFrame과 Dataset:

  • DataFrame:
        - RDD 위에 만들어지는데, 필드 정보를 갖고 있습니다. 이는 테이블 형태로 생각할 수 있습니다.
        - 구조화된 데이터를 다룰 때 유용하며, 스키마 정보를 가지고 있습니다.
        - PySpark에서도 사용 가능하며, 특히 Python에서는 주로 DataFrame을 활용합니다.

  •  Dataset:
     DataFrame과 마찬가지로 RDD 위에 만들어집니다. 하지만 Dataset은 타입 정보가 존재하며, 컴파일 언어에서 사용 
        - 주로 Scala와 Java에서 활용되며, 컴파일러가 타입을 검사할 수 있는 장점이 있습니다.
        - PySpark에서는 주로 DataFrame을 사용하고, 타입 정보가 필요한 경우에는 Dataset을 활용할 수 있습니다.

 

spark sql 엔진

  1. Code Analysis (코드 분석):
       - **역할:** Spark는 사용자 코드를 분석하여 데이터프레임 또는 RDD의 트랜스포메이션 및 액션을 파악합니다.
       - **기능:** 코드 분석 단계에서는 사용자가 작성한 코드를 해석하고, 실행 계획을 수립하기 위한 정보를 추출합니다.

  2. Logical Optimization (Catalyst Optimizer, 논리적 최적화):
       - **역할:** Catalyst Optimizer는 논리적 최적화를 수행하여 불필요한 계산을 제거하고 최적의 실행 계획을 도출
       - **기능:** 논리적 최적화는 사용자의 쿼리를 내부적으로 최적화하여 처리 효율을 향상시킵니다. 예를 들어, 필요없는 컬럼이나 조건을 제거하고, 필터링을 최적화하는 등의 작업이 이루어집니다.

  3. Physical Planning (물리적 계획):
       - **역할:** Catalyst Optimizer에 의해 최적화된 논리적 실행 계획을 물리적 실행 계획으로 변환합니다.
       - **기능:** 최적화된 논리적 계획을 어떻게 물리적으로 실행할지에 대한 계획을 수립합니다. 이는 실제 익스큐터에서 수행될 태스크 및 데이터 셔플링 방법 등을 결정합니다.

  4. Code Generation (Project Tungsten):
       - **역할:** Project Tungsten은 물리적 실행 계획을 효율적으로 구현하기 위해 코드를 생성합니다.
       - **기능:** 물리적 실행 계획에 대한 코드를 생성하고, 코드 실행 시점에서 최적화된 코드를 사용하여 성능을 최대한 끌어올립니다. 코드 생성을 통해 JVM을 효율적으로 활용하고, 불필요한 오버헤드를 최소화합니다.

 

Spark 데이터 구조 - RDD

  • 변경이 불가능한 분산 저장된 데이터:
      - RDD는 여러 개의 파티션으로 나누어진 변경이 불가능한 데이터를 나타냅니다.
      - 각각의 파티션은 클러스터 내에 분산되어 저장되며, 데이터의 내결함성을 제공합니다.
      - RDD는 로우레벨의 함수형 변환을 지원하며, `map`, `filter`, `flatMap` 등의 함수형 프로그래밍 연산을 활용하여 데이터를 조작할 수 있습니다.

  • 일반 파이썬 데이터의 RDD 변환:
      - 파이썬에서는 `parallelize` 함수를 사용하여 일반적인 데이터를 RDD로 변환할 수 있습니다.
      - RDD를 사용하면 분산 환경에서 데이터를 효율적으로 처리할 수 있으며, RDD의 불변성과 내결함성을 활용하여 안정적인 분산 처리를 보장합니다.
      - 반대로, RDD를 일반적인 파이썬 데이터로 변환하려면 `collect` 함수를 사용할 수 있습니다. 이는 RDD의 모든 데이터를 드라이버 노드로 수집하는 역할을 합니다.

 

Spark 데이터 구조 - 데이터 프레임:

  • 변경이 불가한 분산 저장된 데이터:**
      - 데이터 프레임은 RDD와 마찬가지로 변경이 불가능한 분산 저장된 데이터입니다.
      - 여러 개의 파티션으로 나누어져 클러스터에 분산 저장되며, 내결함성과 데이터 처리의 안정성을 제공합니다.

  • 관계형 데이터베이스 테이블과 유사한 구조:**
      - RDD와 달리 데이터 프레임은 관계형 데이터베이스 테이블과 유사한 형태로 구성됩니다.
      - 행과 열로 이루어진 테이블 형식으로 데이터를 저장하며, 각 컬럼은 데이터의 속성을 나타냅니다.
      - 판다스의 데이터 프레임이나 관계형 데이터베이스의 테이블과 유사한 편리한 조작 기능을 제공합니다.

  • 다양한 데이터 소스 지원:**
      - 데이터 프레임은 다양한 데이터 소스로부터 데이터를 읽거나 저장할 수 있습니다.
      - HDFS, Hive, 외부 데이터베이스, 기존 RDD 등 다양한 데이터 소스와 통합하여 사용할 수 있습니다.

  • 다양한 언어 지원:**
      - 데이터 프레임은 스칼라, 자바, 파이썬과 같은 다양한 프로그래밍 언어에서 지원됩니다.
      - 이로써 사용자는 선호하는 언어를 선택하여 데이터 프레임을 조작하고 분석할 수 있습니다.

Spark Session 생성:

  • Spark 프로그램 시작점:
      - Spark 프로그램의 시작은 SparkSession을 생성하는 것으로 이루어집니다.
      - 각 프로그램은 하나의 SparkSession을 생성하며, 이를 통해 Spark Cluster와의 통신이 이루어집니다.
      - SparkSession은 싱글톤 객체로 구현되어 있어, 하나의 프로그램에서 전역적으로 사용됩니다.

  • Spark 2.0에서 도입:
      - Spark 2.0 버전에서 처음으로 도입된 개념으로, 이전에는 SparkContext를 통해 Spark 기능을 사용했습니다.
      - SparkSession은 Spark 2.0에서 통합된 인터페이스로 다양한 기능들을 효과적으로 사용할 수 있도록 지원합니다.

  • 다양한 기능 통합:
      - Spark Session을 통해 DataFrame, SQL, Streaming, ML API 등 다양한 Spark 기능에 접근할 수 있습니다.
      - config 메소드를 활용하여 다양한 환경 설정이 가능하며, 각종 작업을 효율적으로 수행할 수 있습니다.

  • RDD와의 관계:
      - RDD와 관련된 작업을 수행할 때는 SparkSession 밑의 sparkContext 객체를 사용합니다.
      - SparkSession은 DataFrame과 관련된 작업에 최적화되어 있으며, RDD 작업은 sparkContext를 통해 수행됩니다.

Spark 세션 생성 - PySpark 예제

  • 위의 코드는 PySpark에서 Spark Session을 생성하는 코드입니다. 각각의 파라미터에 대한 설명은 다음과 같습니다:

    - **master("local[*]"):**
      - Spark 클러스터의 주소 또는 클러스터 관리자에 대한 URL을 지정합니다.
      - "local[*]"는 로컬 환경에서 모든 가능한 코어를 사용하여 실행하도록 지정합니다. 이것은 주로 개발 및 테스트 목적으로 사용됩니다.

    - **appName('PySpark Tutorial'):**
      - 생성되는 Spark 애플리케이션의 이름을 지정합니다. 이는 Spark 클러스터의 웹 UI에서 식별에 사용됩니다.

    - **getOrCreate():**
      - SparkSession 객체를 생성하거나 이미 존재하는 경우 기존의 SparkSession을 반환합니다.
      - SparkSession은 보통 하나의 애플리케이션에 대해 하나만 생성되어야 하므로, 이미 존재하는지 여부를 체크하고 새로 생성하거나 반환합니다.

    이 코드를 통해 SparkSession이 생성되면, 해당 객체를 사용하여 PySpark에서 다양한 기능들을 수행할 수 있습니다.

 

PySpark에 주요 모듈 및 클래스

  1. pyspark.sql.SparkSession:**
       - Spark 애플리케이션을 시작하기 위한 핵심 진입점인 Spark 세션을 생성하는 클래스입니다.

  2. pyspark.sql.DataFrame:**
       - 테이블이나 데이터베이스 테이블과 유사한 분산 데이터프레임을 나타내는 클래스입니다. 구조화된 데이터를 처리하는 데 사용됩니다.

  3. pyspark.sql.Column:**
       - DataFrame의 컬럼을 나타내는 클래스로, 특정 컬럼에 대한 작업 및 변환을 수행하는 데 사용됩니다.

  4. pyspark.sql.Row:**
       - DataFrame의 레코드를 나타내는 클래스로, 각 레코드는 여러 컬럼의 값으로 구성됩니다.

  5. pyspark.sql.functions:**
       - 다양한 내장 함수를 제공하는 모듈로, DataFrame 컬럼에 대한 다양한 연산과 변환을 수행하는 데 사용됩니다.

  6. pyspark.sql.types:**
       - 다양한 데이터 타입을 정의하는 모듈로, DataFrame의 스키마를 정의하거나 데이터 타입 변환에 사용됩니다.

  7. pyspark.sql.Window:**
       - 윈도우 함수를 정의하고 사용하는 데 사용되는 클래스로, 데이터의 윈도우 기반 분석을 수행하는 데 도움을 줌

    이러한 모듈과 클래스는 PySpark를 사용하여 구조화된 데이터를 처리하고 분석하는 데 필수적이며, 다양한 작업 및 변환을 수행할 수 있도록 도와줍니다.

 

Spark Session을 생성할 때  주요 환경 변수 몇 가지

  1. executor별 메모리:** `spark.executor.memory` (기본값: 1g)
       - 각 익스큐터가 사용할 메모리 양을 설정합니다.

  2. executor별 CPU 수:** `spark.executor.cores` (YARN에서는 기본값 1)
       - 각 익스큐터가 사용할 CPU 코어 수를 설정합니다.

  3. driver 메모리:** `spark.driver.memory` (기본값: 1g)
       - 드라이버가 사용할 메모리 양을 설정합니다.

  4. Shuffle 후 Partition의 수:** `spark.sql.shuffle.partitions` (기본값: 최대 200)
       - 셔플 후 생성되는 파티션의 수를 설정합니다.

  5. 그 외 환경변수들 https://spark.apache.org/docs/latest/configuration.html#application-properties

Spark Session의 환경 설정은 다음 4가지 방법으로 수행할 수 있습니다:

  1. 환경변수:**
       - 환경 변수를 사용하여 Spark Session의 설정을 지정할 수 있습니다.

  2. $SPARK_HOME/conf/spark_defaults.conf:**
       - Spark이 사용하는 기본 설정 파일인 `spark_defaults.conf`를 편집하여 설정을 지정할 수 있습니다.

  3. spark-submit 명령의 커맨드라인 파라미터:**
       - `spark-submit` 명령을 사용할 때, 명령행에서 직접 설정을 지정할 수 있습니다.

  4. SparkSession 만들 때 지정:**
       - Spark Session을 생성할 때 직접 설정을 지정할 수 있습니다. 이는 `SparkConf` 객체를 통해 수행됩니다.

    이 중에서 `spark-submit` 명령의 커맨드라인 파라미터와 SparkSession을 만들 때 지정하는 방법은 각각 프로그램 실행 시 동적으로 설정을 변경할 수 있는 방법입니다.

spark 환경설정 방법( sparksession 생성시 일일히 지정)

  • from pyspark.sql import SparkSession
    # SparkSession은 싱글턴이므로 이미 존재하는 경우 가져오고, 없으면 새로 생성합니다.
    spark = SparkSession.builder \
        .master("local[*]") \  # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.
        .appName('PySpark Tutorial') \  # Spark 애플리케이션의 이름을 지정합니다.
        .config("spark.some.config.option1", "some-value") \  # 사용자가 지정한 Spark 환경 설정을 추가합니다.
        .config("spark.some.config.option2", "some-value") \
        .getOrCreate()  # 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.

spark 환경설정 방법( SparkConf 객체에 환경 설정하고 SparkSession에 지정)

  • from pyspark.sql import SparkSession
    from pyspark import SparkConf

    # SparkConf 객체를 생성하고 필요한 설정을 추가합니다.
    conf = SparkConf()
    conf.set("spark.app.name", "PySpark Tutorial")  # Spark 애플리케이션의 이름을 설정합니다.
    conf.set("spark.master", "local[*]")  # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.

    # SparkSession을 생성하고 SparkConf 객체를 전달합니다.
    # 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.
    spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

전체적인 PySpark 프로세스의 흐름

  1. Spark 세션 생성:**
       - `SparkSession`을 생성하여 Spark 클러스터와 통신합니다.

  2. 입력 데이터 로딩:**
       - `SparkSession`을 사용하여 원하는 입력 데이터를 로드합니다.

  3. 데이터 조작 작업:**
       - DataFrame API나 Spark SQL을 활용하여 데이터를 조작합니다.
       - 판다스와 유사한 방식으로 작업하며, 필요한 경우 새로운 DataFrame을 생성합니다.
       - 원하는 결과가 나올 때까지 다양한 작업을 수행합니다.

  4. 최종 결과 저장:**
       - 조작한 데이터나 계산된 결과를 최종적으로 저장합니다.
       - 다양한 출력 포맷에 따라 적절한 형식으로 결과를 저장할 수 있습니다.

    이러한 플로우를 통해 PySpark을 활용하여 데이터 처리 및 분석 작업을 수행할 수 있습니다.

 

Spark Session에서 지원하는 주요 데이터 소스

  • `spark.read`와 `spark.write`를 사용하여 데이터를 로드하고 저장합니다. 

  • 많이 사용되는 데이터 소스들
    HDFS 파일:**
       - CSV, JSON, Parquet, ORC, Text, Avro 등과 같은 다양한 파일 형식을 지원합니다.
       - 특히, Parquet, ORC, Avro는 고성능 및 압축 효율성을 제공하는데, 이에 대한 자세한 내용은 나중에 설명될 것
       - Hive 테이블

  • JDBC 관계형 데이터베이스:**
       - JDBC를 통해 연결된 다양한 관계형 데이터베이스에서 데이터를 읽거나 쓸 수 있습니다.

  • 클라우드 기반 데이터 시스템:**
       - 다양한 클라우드 플랫폼에서 제공하는 데이터 저장소나 데이터 서비스를 지원합니다.

  • 스트리밍 시스템:**
       - Spark Streaming을 사용하여 실시간 데이터를 처리하거나, 다양한 스트리밍 소스에서 데이터를 읽을 수 있습니다.
    이를 통해 Spark은 다양한 데이터 소스와 형식을 효율적으로 처리하고 분석할 수 있습니다.

 

Spark 개발 환경 옵션

  • Local Standalone Spark + Spark Shell
  • Python IDE – PyCharm, Visual Studio 
  • Databricks Cloud – 커뮤니티 에디션을 무료로 사용
  • 다른 노트북 – 주피터 노트북, 구글 Colab, 아나콘다 등등

Local Standalone Spark:

  • 로컬 스탠드얼론(Spark Standalone)은 Spark을 단일 머신에서 실행하고 테스트하기 위한 클러스터 매니저 옵션입니다. 주로 개발 및 간단한 테스트 목적으로 사용됩니다.

    - Spark Cluster Manager 설정:**
      - `master`를 `local[n]`으로 설정하여 사용합니다.
      - `n`은 사용할 쓰레드 수를 나타내며, 로컬 클러스터 내에서 실행될 스레드 수를 조절합니다.

    - 주로 개발이나 간단한 테스트 용도
    - 하나의 JVM(Java Virtual Machine)에서 모든 프로세스가 실행됩니다.
      - 단일 Driver와 단일 Executor가 실행됩니다.
      - 1+ 쓰레드가 Executor안에서 실행됨

    - Executor 내에서 생성되는 쓰레드 수는 다음과 같이 조절됩니다:
      - `local`: 하나의 쓰레드만 생성됩니다.
      - `local[*]`: 컴퓨터의 CPU 수만큼 쓰레드가 생성됩니다.

    로컬 스탠드얼론 모드를 사용하면 효율적으로 개발 및 테스트를 수행할 수 있으며, 단일 머신에서 Spark 애플리케이션을 실행하는 간편한 방법을 제공합니다.

 

구글 Colab에서 Spark 사용

  • PySpark + Py4J를 설치
    1) 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
    2) 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
    3) Spark Web UI는 기본적으로는 접근 불가 (ngrok을 통해 억지로 열 수는 있음)
    4) Py4J(파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌)

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습1  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19
spark 데이터 처리  (0) 2024.01.18
Spark 프로그램 실행 환경  (0) 2024.01.18
spark 기초  (0) 2024.01.18

+ Recent posts