udf실습

'하둡,spark' 카테고리의 다른 글

UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

JOIN (조인) 개요:

  1. 정의:
       - SQL 조인은 두 개 이상의 테이블을 공통 필드를 기반으로 결합하는 작업입니다.
       - 이를 통해 서로 다른 테이블에 분산되어 있는 정보를 통합하고, 복잡한 쿼리를 수행할 수 있습니다.

  2. 스타 스키마와의 관련:**
       - 스타 스키마(Star Schema)는 데이터 웨어하우스에서 사용되는 스키마 중 하나로, 중앙의 대형 테이블(사실상 팩트 테이블)이 여러 작은 차원 테이블들과 조인하여 사용됩니다.

  3. LEFT와 RIGHT 테이블:**
       - 조인에서 왼쪽 테이블을 **LEFT**라고 하고, 오른쪽 테이블을 **RIGHT**라고 합니다.

  4. 조인 결과:**
       - JOIN의 결과는 양쪽의 필드를 모두 가진 새로운 테이블을 생성합니다.
       - 조인의 방식에 따라 선택되는 레코드와 채워지는 필드가 달라집니다.

  5. 조인 방식에 따른 차이:**
       - **어떤 레코드들이 선택되는지?**
         - INNER JOIN: 양쪽 테이블에서 매칭되는 레코드들만 선택됩니다.
         - LEFT JOIN (또는 LEFT OUTER JOIN): 왼쪽 테이블의 모든 레코드가 선택되고, 오른쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - RIGHT JOIN (또는 RIGHT OUTER JOIN): 오른쪽 테이블의 모든 레코드가 선택되고, 왼쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - FULL JOIN (또는 FULL OUTER JOIN): 양쪽 테이블의 모든 레코드가 선택되며, 매칭되는 경우 함께 선택됩니다.
       
        - **어떤 필드들이 채워지는지?**
         - INNER JOIN: 매칭되는 필드만 선택됩니다.
         - LEFT JOIN: 왼쪽 테이블의 필드는 항상 선택되고, 오른쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - RIGHT JOIN: 오른쪽 테이블의 필드는 항상 선택되고, 왼쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - FULL JOIN: 양쪽 테이블의 필드가 함께 선택되며, 매칭되는 경우 함께 선택됩니다.

 

 

JOIN 실습 - 예제 데이터 준비

  • 신체 데이터 이용 감시 시스템
  • 경고(alert)조건 : 지병이2개이상, 5k이상 살이쪘을때, 2일이상 몸무게 재지 않았을때.

  • INNER JOIN
    1. 양쪽 테이블에서 매치가 되는 레코드들만 리턴함
    2. 양쪽 테이블의 필드가 모두 채워진 상태로 리턴됨
    SELECT *
    FROM Vital v
    JOIN Alert a ON v.vitalID = a.vitalID;

  • LEFT JOIN
    1. 왼쪽 테이블(Base)의 모든 레코드들을 리턴함
    2. 오른쪽 테이블의 필드는 왼쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

  • FULL JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들을 리턴함
    2. 매칭되는 경우에만 양쪽 테이블들의 모든 필드들이 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

 

  • CROSS JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴함
    SELECT *
    FROM raw_data.Vital v
    CROSS JOIN raw_data.Alert a;

  • SELF JOIN
    1. 동일한 테이블을 alias를 달리해서 자기 자신과 조인함
    SELECT *
    FROM raw_data.Vital v1
    JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;

 

최적화 관점에서 본 조인의 종류들

  1. Shuffle JOIN:**
       - **일반 조인 방식:**
         - 특정 키를 기반으로 두 개 이상의 데이터셋을 결합하는 일반적인 조인 방식입니다.
       - **Bucket JOIN:**
         - 조인 키를 기반으로 조인을 수행하기 전에 두 데이터셋을 동일한 파티션으로 재분배하는 방식입니다.
         - 조인 키를 사용하여 새로운 파티션을 생성하고, 각 파티션에 속한 데이터를 조인합니다.
         - 이는 데이터를 효율적으로 분산하여 처리하기 위한 방법 중 하나입니다.

  2. Broadcast JOIN:**
       - **개요:**
         - Broadcast JOIN은 큰 데이터셋과 작은 데이터셋 간의 조인을 최적화하는 방식입니다.
         - 작은 데이터셋을 전체 클러스터에 복제(broadcasting)하여 특정 키를 기반으로 큰 데이터셋과 조인합니다.
       - **적용 조건:**
         - 데이터 프레임이 충분히 작은 경우에만 Broadcast JOIN을 사용하는 것이 효과적입니다.
         - 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌림
         - `spark.sql.autoBroadcastJoinThreshold` 파라미터를 통해 어떤 크기 이하의 데이터프레임을 브로드캐스트할 것인지 결정할 수 있습니다. (exercutor에 지정된 메모리 값보다 작게)
       - **장점:**
         - 데이터를 모든 노드로 브로드캐스트하므로 특정 노드에서 조인을 수행할 수 있어 효율적입니다.
         - 네트워크 비용이 감소하고 성능이 향상됩니다.

    이러한 최적화 기법은 Spark에서 대용량 데이터셋을 다룰 때 성능을 향상시키기 위해 사용됩니다. 적절한 조인 전략을 선택하고 데이터셋을 파티셔닝하는 것은 Spark 작업의 성능과 확장성에 큰 영향을 미칩니다.

 

join 을 그림으로 이해

 

Broadcast JOIN을 그림으로 이해

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24
실습5

'하둡,spark' 카테고리의 다른 글

spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
Untitled14

'하둡,spark' 카테고리의 다른 글

spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
spark 데이터프레임 실습1  (0) 2024.01.23

 

 

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
spark 데이터프레임 실습1  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19
실습2

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습1  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19
spark 데이터 구조  (0) 2024.01.19
Spark DataFrame 실습1

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19
spark 데이터 구조  (0) 2024.01.19
spark 데이터 처리  (0) 2024.01.18
PySpark_설치_및_테스트

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습2  (0) 2024.01.23
spark 데이터프레임 실습1  (0) 2024.01.23
spark 데이터 구조  (0) 2024.01.19
spark 데이터 처리  (0) 2024.01.18
Spark 프로그램 실행 환경  (0) 2024.01.18

RDD, DataFrame, Dataset (Immutable Distributed Data)

  • 2016년에 DataFrame과 Dataset은 하나의 API로 통합됨
  • 모두 파티션으로 나뉘어 Spark에서 처리됨



RDD (Resilient Distributed Dataset):

  • - **로우레벨 데이터:** RDD는 클러스터 내의 서버에 분산된 로우레벨 데이터를 지칭합니다.
      - **스키마 없음:** 레코드별로 존재하지만 스키마가 존재하지 않습니다. 따라서 구조화된 데이터나 비구조화된 데이터를 모두 지원합니다.

DataFrame과 Dataset:

  • DataFrame:
        - RDD 위에 만들어지는데, 필드 정보를 갖고 있습니다. 이는 테이블 형태로 생각할 수 있습니다.
        - 구조화된 데이터를 다룰 때 유용하며, 스키마 정보를 가지고 있습니다.
        - PySpark에서도 사용 가능하며, 특히 Python에서는 주로 DataFrame을 활용합니다.

  •  Dataset:
     DataFrame과 마찬가지로 RDD 위에 만들어집니다. 하지만 Dataset은 타입 정보가 존재하며, 컴파일 언어에서 사용 
        - 주로 Scala와 Java에서 활용되며, 컴파일러가 타입을 검사할 수 있는 장점이 있습니다.
        - PySpark에서는 주로 DataFrame을 사용하고, 타입 정보가 필요한 경우에는 Dataset을 활용할 수 있습니다.

 

spark sql 엔진

  1. Code Analysis (코드 분석):
       - **역할:** Spark는 사용자 코드를 분석하여 데이터프레임 또는 RDD의 트랜스포메이션 및 액션을 파악합니다.
       - **기능:** 코드 분석 단계에서는 사용자가 작성한 코드를 해석하고, 실행 계획을 수립하기 위한 정보를 추출합니다.

  2. Logical Optimization (Catalyst Optimizer, 논리적 최적화):
       - **역할:** Catalyst Optimizer는 논리적 최적화를 수행하여 불필요한 계산을 제거하고 최적의 실행 계획을 도출
       - **기능:** 논리적 최적화는 사용자의 쿼리를 내부적으로 최적화하여 처리 효율을 향상시킵니다. 예를 들어, 필요없는 컬럼이나 조건을 제거하고, 필터링을 최적화하는 등의 작업이 이루어집니다.

  3. Physical Planning (물리적 계획):
       - **역할:** Catalyst Optimizer에 의해 최적화된 논리적 실행 계획을 물리적 실행 계획으로 변환합니다.
       - **기능:** 최적화된 논리적 계획을 어떻게 물리적으로 실행할지에 대한 계획을 수립합니다. 이는 실제 익스큐터에서 수행될 태스크 및 데이터 셔플링 방법 등을 결정합니다.

  4. Code Generation (Project Tungsten):
       - **역할:** Project Tungsten은 물리적 실행 계획을 효율적으로 구현하기 위해 코드를 생성합니다.
       - **기능:** 물리적 실행 계획에 대한 코드를 생성하고, 코드 실행 시점에서 최적화된 코드를 사용하여 성능을 최대한 끌어올립니다. 코드 생성을 통해 JVM을 효율적으로 활용하고, 불필요한 오버헤드를 최소화합니다.

 

Spark 데이터 구조 - RDD

  • 변경이 불가능한 분산 저장된 데이터:
      - RDD는 여러 개의 파티션으로 나누어진 변경이 불가능한 데이터를 나타냅니다.
      - 각각의 파티션은 클러스터 내에 분산되어 저장되며, 데이터의 내결함성을 제공합니다.
      - RDD는 로우레벨의 함수형 변환을 지원하며, `map`, `filter`, `flatMap` 등의 함수형 프로그래밍 연산을 활용하여 데이터를 조작할 수 있습니다.

  • 일반 파이썬 데이터의 RDD 변환:
      - 파이썬에서는 `parallelize` 함수를 사용하여 일반적인 데이터를 RDD로 변환할 수 있습니다.
      - RDD를 사용하면 분산 환경에서 데이터를 효율적으로 처리할 수 있으며, RDD의 불변성과 내결함성을 활용하여 안정적인 분산 처리를 보장합니다.
      - 반대로, RDD를 일반적인 파이썬 데이터로 변환하려면 `collect` 함수를 사용할 수 있습니다. 이는 RDD의 모든 데이터를 드라이버 노드로 수집하는 역할을 합니다.

 

Spark 데이터 구조 - 데이터 프레임:

  • 변경이 불가한 분산 저장된 데이터:**
      - 데이터 프레임은 RDD와 마찬가지로 변경이 불가능한 분산 저장된 데이터입니다.
      - 여러 개의 파티션으로 나누어져 클러스터에 분산 저장되며, 내결함성과 데이터 처리의 안정성을 제공합니다.

  • 관계형 데이터베이스 테이블과 유사한 구조:**
      - RDD와 달리 데이터 프레임은 관계형 데이터베이스 테이블과 유사한 형태로 구성됩니다.
      - 행과 열로 이루어진 테이블 형식으로 데이터를 저장하며, 각 컬럼은 데이터의 속성을 나타냅니다.
      - 판다스의 데이터 프레임이나 관계형 데이터베이스의 테이블과 유사한 편리한 조작 기능을 제공합니다.

  • 다양한 데이터 소스 지원:**
      - 데이터 프레임은 다양한 데이터 소스로부터 데이터를 읽거나 저장할 수 있습니다.
      - HDFS, Hive, 외부 데이터베이스, 기존 RDD 등 다양한 데이터 소스와 통합하여 사용할 수 있습니다.

  • 다양한 언어 지원:**
      - 데이터 프레임은 스칼라, 자바, 파이썬과 같은 다양한 프로그래밍 언어에서 지원됩니다.
      - 이로써 사용자는 선호하는 언어를 선택하여 데이터 프레임을 조작하고 분석할 수 있습니다.

Spark Session 생성:

  • Spark 프로그램 시작점:
      - Spark 프로그램의 시작은 SparkSession을 생성하는 것으로 이루어집니다.
      - 각 프로그램은 하나의 SparkSession을 생성하며, 이를 통해 Spark Cluster와의 통신이 이루어집니다.
      - SparkSession은 싱글톤 객체로 구현되어 있어, 하나의 프로그램에서 전역적으로 사용됩니다.

  • Spark 2.0에서 도입:
      - Spark 2.0 버전에서 처음으로 도입된 개념으로, 이전에는 SparkContext를 통해 Spark 기능을 사용했습니다.
      - SparkSession은 Spark 2.0에서 통합된 인터페이스로 다양한 기능들을 효과적으로 사용할 수 있도록 지원합니다.

  • 다양한 기능 통합:
      - Spark Session을 통해 DataFrame, SQL, Streaming, ML API 등 다양한 Spark 기능에 접근할 수 있습니다.
      - config 메소드를 활용하여 다양한 환경 설정이 가능하며, 각종 작업을 효율적으로 수행할 수 있습니다.

  • RDD와의 관계:
      - RDD와 관련된 작업을 수행할 때는 SparkSession 밑의 sparkContext 객체를 사용합니다.
      - SparkSession은 DataFrame과 관련된 작업에 최적화되어 있으며, RDD 작업은 sparkContext를 통해 수행됩니다.

Spark 세션 생성 - PySpark 예제

  • 위의 코드는 PySpark에서 Spark Session을 생성하는 코드입니다. 각각의 파라미터에 대한 설명은 다음과 같습니다:

    - **master("local[*]"):**
      - Spark 클러스터의 주소 또는 클러스터 관리자에 대한 URL을 지정합니다.
      - "local[*]"는 로컬 환경에서 모든 가능한 코어를 사용하여 실행하도록 지정합니다. 이것은 주로 개발 및 테스트 목적으로 사용됩니다.

    - **appName('PySpark Tutorial'):**
      - 생성되는 Spark 애플리케이션의 이름을 지정합니다. 이는 Spark 클러스터의 웹 UI에서 식별에 사용됩니다.

    - **getOrCreate():**
      - SparkSession 객체를 생성하거나 이미 존재하는 경우 기존의 SparkSession을 반환합니다.
      - SparkSession은 보통 하나의 애플리케이션에 대해 하나만 생성되어야 하므로, 이미 존재하는지 여부를 체크하고 새로 생성하거나 반환합니다.

    이 코드를 통해 SparkSession이 생성되면, 해당 객체를 사용하여 PySpark에서 다양한 기능들을 수행할 수 있습니다.

 

PySpark에 주요 모듈 및 클래스

  1. pyspark.sql.SparkSession:**
       - Spark 애플리케이션을 시작하기 위한 핵심 진입점인 Spark 세션을 생성하는 클래스입니다.

  2. pyspark.sql.DataFrame:**
       - 테이블이나 데이터베이스 테이블과 유사한 분산 데이터프레임을 나타내는 클래스입니다. 구조화된 데이터를 처리하는 데 사용됩니다.

  3. pyspark.sql.Column:**
       - DataFrame의 컬럼을 나타내는 클래스로, 특정 컬럼에 대한 작업 및 변환을 수행하는 데 사용됩니다.

  4. pyspark.sql.Row:**
       - DataFrame의 레코드를 나타내는 클래스로, 각 레코드는 여러 컬럼의 값으로 구성됩니다.

  5. pyspark.sql.functions:**
       - 다양한 내장 함수를 제공하는 모듈로, DataFrame 컬럼에 대한 다양한 연산과 변환을 수행하는 데 사용됩니다.

  6. pyspark.sql.types:**
       - 다양한 데이터 타입을 정의하는 모듈로, DataFrame의 스키마를 정의하거나 데이터 타입 변환에 사용됩니다.

  7. pyspark.sql.Window:**
       - 윈도우 함수를 정의하고 사용하는 데 사용되는 클래스로, 데이터의 윈도우 기반 분석을 수행하는 데 도움을 줌

    이러한 모듈과 클래스는 PySpark를 사용하여 구조화된 데이터를 처리하고 분석하는 데 필수적이며, 다양한 작업 및 변환을 수행할 수 있도록 도와줍니다.

 

Spark Session을 생성할 때  주요 환경 변수 몇 가지

  1. executor별 메모리:** `spark.executor.memory` (기본값: 1g)
       - 각 익스큐터가 사용할 메모리 양을 설정합니다.

  2. executor별 CPU 수:** `spark.executor.cores` (YARN에서는 기본값 1)
       - 각 익스큐터가 사용할 CPU 코어 수를 설정합니다.

  3. driver 메모리:** `spark.driver.memory` (기본값: 1g)
       - 드라이버가 사용할 메모리 양을 설정합니다.

  4. Shuffle 후 Partition의 수:** `spark.sql.shuffle.partitions` (기본값: 최대 200)
       - 셔플 후 생성되는 파티션의 수를 설정합니다.

  5. 그 외 환경변수들 https://spark.apache.org/docs/latest/configuration.html#application-properties

Spark Session의 환경 설정은 다음 4가지 방법으로 수행할 수 있습니다:

  1. 환경변수:**
       - 환경 변수를 사용하여 Spark Session의 설정을 지정할 수 있습니다.

  2. $SPARK_HOME/conf/spark_defaults.conf:**
       - Spark이 사용하는 기본 설정 파일인 `spark_defaults.conf`를 편집하여 설정을 지정할 수 있습니다.

  3. spark-submit 명령의 커맨드라인 파라미터:**
       - `spark-submit` 명령을 사용할 때, 명령행에서 직접 설정을 지정할 수 있습니다.

  4. SparkSession 만들 때 지정:**
       - Spark Session을 생성할 때 직접 설정을 지정할 수 있습니다. 이는 `SparkConf` 객체를 통해 수행됩니다.

    이 중에서 `spark-submit` 명령의 커맨드라인 파라미터와 SparkSession을 만들 때 지정하는 방법은 각각 프로그램 실행 시 동적으로 설정을 변경할 수 있는 방법입니다.

spark 환경설정 방법( sparksession 생성시 일일히 지정)

  • from pyspark.sql import SparkSession
    # SparkSession은 싱글턴이므로 이미 존재하는 경우 가져오고, 없으면 새로 생성합니다.
    spark = SparkSession.builder \
        .master("local[*]") \  # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.
        .appName('PySpark Tutorial') \  # Spark 애플리케이션의 이름을 지정합니다.
        .config("spark.some.config.option1", "some-value") \  # 사용자가 지정한 Spark 환경 설정을 추가합니다.
        .config("spark.some.config.option2", "some-value") \
        .getOrCreate()  # 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.

spark 환경설정 방법( SparkConf 객체에 환경 설정하고 SparkSession에 지정)

  • from pyspark.sql import SparkSession
    from pyspark import SparkConf

    # SparkConf 객체를 생성하고 필요한 설정을 추가합니다.
    conf = SparkConf()
    conf.set("spark.app.name", "PySpark Tutorial")  # Spark 애플리케이션의 이름을 설정합니다.
    conf.set("spark.master", "local[*]")  # 클러스터 매니저를 설정합니다. "local[*]"는 로컬 모드에서 사용 가능한 모든 코어를 활용함을 의미합니다.

    # SparkSession을 생성하고 SparkConf 객체를 전달합니다.
    # 이미 존재하는 SparkSession이 있다면 가져오고, 없다면 새로 생성합니다.
    spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

전체적인 PySpark 프로세스의 흐름

  1. Spark 세션 생성:**
       - `SparkSession`을 생성하여 Spark 클러스터와 통신합니다.

  2. 입력 데이터 로딩:**
       - `SparkSession`을 사용하여 원하는 입력 데이터를 로드합니다.

  3. 데이터 조작 작업:**
       - DataFrame API나 Spark SQL을 활용하여 데이터를 조작합니다.
       - 판다스와 유사한 방식으로 작업하며, 필요한 경우 새로운 DataFrame을 생성합니다.
       - 원하는 결과가 나올 때까지 다양한 작업을 수행합니다.

  4. 최종 결과 저장:**
       - 조작한 데이터나 계산된 결과를 최종적으로 저장합니다.
       - 다양한 출력 포맷에 따라 적절한 형식으로 결과를 저장할 수 있습니다.

    이러한 플로우를 통해 PySpark을 활용하여 데이터 처리 및 분석 작업을 수행할 수 있습니다.

 

Spark Session에서 지원하는 주요 데이터 소스

  • `spark.read`와 `spark.write`를 사용하여 데이터를 로드하고 저장합니다. 

  • 많이 사용되는 데이터 소스들
    HDFS 파일:**
       - CSV, JSON, Parquet, ORC, Text, Avro 등과 같은 다양한 파일 형식을 지원합니다.
       - 특히, Parquet, ORC, Avro는 고성능 및 압축 효율성을 제공하는데, 이에 대한 자세한 내용은 나중에 설명될 것
       - Hive 테이블

  • JDBC 관계형 데이터베이스:**
       - JDBC를 통해 연결된 다양한 관계형 데이터베이스에서 데이터를 읽거나 쓸 수 있습니다.

  • 클라우드 기반 데이터 시스템:**
       - 다양한 클라우드 플랫폼에서 제공하는 데이터 저장소나 데이터 서비스를 지원합니다.

  • 스트리밍 시스템:**
       - Spark Streaming을 사용하여 실시간 데이터를 처리하거나, 다양한 스트리밍 소스에서 데이터를 읽을 수 있습니다.
    이를 통해 Spark은 다양한 데이터 소스와 형식을 효율적으로 처리하고 분석할 수 있습니다.

 

Spark 개발 환경 옵션

  • Local Standalone Spark + Spark Shell
  • Python IDE – PyCharm, Visual Studio 
  • Databricks Cloud – 커뮤니티 에디션을 무료로 사용
  • 다른 노트북 – 주피터 노트북, 구글 Colab, 아나콘다 등등

Local Standalone Spark:

  • 로컬 스탠드얼론(Spark Standalone)은 Spark을 단일 머신에서 실행하고 테스트하기 위한 클러스터 매니저 옵션입니다. 주로 개발 및 간단한 테스트 목적으로 사용됩니다.

    - Spark Cluster Manager 설정:**
      - `master`를 `local[n]`으로 설정하여 사용합니다.
      - `n`은 사용할 쓰레드 수를 나타내며, 로컬 클러스터 내에서 실행될 스레드 수를 조절합니다.

    - 주로 개발이나 간단한 테스트 용도
    - 하나의 JVM(Java Virtual Machine)에서 모든 프로세스가 실행됩니다.
      - 단일 Driver와 단일 Executor가 실행됩니다.
      - 1+ 쓰레드가 Executor안에서 실행됨

    - Executor 내에서 생성되는 쓰레드 수는 다음과 같이 조절됩니다:
      - `local`: 하나의 쓰레드만 생성됩니다.
      - `local[*]`: 컴퓨터의 CPU 수만큼 쓰레드가 생성됩니다.

    로컬 스탠드얼론 모드를 사용하면 효율적으로 개발 및 테스트를 수행할 수 있으며, 단일 머신에서 Spark 애플리케이션을 실행하는 간편한 방법을 제공합니다.

 

구글 Colab에서 Spark 사용

  • PySpark + Py4J를 설치
    1) 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
    2) 개발 목적으로는 충분하지만 큰 데이터의 처리는 불가
    3) Spark Web UI는 기본적으로는 접근 불가 (ngrok을 통해 억지로 열 수는 있음)
    4) Py4J(파이썬에서 JVM내에 있는 자바 객체를 사용가능하게 해줌)

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습1  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19
spark 데이터 처리  (0) 2024.01.18
Spark 프로그램 실행 환경  (0) 2024.01.18
spark 기초  (0) 2024.01.18

spark 데이터 시스템 아키텍처

데이터 병렬 처리 가능성 조건

  1. 데이터 분산
       - **하둡 맵의 데이터 처리 단위:** 하둡에서는 데이터를 먼저 분산시켜야 합니다. 맵리듀스에서의 데이터 처리 단위는 디스크에 있는 데이터 블록으로, 일반적으로 128MB 크기입니다. 이 크기는 `hdfs-site.xml`의 `dfs.block.size` 프로퍼티에 의해 결정됩니다.
       - **Spark에서의 파티션:** Spark에서는 데이터를 파티션으로 구분합니다. 기본적으로 파티션의 크기도 128MB(조절 가능)이며, 이는 `spark.sql.files.maxPartitionBytes` 프로퍼티에 의해 설정됩니다. 이 프로퍼티는 주로 HDFS 등에 있는 파일을 읽어올 때 적용됩니다.

  2. 동시 처리
       - **맵리듀스에서의 처리:** 맵리듀스에서는 N개의 데이터 블록으로 구성된 파일을 처리할 때 N개의 Map 태스크가 동시에 실행됩니다.
       - **Spark에서의 처리:** Spark에서는 데이터가 파티션 단위로 메모리에 로드되어 Executor에 배정되어 동시에 처리됩니다. Executor는 클러스터에서 실행되는 독립적인 작업 단위입니다.

 

spark 데이터 처리 흐름

  • 파티션 구성:
    • 작은 파티션: 데이터프레임은 작은 파티션들로 구성됩니다. 이는 데이터를 효율적으로 처리하기 위한 작업 단위로, 작은 단위로 데이터를 분할하여 병렬 처리를 가능케 합니다.
  • 불변성 (Immutable):
    • 데이터프레임의 불변성: 데이터프레임은 한 번 생성되면 수정할 수 없는 불변성을 가지고 있습니다. 이는 새로운 데이터프레임이 필요할 때 기존 데이터프레임을 변경하는 대신 새로운 데이터프레임을 생성하는 방식으로 동작합니다.
  • 변환 작업:
    • 다양한 변환 작업: 입력 데이터프레임을 원하는 결과까지 다른 데이터프레임으로 계속 변환할 수 있습니다. 이러한 변환 작업에는 sort, group by, filter, map, join 등이 포함됩니다.



셔플링 (Shuffling):

  • 셔플링 발생 상황:
        - 명시적으로 파티션을 변경하는 경우 (예: 파티션 수를 줄일 때)
        - 시스템에 의해 이뤄지는 셔플링 작업, 예를 들면 그룹핑(aggregation)이나 정렬(sorting)과 같은 작업이 있을 때

  • 셔플링이 발생할 때 네트웍을 타고 데이터가 이동하게 됨
        - 셔플링이 발생하면 파티션 간에 데이터 이동이 필요합니다. 이때 네트워크를 통해 데이터가 이동하게 됩니다.
        - 몇 개의 파티션이 결과로 만들어질까?
        - 셔플링이 발생할 때 최종적으로 몇 개의 파티션이 결과로 생성될지는 `spark.sql.shuffle.partitions` 프로퍼티에 의해 결정됩니다. 이 값은 기본적으로 200이며, 이는 최대 파티션 수를 나타냅니다.
        - 오퍼레이션에 따라 파티션 수가 동적으로 결정되며, 예를 들어 랜덤, 해싱 파티셔닝, 레인지 파티셔닝 등이 있다.
        - 정렬 작업의 경우 레인지 파티셔닝을 사용하게 됩니다.
      - **Data Skew 발생 가능성:**
        - 셔플링 과정에서 데이터 스뷰(skew)가 발생할 수 있습니다. 일부 파티션에 데이터가 고르게 분배되지 않고 특정 파티션에 데이터가 집중되는 현상을 말합니다.


셔플링: hashing partition( Aggregation 오퍼레이션)

 

데이터 스뷰 (Data Skewness)

  • 데이터 파티셔닝의 병렬성과 단점:
      - 데이터 파티셔닝은 데이터 처리에 병렬성을 제공하지만, 데이터가 균등하게 분포하지 않는 경우에는 단점이 발생
      - 이러한 단점은 주로 데이터 셔플링(shuffling) 이후에 발생합니다.

  • Data Skew 발생 원인:
      - **셔플링 이후 데이터 불균형:** 셔플링 작업을 통해 데이터를 재분배하는 과정에서 특정 파티션에 데이터가 과도하게 집중되는 현상으로 인해 Data Skew가 발생합니다.

  • 대응 방안:**
      - 셔플링 최소화:** 데이터 셔플링을 최소화하는 것이 중요합니다. 셔플링은 비용이 큰 작업 중 하나이며, 최소화하면 성능을 향상시킬 수 있습니다.
      - 파티션 최적화:** 데이터 파티션의 수를 최적으로 조절하여 균형 있는 분산을 유지하도록 최적화하는 것이 중요합니다. 효율적인 파티션 설정은 Data Skew를 방지하고 성능을 향상시킬 수 있습니다.

 

'하둡,spark' 카테고리의 다른 글

spark 기초 실습(colab)  (0) 2024.01.19
spark 데이터 구조  (0) 2024.01.19
Spark 프로그램 실행 환경  (0) 2024.01.18
spark 기초  (0) 2024.01.18
맵리듀스(MapReduce) 문제점  (0) 2024.01.18

+ Recent posts