UDF (User Defined Function):

  1. 개요:**
       - **UDF (User Defined Function):** 사용자가 직접 정의한 함수로, Spark에서는 DataFrame이나 SQL에서 사용할 수 있는 사용자 정의 함수를 의미합니다.
       - 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적, spark SQL에서도 사용 가능함
       - UDF를 사용하면 개발자는 Spark의 기본 제공 함수 외에도 자신만의 사용자 정의 함수를 생성하여 데이터 처리 작업을 수행할 수 있습니다.

  2. 종류:**
       - **Scalar 함수:**
         - 개별 데이터 레코드에 적용되는 함수로, 각 레코드에 대해 개별적으로 값을 반환합니다.
         - 예시: `UPPER`, `LOWER`와 같은 문자열 처리 함수 등이 Scalar 함수에 해당합니다.
       - **Aggregation 함수 (UDAF - User Defined Aggregation Function):**
         - 여러 레코드에 대한 집계 결과를 반환하는 함수입니다.
         - 예시: `SUM`, `MIN`, `MAX`와 같은 집계 함수 등이 Aggregation 함수에 해당합니다.

  3. 사용 방법:**
       - **DataFrame API:**
         - Scala, Python, Java 등의 언어를 사용하여 UDF를 정의하고, DataFrame의 `withColumn` 또는 `select` 등의 함수를 통해 적용할 수 있습니다.
       - **SQL:**
         - SQL에서는 `CREATE FUNCTION`을 사용하여 UDF를 정의하고, 이를 SQL 문에서 활용할 수 있습니다.
         - `REGISTER FUNCTION` 등을 사용하여 Spark에서 사용자 정의 함수를 등록할 수도 있습니다.

  4. 예시:**
       - **Scala에서의 DataFrame UDF:**
         import org.apache.spark.sql.functions.udf

         val myUDF = udf((value: String) => {
           // 사용자 정의 로직 적용
           // ...
           // 결과 반환
         })

         val resultDF = inputDF.withColumn("newColumn", myUDF(col("columnName")))

       - **SQL에서의 UDF 등록 및 사용:**
         -- UDF 등록
         CREATE OR REPLACE FUNCTION my_udf AS
         'com.example.MyUDF' USING JAR 'path/to/jar/file';

         -- SQL에서 UDF 사용
         SELECT my_udf(columnName) AS newColumn FROM tableName;

  5. 그외 간단한 경우 함수 구현 방법
    1) 파이썬 람다 함수:
       - 간단한 함수를 한 줄로 작성할 때 주로 사용됩니다.
       - 예시:
         add = lambda x, y: x + y
         result = add(3, 5)
         print(result)  # 출력: 8

    2) 파이썬 일반 함수:**
       - 더 복잡한 로직이 필요할 때 사용됩니다.
       - 예시:
         def add(x, y):
             return x + y

         result = add(3, 5)
         print(result)  # 출력: 8

UDF - DataFrame에 사용1

import pyspark.sql.functions as F
from pyspark.sql.types import *

# 사용자 정의 함수(Upper UDF) 정의
upperUDF = F.udf(lambda z: z.upper())

# DataFrame에 새로운 컬럼 추가 및 UDF 적용
df.withColumn("Curated Name", upperUDF("Name"))

세부 설명:

1. `import pyspark.sql.functions as F`: PySpark의 함수들을 사용하기 위해 필요한 모듈을 임포트합니다. `F`는 편의상 별칭(alias)으로 사용되었습니다.

2. `from pyspark.sql.types import *`: PySpark의 데이터 타입들을 사용하기 위해 모듈을 임포트합니다. 여기서는 모든 데이터 타입을 가져옵니다.

3. `upperUDF = F.udf(lambda z: z.upper())`: PySpark의 `udf` 함수를 사용하여 사용자 정의 함수(Upper UDF)를 정의합니다. 이 UDF는 주어진 문자열을 대문자로 변환합니다.

4. `df.withColumn("Curated Name", upperUDF("Name"))`: DataFrame에 새로운 컬럼("Curated Name")을 추가하고, 이 컬럼에 `upperUDF`를 적용하여 "Name" 컬럼의 값들을 대문자로 변환합니다.

 

UDF - SQL에 사용1

def upper(s):
    return s.upper()

# 사용자 정의 함수(Upper UDF) 등록
upperUDF = spark.udf.register("upper", upper)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT upper('aBcD')").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("""SELECT name, upper(name) AS 
"Curated Name" FROM test""")
result_df.show()

 

UDF - DataFram에 사용2

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSession 생성
spark = SparkSession.builder.appName("example").getOrCreate()

# 데이터 생성
data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

# DataFrame 생성
df = spark.createDataFrame(data)

# 사용자 정의 함수(Upper UDF) 정의
sumUDF = F.udf(lambda x, y: x + y, IntegerType())

# DataFrame에 새로운 컬럼("c") 추가 및 UDF 적용
result_df = df.withColumn("c", sumUDF("a", "b"))

# 결과 출력
result_df.show()

 

UDF - SQL에 사용 2

def plus(x, y):
    return x + y

# 사용자 정의 함수(Plus UDF) 등록(register)
plusUDF = spark.udf.register("plus", plus)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT plus(1, 2)").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("SELECT a, b, plus(a, b) AS c FROM test")
result_df.show()

 

UDF - Pandas UDF Scalar 함수 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
    return s.str.upper()

# Pandas UDF를 Spark SQL UDF로 등록
upperUDF = spark.udf.register("upper_udf", upper_udf2)

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.select("Name", upperUDF("Name")).show()

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql("""SELECT Name, upper_udf(Name) AS `Curated Name` 
FROM test""").show()

 

UDF - DataFrame/SQL에 Aggregation 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
    return v.mean()

# Pandas UDF를 Spark SQL UDF로 등록
averageUDF = spark.udf.register('average', average)

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql('SELECT average(b) FROM test').show()

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.agg(averageUDF("b").alias("average")).show()

 

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

+ Recent posts