udf실습

'하둡,spark' 카테고리의 다른 글

UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

UDF (User Defined Function):

  1. 개요:**
       - **UDF (User Defined Function):** 사용자가 직접 정의한 함수로, Spark에서는 DataFrame이나 SQL에서 사용할 수 있는 사용자 정의 함수를 의미합니다.
       - 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적, spark SQL에서도 사용 가능함
       - UDF를 사용하면 개발자는 Spark의 기본 제공 함수 외에도 자신만의 사용자 정의 함수를 생성하여 데이터 처리 작업을 수행할 수 있습니다.

  2. 종류:**
       - **Scalar 함수:**
         - 개별 데이터 레코드에 적용되는 함수로, 각 레코드에 대해 개별적으로 값을 반환합니다.
         - 예시: `UPPER`, `LOWER`와 같은 문자열 처리 함수 등이 Scalar 함수에 해당합니다.
       - **Aggregation 함수 (UDAF - User Defined Aggregation Function):**
         - 여러 레코드에 대한 집계 결과를 반환하는 함수입니다.
         - 예시: `SUM`, `MIN`, `MAX`와 같은 집계 함수 등이 Aggregation 함수에 해당합니다.

  3. 사용 방법:**
       - **DataFrame API:**
         - Scala, Python, Java 등의 언어를 사용하여 UDF를 정의하고, DataFrame의 `withColumn` 또는 `select` 등의 함수를 통해 적용할 수 있습니다.
       - **SQL:**
         - SQL에서는 `CREATE FUNCTION`을 사용하여 UDF를 정의하고, 이를 SQL 문에서 활용할 수 있습니다.
         - `REGISTER FUNCTION` 등을 사용하여 Spark에서 사용자 정의 함수를 등록할 수도 있습니다.

  4. 예시:**
       - **Scala에서의 DataFrame UDF:**
         import org.apache.spark.sql.functions.udf

         val myUDF = udf((value: String) => {
           // 사용자 정의 로직 적용
           // ...
           // 결과 반환
         })

         val resultDF = inputDF.withColumn("newColumn", myUDF(col("columnName")))

       - **SQL에서의 UDF 등록 및 사용:**
         -- UDF 등록
         CREATE OR REPLACE FUNCTION my_udf AS
         'com.example.MyUDF' USING JAR 'path/to/jar/file';

         -- SQL에서 UDF 사용
         SELECT my_udf(columnName) AS newColumn FROM tableName;

  5. 그외 간단한 경우 함수 구현 방법
    1) 파이썬 람다 함수:
       - 간단한 함수를 한 줄로 작성할 때 주로 사용됩니다.
       - 예시:
         add = lambda x, y: x + y
         result = add(3, 5)
         print(result)  # 출력: 8

    2) 파이썬 일반 함수:**
       - 더 복잡한 로직이 필요할 때 사용됩니다.
       - 예시:
         def add(x, y):
             return x + y

         result = add(3, 5)
         print(result)  # 출력: 8

UDF - DataFrame에 사용1

import pyspark.sql.functions as F
from pyspark.sql.types import *

# 사용자 정의 함수(Upper UDF) 정의
upperUDF = F.udf(lambda z: z.upper())

# DataFrame에 새로운 컬럼 추가 및 UDF 적용
df.withColumn("Curated Name", upperUDF("Name"))

세부 설명:

1. `import pyspark.sql.functions as F`: PySpark의 함수들을 사용하기 위해 필요한 모듈을 임포트합니다. `F`는 편의상 별칭(alias)으로 사용되었습니다.

2. `from pyspark.sql.types import *`: PySpark의 데이터 타입들을 사용하기 위해 모듈을 임포트합니다. 여기서는 모든 데이터 타입을 가져옵니다.

3. `upperUDF = F.udf(lambda z: z.upper())`: PySpark의 `udf` 함수를 사용하여 사용자 정의 함수(Upper UDF)를 정의합니다. 이 UDF는 주어진 문자열을 대문자로 변환합니다.

4. `df.withColumn("Curated Name", upperUDF("Name"))`: DataFrame에 새로운 컬럼("Curated Name")을 추가하고, 이 컬럼에 `upperUDF`를 적용하여 "Name" 컬럼의 값들을 대문자로 변환합니다.

 

UDF - SQL에 사용1

def upper(s):
    return s.upper()

# 사용자 정의 함수(Upper UDF) 등록
upperUDF = spark.udf.register("upper", upper)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT upper('aBcD')").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("""SELECT name, upper(name) AS 
"Curated Name" FROM test""")
result_df.show()

 

UDF - DataFram에 사용2

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSession 생성
spark = SparkSession.builder.appName("example").getOrCreate()

# 데이터 생성
data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

# DataFrame 생성
df = spark.createDataFrame(data)

# 사용자 정의 함수(Upper UDF) 정의
sumUDF = F.udf(lambda x, y: x + y, IntegerType())

# DataFrame에 새로운 컬럼("c") 추가 및 UDF 적용
result_df = df.withColumn("c", sumUDF("a", "b"))

# 결과 출력
result_df.show()

 

UDF - SQL에 사용 2

def plus(x, y):
    return x + y

# 사용자 정의 함수(Plus UDF) 등록(register)
plusUDF = spark.udf.register("plus", plus)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT plus(1, 2)").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("SELECT a, b, plus(a, b) AS c FROM test")
result_df.show()

 

UDF - Pandas UDF Scalar 함수 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
    return s.str.upper()

# Pandas UDF를 Spark SQL UDF로 등록
upperUDF = spark.udf.register("upper_udf", upper_udf2)

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.select("Name", upperUDF("Name")).show()

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql("""SELECT Name, upper_udf(Name) AS `Curated Name` 
FROM test""").show()

 

UDF - DataFrame/SQL에 Aggregation 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
    return v.mean()

# Pandas UDF를 Spark SQL UDF로 등록
averageUDF = spark.udf.register('average', average)

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql('SELECT average(b) FROM test').show()

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.agg(averageUDF("b").alias("average")).show()

 

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

JOIN (조인) 개요:

  1. 정의:
       - SQL 조인은 두 개 이상의 테이블을 공통 필드를 기반으로 결합하는 작업입니다.
       - 이를 통해 서로 다른 테이블에 분산되어 있는 정보를 통합하고, 복잡한 쿼리를 수행할 수 있습니다.

  2. 스타 스키마와의 관련:**
       - 스타 스키마(Star Schema)는 데이터 웨어하우스에서 사용되는 스키마 중 하나로, 중앙의 대형 테이블(사실상 팩트 테이블)이 여러 작은 차원 테이블들과 조인하여 사용됩니다.

  3. LEFT와 RIGHT 테이블:**
       - 조인에서 왼쪽 테이블을 **LEFT**라고 하고, 오른쪽 테이블을 **RIGHT**라고 합니다.

  4. 조인 결과:**
       - JOIN의 결과는 양쪽의 필드를 모두 가진 새로운 테이블을 생성합니다.
       - 조인의 방식에 따라 선택되는 레코드와 채워지는 필드가 달라집니다.

  5. 조인 방식에 따른 차이:**
       - **어떤 레코드들이 선택되는지?**
         - INNER JOIN: 양쪽 테이블에서 매칭되는 레코드들만 선택됩니다.
         - LEFT JOIN (또는 LEFT OUTER JOIN): 왼쪽 테이블의 모든 레코드가 선택되고, 오른쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - RIGHT JOIN (또는 RIGHT OUTER JOIN): 오른쪽 테이블의 모든 레코드가 선택되고, 왼쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - FULL JOIN (또는 FULL OUTER JOIN): 양쪽 테이블의 모든 레코드가 선택되며, 매칭되는 경우 함께 선택됩니다.
       
        - **어떤 필드들이 채워지는지?**
         - INNER JOIN: 매칭되는 필드만 선택됩니다.
         - LEFT JOIN: 왼쪽 테이블의 필드는 항상 선택되고, 오른쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - RIGHT JOIN: 오른쪽 테이블의 필드는 항상 선택되고, 왼쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - FULL JOIN: 양쪽 테이블의 필드가 함께 선택되며, 매칭되는 경우 함께 선택됩니다.

 

 

JOIN 실습 - 예제 데이터 준비

  • 신체 데이터 이용 감시 시스템
  • 경고(alert)조건 : 지병이2개이상, 5k이상 살이쪘을때, 2일이상 몸무게 재지 않았을때.

  • INNER JOIN
    1. 양쪽 테이블에서 매치가 되는 레코드들만 리턴함
    2. 양쪽 테이블의 필드가 모두 채워진 상태로 리턴됨
    SELECT *
    FROM Vital v
    JOIN Alert a ON v.vitalID = a.vitalID;

  • LEFT JOIN
    1. 왼쪽 테이블(Base)의 모든 레코드들을 리턴함
    2. 오른쪽 테이블의 필드는 왼쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

  • FULL JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들을 리턴함
    2. 매칭되는 경우에만 양쪽 테이블들의 모든 필드들이 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

 

  • CROSS JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴함
    SELECT *
    FROM raw_data.Vital v
    CROSS JOIN raw_data.Alert a;

  • SELF JOIN
    1. 동일한 테이블을 alias를 달리해서 자기 자신과 조인함
    SELECT *
    FROM raw_data.Vital v1
    JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;

 

최적화 관점에서 본 조인의 종류들

  1. Shuffle JOIN:**
       - **일반 조인 방식:**
         - 특정 키를 기반으로 두 개 이상의 데이터셋을 결합하는 일반적인 조인 방식입니다.
       - **Bucket JOIN:**
         - 조인 키를 기반으로 조인을 수행하기 전에 두 데이터셋을 동일한 파티션으로 재분배하는 방식입니다.
         - 조인 키를 사용하여 새로운 파티션을 생성하고, 각 파티션에 속한 데이터를 조인합니다.
         - 이는 데이터를 효율적으로 분산하여 처리하기 위한 방법 중 하나입니다.

  2. Broadcast JOIN:**
       - **개요:**
         - Broadcast JOIN은 큰 데이터셋과 작은 데이터셋 간의 조인을 최적화하는 방식입니다.
         - 작은 데이터셋을 전체 클러스터에 복제(broadcasting)하여 특정 키를 기반으로 큰 데이터셋과 조인합니다.
       - **적용 조건:**
         - 데이터 프레임이 충분히 작은 경우에만 Broadcast JOIN을 사용하는 것이 효과적입니다.
         - 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌림
         - `spark.sql.autoBroadcastJoinThreshold` 파라미터를 통해 어떤 크기 이하의 데이터프레임을 브로드캐스트할 것인지 결정할 수 있습니다. (exercutor에 지정된 메모리 값보다 작게)
       - **장점:**
         - 데이터를 모든 노드로 브로드캐스트하므로 특정 노드에서 조인을 수행할 수 있어 효율적입니다.
         - 네트워크 비용이 감소하고 성능이 향상됩니다.

    이러한 최적화 기법은 Spark에서 대용량 데이터셋을 다룰 때 성능을 향상시키기 위해 사용됩니다. 적절한 조인 전략을 선택하고 데이터셋을 파티셔닝하는 것은 Spark 작업의 성능과 확장성에 큰 영향을 미칩니다.

 

join 을 그림으로 이해

 

Broadcast JOIN을 그림으로 이해

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

+ Recent posts