설명 : BFS(너비 우선 탐색) 알고리즘은 그래프나 트리에서 한 노드로부터 시작하여 인접한 모든 노드를 먼저 방문하는 알고리즘입니다. 이 알고리즘은 큐(queue)를 사용하여 구현됩니다. 큐에는 방문할 노드들이 차례대로 저장되어 있으며, 먼저 들어온 노드를 먼저 방문합니다.

BFS의 구체적인 동작 과정
1. 시작 노드를 큐에 넣고 방문 여부를 표시합니다.
2. 큐가 빌 때까지 다음 과정을 반복합니다.
   - 큐에서 하나의 노드를 꺼냅니다.
   - 해당 노드의 인접한 노드들 중 방문하지 않은 노드들을 큐에 넣고 방문 여부를 표시합니다.
3. 큐가 비면 탐색을 종료합니다.

BFS 알고리즘을 파이썬으로 구현한 예시
--------------------------------------------------------------------------------------------------
from collections import defaultdict, deque

def bfs(graph, start):
    visited = set()  # 방문한 노드를 저장할 집합
    queue = deque([start])  # 큐에 시작 노드를 넣습니다.

    while queue:

        # 큐에서 노드를 하나 꺼냅니다.
        node = queue.popleft()

        # 만약 그 노드가 방문하지 않은것이면
        if node not in visited:
            # 방문한 노드를 집합에 추가합니다.
            visited.add(node) 

            # 현재 노드의 인접한 노드들에 대해
            for neighbor in graph[node]:

           # 방문하지 않은 경우에만 큐에 넣습니다.  
                if neighbor not in visited: 
                    queue.append(neighbor)

#이후  while문 내에서  queue가 popleft에 의해 빌때까지, 반복됨


# 그래프를 인접 리스트로 표현. 리스트를 값으로 가지는 딕셔너리를 생성

# defaultdict(list) 의 동작은 존재하지 않는 키에 접근하면 기본값을 생성하고,

# 기존에 존재하는 키에 대해서는 기본값이 생성되지 않는다.
graph = defaultdict(list)
graph[1] = [2, 3]
graph[2] = [1, 4, 5]
graph[3] = [1, 6]
graph[4] = [2]
graph[5] = [2, 7]
graph[6] = [3]
graph[7] = [5]

# 시작 노드를 1로 설정하고 BFS를 실행합니다.
start_node = 1
print("BFS 탐색 결과:")
bfs(graph, start_node)
--------------------------------------------------------------------------------------------------
이 코드는 다음과 같은 그래프를 탐색하는 BFS 알고리즘입니다.
    1
 /     \ 
2      3   
|  \      \
4  5      6 
     |
    7

자세한 동작
1. 시작 노드 1을 먼저 방문합니다. 그 다음으로, 1과 인접한 노드들인 2와 3이 큐에 순서대로 추가됩니다.
2. 큐에서 노드를 하나씩 꺼내면서 인접한 노드들을 방문합니다. 먼저 2를 꺼내서 방문하고, 2와 인접한 노드들인 1(이미 방문했으므로 제외), 4, 5가 큐에 추가됩니다.
3. 이어서 3을 꺼내서 방문하고, 3과 인접한 노드들인 1(이미 방문했으므로 제외), 6이 큐에 추가됩니다.
4. 그 다음으로 큐에서 4를 꺼내서 방문하고, 4와 인접한 노드는 없으므로 큐에 추가할 노드가 없습니다.
5. 5를 꺼내서 방문하고, 5와 인접한 노드는 2, 7입니다. 그러나 2는 이미 방문한 노드이므로 큐에 추가하지 않고, 7만 큐에 추가됩니다.
6. 마지막으로 6을 꺼내서 방문하고, 6과 인접한 노드인 3(이미 방문했으므로 제외)만 큐에 추가됩니다.
7. 큐에는 더 이상 내용이(방문할 노드) 없으므로 BFS 탐색을 종료합니다.

출력결과
BFS 탐색 결과:
1 2 3 4 5 6 7

이 코드에서는 defaultdict와 deque를 사용하여 간단한 그래프를 표현하고 BFS 알고리즘을 구현하였습니다.

BFS 알고리즘을 적용할 여러 예시
1. **최단 경로 찾기**: 그래프나 트리에서 두 노드 사이의 최단 경로를 찾는 문제에 BFS를 사용할 수 있습니다. 출발 노드로부터 레벨별로 탐색하면서 도착 노드를 찾으면, 그 경로가 최단 경로입니다.

2. **네트워크 탐색**: 컴퓨터 네트워크나 소셜 네트워크에서 특정 사용자와 연결된 모든 사용자를 찾는 문제에 BFS를 사용할 수 있습니다. 특정 사용자로부터 시작하여 친구, 친구의 친구, 그리고 그 이상으로 네트워크를 탐색할 수 있습니다.

3. **미로 찾기**: 미로를 해결하는 문제에 BFS를 사용할 수 있습니다. 시작 지점에서부터 모든 가능한 경로를 탐색하여 도착 지점까지의 최단 경로를 찾을 수 있습니다.

4. **트리의 너비 계산**: 트리에서 각 레벨의 노드 수를 계산하는 문제에 BFS를 사용할 수 있습니다. 루트 노드부터 시작하여 각 레벨의 노드를 탐색하면서 레벨마다 카운트를 증가시킬 수 있습니다.

5. **게임 탐색**: 보드 게임이나 퍼즐 게임에서 가능한 모든 상태를 탐색하는 문제에 BFS를 사용할 수 있습니다. 현재 상태에서 가능한 모든 다음 상태를 찾아내고, 그 상태에서 가능한 모든 다음 상태를 탐색하며 게임을 진행합니다.

6. **컴퓨터 네트워크 라우팅**: 네트워크에서 최적의 경로를 찾는 문제에 BFS를 사용할 수 있습니다. 시작 노드에서부터 모든 경로를 탐색하고, 목적지에 도달할 때까지 가장 짧은 경로를 선택합니다.

7. **최단 경로 게임**: 게임에서 출발점에서 도착점까지 최단 경로로 이동하는 문제에 BFS를 사용할 수 있습니다. 가능한 모든 이동 경로를 탐색하여 최단 경로를 찾을 수 있습니다.

이러한 예시들은 BFS 알고리즘이 다양한 문제에 적용될 수 있음을 보여줍니다. BFS는 탐색하는 과정에서 레벨별로 탐색하기 때문에 최단 경로를 찾거나 네트워크를 탐색하는 등 다양한 문제에 유용하게 사용될 수 있습니다.

'코딩테스트' 카테고리의 다른 글

프로그래머스 오답노트 모음  (0) 2023.12.15

간혹 프로젝트 생성 후 마이그레이션 폴더 자체가 없을수 있다.


1. 장고 쉘 접속 후

2. ls 입력 후 manage.py 파일 있는지 확인
3. ./manage.py makemigrations project(or project_api)
4. 이제 마이그레이션 폴더 생성돼서, 정상동작할거다.

 

AI 앱 workflow

  1. 데이터 수집(데이터엔지니어)
  2. 모델개발(머신러닝 엔지니어)
  3. 모델운영(백엔드 엔지니어)
  4. 어플/서비스

데이터 수집

  • 테크기업
    데이터 : 유저(로그, 이미지, 글)
    ex) 페이스북, 아마존, 애플, 넷플릭스, 구글
    데이터가 쌓이는 속도가 비교적 빠름

  • 비테크기업
    데이터 : 센서, 제품사진
    ex) 삼성, 현대, lg, sk

모델 개발과정

  1. 문제정의 : 어떤 질문에 어떤 답을 하는 챗봇을 만들까?
    평가지표설정 : 챗봇의 대답을 잘했는지 못했는지 어떻게 판단?

  2. 데이터수집 : 머신러닝, 데이터엔지니어의 소통 증가.
  3. ML 모델링
  4. 평가

결과 도출 시간

  • 오래걸리는경우 : 크랙테스트의 경우, 사람이 직접확인하며 라벨링을 해야해서 시간이 걸림
  • 문제가 있을경우 재학습까지 진행하는게 MLOPS의 역할

ML 옵스의 단계

-머신러닝 프로젝트 중 9%만이 실제 서비스에 적용된다.

  • lev 0 : 모델을 배포할수 있다.

  • lev1 : 파이프라인을 실행할수 있어, 정확도가 떨어지면 모델 재학습, 재배포 하는등 변동성에 대응(CT)

  • lev2 : 신규/  개선 IDEA적용 (사람의개입+CT) cicd

 

ML 모델을 사용할때 문제점들

 

  1. 개발
  2. 많은 사람 들이 모델을 사용할때 부하분산등 문제가 생김

  3. ml 옵스 투입으로 문제해결
  4. 데이터 사이언티스트의 기능 추가와, ml 옵스의 시스템의 충돌

 

udf실습

'하둡,spark' 카테고리의 다른 글

UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

UDF (User Defined Function):

  1. 개요:**
       - **UDF (User Defined Function):** 사용자가 직접 정의한 함수로, Spark에서는 DataFrame이나 SQL에서 사용할 수 있는 사용자 정의 함수를 의미합니다.
       - 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적, spark SQL에서도 사용 가능함
       - UDF를 사용하면 개발자는 Spark의 기본 제공 함수 외에도 자신만의 사용자 정의 함수를 생성하여 데이터 처리 작업을 수행할 수 있습니다.

  2. 종류:**
       - **Scalar 함수:**
         - 개별 데이터 레코드에 적용되는 함수로, 각 레코드에 대해 개별적으로 값을 반환합니다.
         - 예시: `UPPER`, `LOWER`와 같은 문자열 처리 함수 등이 Scalar 함수에 해당합니다.
       - **Aggregation 함수 (UDAF - User Defined Aggregation Function):**
         - 여러 레코드에 대한 집계 결과를 반환하는 함수입니다.
         - 예시: `SUM`, `MIN`, `MAX`와 같은 집계 함수 등이 Aggregation 함수에 해당합니다.

  3. 사용 방법:**
       - **DataFrame API:**
         - Scala, Python, Java 등의 언어를 사용하여 UDF를 정의하고, DataFrame의 `withColumn` 또는 `select` 등의 함수를 통해 적용할 수 있습니다.
       - **SQL:**
         - SQL에서는 `CREATE FUNCTION`을 사용하여 UDF를 정의하고, 이를 SQL 문에서 활용할 수 있습니다.
         - `REGISTER FUNCTION` 등을 사용하여 Spark에서 사용자 정의 함수를 등록할 수도 있습니다.

  4. 예시:**
       - **Scala에서의 DataFrame UDF:**
         import org.apache.spark.sql.functions.udf

         val myUDF = udf((value: String) => {
           // 사용자 정의 로직 적용
           // ...
           // 결과 반환
         })

         val resultDF = inputDF.withColumn("newColumn", myUDF(col("columnName")))

       - **SQL에서의 UDF 등록 및 사용:**
         -- UDF 등록
         CREATE OR REPLACE FUNCTION my_udf AS
         'com.example.MyUDF' USING JAR 'path/to/jar/file';

         -- SQL에서 UDF 사용
         SELECT my_udf(columnName) AS newColumn FROM tableName;

  5. 그외 간단한 경우 함수 구현 방법
    1) 파이썬 람다 함수:
       - 간단한 함수를 한 줄로 작성할 때 주로 사용됩니다.
       - 예시:
         add = lambda x, y: x + y
         result = add(3, 5)
         print(result)  # 출력: 8

    2) 파이썬 일반 함수:**
       - 더 복잡한 로직이 필요할 때 사용됩니다.
       - 예시:
         def add(x, y):
             return x + y

         result = add(3, 5)
         print(result)  # 출력: 8

UDF - DataFrame에 사용1

import pyspark.sql.functions as F
from pyspark.sql.types import *

# 사용자 정의 함수(Upper UDF) 정의
upperUDF = F.udf(lambda z: z.upper())

# DataFrame에 새로운 컬럼 추가 및 UDF 적용
df.withColumn("Curated Name", upperUDF("Name"))

세부 설명:

1. `import pyspark.sql.functions as F`: PySpark의 함수들을 사용하기 위해 필요한 모듈을 임포트합니다. `F`는 편의상 별칭(alias)으로 사용되었습니다.

2. `from pyspark.sql.types import *`: PySpark의 데이터 타입들을 사용하기 위해 모듈을 임포트합니다. 여기서는 모든 데이터 타입을 가져옵니다.

3. `upperUDF = F.udf(lambda z: z.upper())`: PySpark의 `udf` 함수를 사용하여 사용자 정의 함수(Upper UDF)를 정의합니다. 이 UDF는 주어진 문자열을 대문자로 변환합니다.

4. `df.withColumn("Curated Name", upperUDF("Name"))`: DataFrame에 새로운 컬럼("Curated Name")을 추가하고, 이 컬럼에 `upperUDF`를 적용하여 "Name" 컬럼의 값들을 대문자로 변환합니다.

 

UDF - SQL에 사용1

def upper(s):
    return s.upper()

# 사용자 정의 함수(Upper UDF) 등록
upperUDF = spark.udf.register("upper", upper)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT upper('aBcD')").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("""SELECT name, upper(name) AS 
"Curated Name" FROM test""")
result_df.show()

 

UDF - DataFram에 사용2

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSession 생성
spark = SparkSession.builder.appName("example").getOrCreate()

# 데이터 생성
data = [
    {"a": 1, "b": 2},
    {"a": 5, "b": 5}
]

# DataFrame 생성
df = spark.createDataFrame(data)

# 사용자 정의 함수(Upper UDF) 정의
sumUDF = F.udf(lambda x, y: x + y, IntegerType())

# DataFrame에 새로운 컬럼("c") 추가 및 UDF 적용
result_df = df.withColumn("c", sumUDF("a", "b"))

# 결과 출력
result_df.show()

 

UDF - SQL에 사용 2

def plus(x, y):
    return x + y

# 사용자 정의 함수(Plus UDF) 등록(register)
plusUDF = spark.udf.register("plus", plus)

# Spark SQL을 사용하여 UDF 테스트
spark.sql("SELECT plus(1, 2)").show()

# DataFrame을 테이블로 등록
df.createOrReplaceTempView("test")

# DataFrame 기반 SQL에서 UDF 적용
result_df = spark.sql("SELECT a, b, plus(a, b) AS c FROM test")
result_df.show()

 

UDF - Pandas UDF Scalar 함수 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
    return s.str.upper()

# Pandas UDF를 Spark SQL UDF로 등록
upperUDF = spark.udf.register("upper_udf", upper_udf2)

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.select("Name", upperUDF("Name")).show()

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql("""SELECT Name, upper_udf(Name) AS `Curated Name` 
FROM test""").show()

 

UDF - DataFrame/SQL에 Aggregation 사용

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Pandas UDF 정의
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
    return v.mean()

# Pandas UDF를 Spark SQL UDF로 등록
averageUDF = spark.udf.register('average', average)

# Spark SQL에서 Pandas UDF 적용하여 결과 출력
spark.sql('SELECT average(b) FROM test').show()

# DataFrame에서 Pandas UDF 적용하여 결과 출력
df.agg(averageUDF("b").alias("average")).show()

 

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

JOIN (조인) 개요:

  1. 정의:
       - SQL 조인은 두 개 이상의 테이블을 공통 필드를 기반으로 결합하는 작업입니다.
       - 이를 통해 서로 다른 테이블에 분산되어 있는 정보를 통합하고, 복잡한 쿼리를 수행할 수 있습니다.

  2. 스타 스키마와의 관련:**
       - 스타 스키마(Star Schema)는 데이터 웨어하우스에서 사용되는 스키마 중 하나로, 중앙의 대형 테이블(사실상 팩트 테이블)이 여러 작은 차원 테이블들과 조인하여 사용됩니다.

  3. LEFT와 RIGHT 테이블:**
       - 조인에서 왼쪽 테이블을 **LEFT**라고 하고, 오른쪽 테이블을 **RIGHT**라고 합니다.

  4. 조인 결과:**
       - JOIN의 결과는 양쪽의 필드를 모두 가진 새로운 테이블을 생성합니다.
       - 조인의 방식에 따라 선택되는 레코드와 채워지는 필드가 달라집니다.

  5. 조인 방식에 따른 차이:**
       - **어떤 레코드들이 선택되는지?**
         - INNER JOIN: 양쪽 테이블에서 매칭되는 레코드들만 선택됩니다.
         - LEFT JOIN (또는 LEFT OUTER JOIN): 왼쪽 테이블의 모든 레코드가 선택되고, 오른쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - RIGHT JOIN (또는 RIGHT OUTER JOIN): 오른쪽 테이블의 모든 레코드가 선택되고, 왼쪽 테이블과 매칭되는 레코드가 있으면 함께 선택됩니다.
         - FULL JOIN (또는 FULL OUTER JOIN): 양쪽 테이블의 모든 레코드가 선택되며, 매칭되는 경우 함께 선택됩니다.
       
        - **어떤 필드들이 채워지는지?**
         - INNER JOIN: 매칭되는 필드만 선택됩니다.
         - LEFT JOIN: 왼쪽 테이블의 필드는 항상 선택되고, 오른쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - RIGHT JOIN: 오른쪽 테이블의 필드는 항상 선택되고, 왼쪽 테이블과 매칭되는 경우 함께 선택됩니다.
         - FULL JOIN: 양쪽 테이블의 필드가 함께 선택되며, 매칭되는 경우 함께 선택됩니다.

 

 

JOIN 실습 - 예제 데이터 준비

  • 신체 데이터 이용 감시 시스템
  • 경고(alert)조건 : 지병이2개이상, 5k이상 살이쪘을때, 2일이상 몸무게 재지 않았을때.

  • INNER JOIN
    1. 양쪽 테이블에서 매치가 되는 레코드들만 리턴함
    2. 양쪽 테이블의 필드가 모두 채워진 상태로 리턴됨
    SELECT *
    FROM Vital v
    JOIN Alert a ON v.vitalID = a.vitalID;

  • LEFT JOIN
    1. 왼쪽 테이블(Base)의 모든 레코드들을 리턴함
    2. 오른쪽 테이블의 필드는 왼쪽 레코드와 매칭되는 경우에만 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

  • FULL JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들을 리턴함
    2. 매칭되는 경우에만 양쪽 테이블들의 모든 필드들이 채워진 상태로 리턴됨
    SELECT *
    FROM raw_data.Vital v
    FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;

 

  • CROSS JOIN
    1. 왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴함
    SELECT *
    FROM raw_data.Vital v
    CROSS JOIN raw_data.Alert a;

  • SELF JOIN
    1. 동일한 테이블을 alias를 달리해서 자기 자신과 조인함
    SELECT *
    FROM raw_data.Vital v1
    JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;

 

최적화 관점에서 본 조인의 종류들

  1. Shuffle JOIN:**
       - **일반 조인 방식:**
         - 특정 키를 기반으로 두 개 이상의 데이터셋을 결합하는 일반적인 조인 방식입니다.
       - **Bucket JOIN:**
         - 조인 키를 기반으로 조인을 수행하기 전에 두 데이터셋을 동일한 파티션으로 재분배하는 방식입니다.
         - 조인 키를 사용하여 새로운 파티션을 생성하고, 각 파티션에 속한 데이터를 조인합니다.
         - 이는 데이터를 효율적으로 분산하여 처리하기 위한 방법 중 하나입니다.

  2. Broadcast JOIN:**
       - **개요:**
         - Broadcast JOIN은 큰 데이터셋과 작은 데이터셋 간의 조인을 최적화하는 방식입니다.
         - 작은 데이터셋을 전체 클러스터에 복제(broadcasting)하여 특정 키를 기반으로 큰 데이터셋과 조인합니다.
       - **적용 조건:**
         - 데이터 프레임이 충분히 작은 경우에만 Broadcast JOIN을 사용하는 것이 효과적입니다.
         - 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌림
         - `spark.sql.autoBroadcastJoinThreshold` 파라미터를 통해 어떤 크기 이하의 데이터프레임을 브로드캐스트할 것인지 결정할 수 있습니다. (exercutor에 지정된 메모리 값보다 작게)
       - **장점:**
         - 데이터를 모든 노드로 브로드캐스트하므로 특정 노드에서 조인을 수행할 수 있어 효율적입니다.
         - 네트워크 비용이 감소하고 성능이 향상됩니다.

    이러한 최적화 기법은 Spark에서 대용량 데이터셋을 다룰 때 성능을 향상시키기 위해 사용됩니다. 적절한 조인 전략을 선택하고 데이터셋을 파티셔닝하는 것은 Spark 작업의 성능과 확장성에 큰 영향을 미칩니다.

 

join 을 그림으로 이해

 

Broadcast JOIN을 그림으로 이해

'하둡,spark' 카테고리의 다른 글

UDF 실습  (0) 2024.01.28
UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24

SQL은 빅데이터 세상에서도 중요!

  • 데이터 분야에서 일하고자 하면 반드시 익혀야할 기본 기술
  • 구조화된 데이터를 다루는한 SQL은 데이터 규모와 상관없이 쓰임
  • 모든 대용량 데이터 웨어하우스는 SQL 기반
    - Redshift, Snowflake, BigQuery
    - Hive/Presto
  • Spark도 예외는 아님
    - Spark SQL이 지원됨

Spark SQL이란?

  1. 구조화된 데이터 처리:** Spark SQL은 주로 구조화된 데이터를 다루기 위해 설계되었습니다. 이는 표 형식의 데이터로써, 예를 들어 CSV 파일, Parquet 파일 등과 같은 데이터 형식을 포함합니다.

  2. SQL 처리:** Spark SQL을 사용하면 SQL 쿼리를 통해 데이터프레임 작업을 수행할 수 있습니다. 데이터프레임에 테이블 이름을 할당한 후, SQL 쿼리를 사용하여 데이터를 검색, 변환 및 분석할 수 있습니다.
       ```python
       # Spark SQL을 사용한 데이터프레임에 SQL 쿼리 수행 예제
       df.createOrReplaceTempView("my_table")
       result = spark.sql("SELECT * FROM my_table WHERE age > 21")
       ```
  3. Pandas와 비슷한 패턴:** Spark SQL의 데이터프레임 작업은 Pandas에서 pandasql 모듈의 sqldf 함수와 유사한 패턴을 가지고 있습니다. 이는 구조화된 데이터를 SQL 스타일로 다룰 때 유용합니다.

  4. Hive 호환성:** Spark SQL은 Hive Query Language (HQL)과 호환되며, Hive Metastore를 통해 Hive 테이블을 읽고 쓸 수 있습니다. 이는 기존의 Hive 기반 시스템과의 통합을 가능케 합니다.
       ```python
       # Hive 테이블을 읽어오는 예제
       df = spark.sql("SELECT * FROM hive_table")
       ```
  5. 분산 쿼리 및 처리:** Spark SQL은 Apache Spark의 일부로서 분산 환경에서 동작하며, 대규모 데이터를 효과적으로 처리할 수 있습니다.

Spark SQL vs. DataFrame

  • 하지만 SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음
    두 개를 동시에 사용할 수 있다는 점 분명히 기억

    1. Familiarity/Readability
    ● SQL이 가독성이 더 좋고 더 많은 사람들이 사용가능

    2. Optimization
    ● Spark SQL 엔진이 최적화하기 더 좋음 (SQL은 Declarative)
    Catalyst Optimizer : 쿼리 실행 엔진을 최적화
    Project Tungsten : 높은 성능과 메모리 효율성을 제공

    3. Interoperability/Data Management
    ● SQL이 포팅(특정 데이터베이스 제품에 종속되지 않고 이식성이 뛰어남)도 쉽고 접근권한 체크도 쉬움

Spark SQL 사용법 - SQL 활용

  1. Spark SQL을 사용하여 SQL을 수행하는 방법은 다음과 같습니다. 아래 예제는 데이터프레임을 기반으로 테이블 뷰를 생성하고, SQL 쿼리를 사용하여 결과를 데이터프레임으로 받아오는 과정을 보여줍니다.
  2. SparkSession 생성:
       ```python
       from pyspark.sql import SparkSession

       # SparkSession 생성
       spark = SparkSession.builder.appName("example").getOrCreate()
       ```
  3. 데이터프레임 생성:
       ```python
       # 데이터프레임 생성 (예시 데이터)
       data = [("John", "Male"), ("Alice", "Female"), ("Bob", "Male"), ("Eve", "Female")]
       columns = ["name", "gender"]
       namegender_df = spark.createDataFrame(data, columns)
       ```
  4. 테이블 뷰 생성:
       - `createOrReplaceTempView("namegender")`: 데이터프레임을 테이블 뷰로 등록합니다. 세션 동안 존재하며, 동일한 이름의 테이블 뷰가 이미 존재하는 경우 덮어씁니다.
       ```python
       namegender_df.createOrReplaceTempView("namegender")
       ```
  5. SQL 쿼리 수행:
       - `spark.sql("SELECT gender, count(1) FROM namegender GROUP BY gender")`: SQL 쿼리를 실행하여 결과를 데이터프레임으로 받아옵니다. 결과 데이터프레임은 `namegender_group_df`에 저장됩니다.
       ```python
       namegender_group_df = spark.sql("""
           SELECT gender, count(1) FROM namegender GROUP BY gender
       """)
       ```
  6. 결과 출력:
       ```python
       # 결과 출력
       print(namegender_group_df.collect())
       ```
    이를 통해 Spark SQL을 사용하여 데이터프레임을 통한 SQL 쿼리를 실행하고 결과를 다룰 수 있습니다. 주의: 실제 환경에서는 데이터 소스를 로드하거나 외부 데이터에 쿼리를 실행하는 등의 더 복잡한 작업이 수행될 수 있습니다.

 

sparkSession 사용 외부 데이터베이스 연결

  • Spark Session의 read 함수를 호출(로그인 관련 정보와 읽어오고자 하는 테이블 혹은 SQL을 지정)
# df_user_session_channel 데이터프레임을 생성하기 위해 SparkSession을 이용합니다.
df_user_session_channel = spark.read \

 # JDBC 데이터 소스를 사용하도록 지정합니다.
    .format("jdbc") \ 

# 사용할 JDBC 드라이버를 지정합니다. 여기서는 Amazon Redshift용 드라이버를 사용합니다.
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \  

# 데이터베이스 연결 정보를 지정합니다. HOST, PORT, DB, ID, PASSWORD는 실제 값으로 대체되어야 합니다.
    .option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \  

# 읽어올 테이블의 이름을 지정합니다.
    .option("dbtable", "raw_data.user_session_channel") \  

# 데이터를 읽어와서 데이터프레임을 생성합니다.
    .load() 

 

'하둡,spark' 카테고리의 다른 글

UDF(User Defined Function) 사용해보기.  (1) 2024.01.25
spark sql에서 join  (0) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
실습5

'하둡,spark' 카테고리의 다른 글

spark sql에서 join  (0) 2024.01.25
spark SQL  (1) 2024.01.25
spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
Untitled14

'하둡,spark' 카테고리의 다른 글

spark SQL  (1) 2024.01.25
spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습3  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
spark 데이터프레임 실습1  (0) 2024.01.23

 

 

'하둡,spark' 카테고리의 다른 글

spark 데이터프레임 실습5  (0) 2024.01.24
spark 데이터프레임 실습4  (0) 2024.01.24
spark 데이터프레임 실습2  (0) 2024.01.23
spark 데이터프레임 실습1  (0) 2024.01.23
spark 기초 실습(colab)  (0) 2024.01.19

+ Recent posts