PySpark 설치
실습 내용
- 헤더가 없는 CSV 파일 처리하기
- 데이터에 스키마 지정하기
- SparkConf 사용해보기
- measure_type값이 TMIN인 레코드 대상으로 stationId별 최소 온도 찾기
PySpark 설치
In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5
Collecting pyspark==3.3.1 Downloading pyspark-3.3.1.tar.gz (281.4 MB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 3.8 MB/s eta 0:00:00 Preparing metadata (setup.py) ... done Collecting py4j==0.10.9.5 Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 kB 14.7 MB/s eta 0:00:00 Building wheels for collected packages: pyspark Building wheel for pyspark (setup.py) ... done Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845494 sha256=5a8a95a4b64eb993cbb5caab35f11bf06f0a79a54ae99e16d71578465d2ec74f Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b Successfully built pyspark Installing collected packages: py4j, pyspark Attempting uninstall: py4j Found existing installation: py4j 0.10.9.7 Uninstalling py4j-0.10.9.7: Successfully uninstalled py4j-0.10.9.7 Successfully installed py4j-0.10.9.5 pyspark-3.3.1
처리할 데이터 파일을 먼저 다운로드 받아온다
In [2]:
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv
--2024-01-22 13:52:02-- https://s3-geospatial.s3-us-west-2.amazonaws.com/1800.csv Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 3.5.77.102, 52.218.180.65, 52.92.161.162, ... Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|3.5.77.102|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 62728 (61K) [text/csv] Saving to: ‘1800.csv’ 1800.csv 100%[===================>] 61.26K --.-KB/s in 0.1s 2024-01-22 13:52:03 (442 KB/s) - ‘1800.csv’ saved [62728/62728]
In [3]:
!ls -tl
total 68 drwxr-xr-x 1 root root 4096 Jan 18 14:21 sample_data -rw-r--r-- 1 root root 62728 Apr 10 2022 1800.csv
In [4]:
!head -5 1800.csv
ITE00100554,18000101,TMAX,-75,,,E, ITE00100554,18000101,TMIN,-148,,,E, GM000010962,18000101,PRCP,0,,,E, EZE00100082,18000101,TMAX,-86,,,E, EZE00100082,18000101,TMIN,-135,,,E,
In [5]:
import pandas as pd
pd_df = pd.read_csv(
"1800.csv",
names=["stationID", "date", "measure_type", "temperature"],
usecols=[0, 1, 2, 3]
)
In [6]:
pd_df.head()
Out[6]:
stationID | date | measure_type | temperature | |
---|---|---|---|---|
0 | ITE00100554 | 18000101 | TMAX | -75 |
1 | ITE00100554 | 18000101 | TMIN | -148 |
2 | GM000010962 | 18000101 | PRCP | 0 |
3 | EZE00100082 | 18000101 | TMAX | -86 |
4 | EZE00100082 | 18000101 | TMIN | -135 |
In [9]:
# 'measure_type' 열이 "TMIN"인 행들을 필터하여 새로운 DataFrame을 만듭니다.
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]
In [10]:
pd_minTemps.head()
Out[10]:
stationID | date | measure_type | temperature | |
---|---|---|---|---|
1 | ITE00100554 | 18000101 | TMIN | -148 |
4 | EZE00100082 | 18000101 | TMIN | -135 |
6 | ITE00100554 | 18000102 | TMIN | -125 |
9 | EZE00100082 | 18000102 | TMIN | -130 |
11 | ITE00100554 | 18000103 | TMIN | -46 |
In [11]:
# Select only stationID and temperature
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
In [12]:
pd_minTemps.head()
Out[12]:
stationID | date | measure_type | temperature | |
---|---|---|---|---|
1 | ITE00100554 | 18000101 | TMIN | -148 |
4 | EZE00100082 | 18000101 | TMIN | -135 |
6 | ITE00100554 | 18000102 | TMIN | -125 |
9 | EZE00100082 | 18000102 | TMIN | -130 |
11 | ITE00100554 | 18000103 | TMIN | -46 |
In [13]:
# Select only stationID and temperature
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
In [14]:
# Aggregate to find minimum temperature for every station
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
pd_minTempsByStation.head()
Out[14]:
temperature | |
---|---|
stationID | |
EZE00100082 | -135 |
ITE00100554 | -148 |
Spark으로 처리해본다
In [19]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
# Spark 애플리케이션을 구성하기 위해 SparkConf 객체를 생성합니다.
conf = SparkConf()
# 애플리케이션 이름을 설정합니다.
conf.set("spark.app.name", "PySpark DataFrame #1")
# 모든 가능한 코어를 사용하여 로컬에서 Spark를 실행하기 위해 마스터 URL을 설정합니다.
conf.set("spark.master", "local[*]")
# 지정된 구성으로 SparkSession을 생성합니다.
# config(conf=conf): SparkConf 객체에서 지정된 구성을 설정합니다.
# getOrCreate(): 기존의 SparkSession을 가져오거나 없으면 새로 생성합니다.
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
In [16]:
df = spark.read.format("csv").load("1800.csv") # spark.read.csv("1800.csv")
In [17]:
# DataFrame의 스키마를 출력
df.printSchema()
root |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) |-- _c3: string (nullable = true) |-- _c4: string (nullable = true) |-- _c5: string (nullable = true) |-- _c6: string (nullable = true) |-- _c7: string (nullable = true)
In [18]:
# PySpark를 사용하여 CSV 파일을 읽어와 DataFrame으로 변환하는 작업
df = spark.read.format("csv")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
In [20]:
df.printSchema()
root |-- stationID: string (nullable = true) |-- date: string (nullable = true) |-- measure_type: string (nullable = true) |-- temperature: string (nullable = true) |-- _c4: string (nullable = true) |-- _c5: string (nullable = true) |-- _c6: string (nullable = true) |-- _c7: string (nullable = true)
In [21]:
df = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
In [22]:
df.printSchema()
root |-- stationID: string (nullable = true) |-- date: integer (nullable = true) |-- measure_type: string (nullable = true) |-- temperature: integer (nullable = true) |-- _c4: string (nullable = true) |-- _c5: string (nullable = true) |-- _c6: string (nullable = true) |-- _c7: string (nullable = true)
In [23]:
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql.types import StructType, StructField
# 스키마 정의
schema = StructType([ \
StructField("stationID", StringType(), True), \
StructField("date", IntegerType(), True), \
StructField("measure_type", StringType(), True), \
StructField("temperature", FloatType(), True)])
In [25]:
# PySpark에서 CSV 파일을 읽어올 때 미리 정의한 스키마를 사용하여 DataFrame을 생성하려는 시도
# df = spark.read.schema(schema).format("csv").load("1800.csv")
df = spark.read.schema(schema).csv("1800.csv")
In [26]:
df.printSchema()
root |-- stationID: string (nullable = true) |-- date: integer (nullable = true) |-- measure_type: string (nullable = true) |-- temperature: float (nullable = true)
In [28]:
# 필터 방법 1: TMIN으로 필터
minTemps = df.filter(df.measure_type == "TMIN")
In [29]:
minTemps.count()
Out[29]:
730
In [30]:
# 필터 방법 2: Column expression으로 필터링 적용
minTemps = df.where(df.measure_type == "TMIN")
In [31]:
minTemps.count()
Out[31]:
730
In [33]:
# 각 기상 관측소에서 최소 온도를 찾기 위해 groupBy와 min 함수를 사용합니다.
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
# 결과를 출력합니다.
minTempsByStation.show()
+-----------+----------------+ | stationID|min(temperature)| +-----------+----------------+ |ITE00100554| -148.0| |EZE00100082| -135.0| +-----------+----------------+
In [35]:
# "stationID"와 "temperature" 열만 선택합니다.
stationTemps = minTemps[["stationID", "temperature"]]
In [36]:
stationTemps.show(5)
+-----------+-----------+ | stationID|temperature| +-----------+-----------+ |ITE00100554| -148.0| |EZE00100082| -135.0| |ITE00100554| -125.0| |EZE00100082| -130.0| |ITE00100554| -46.0| +-----------+-----------+ only showing top 5 rows
In [37]:
stationTemps = minTemps.select("stationID", "temperature")
In [43]:
# 결과를 수집하여 로컬 Python 리스트로 반환합니다.
results = minTempsByStation.collect()
In [44]:
for result in results:
print(result[0] + "\t{:.2f}F".format(result[1]))
ITE00100554 -148.00F EZE00100082 -135.00F
Spark SQL로 처리해보기
In [45]:
df.createOrReplaceTempView("station1800")
In [46]:
results = spark.sql("""SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY 1""").collect()
In [47]:
# pyspark.sql.Row는 DataFrame의 레코드에 해당하며 필드별로 이름이 존재
for r in results:
print(r)
Row(stationID='ITE00100554', min(temperature)=-148.0) Row(stationID='EZE00100082', min(temperature)=-135.0)
- IntegerType: 32비트 정수 데이터 타입.
- LongType: 64비트 정수 데이터 타입.
- FloatType: 32비트 부동 소수점 데이터 타입.
- StringType: 문자열 데이터 타입.
- BooleanType: 불리언(참 또는 거짓) 데이터 타입.
- TimestampType: 타임스탬프(날짜와 시간) 데이터 타입.
- DateType: 날짜 데이터 타입.
- ArrayType: 배열 데이터 타입.
- StructType: 구조체(레코드) 데이터 타입.
- StructField: 구조체의 각 필드를 정의하는 데 사용되는 클래스.
- MapType: 맵(키-값 쌍) 데이터 타입.
In [ ]:
# DataFrame의 컬럼을 지칭하는 4가지 방식
from pyspark.sql.functions import col, column
stationTemps = minTemps.select(
"stationID",
col("stationID"),
column("stationID"),
minTemps.stationID
)
'하둡,spark' 카테고리의 다른 글
spark 데이터프레임 실습3 (0) | 2024.01.24 |
---|---|
spark 데이터프레임 실습2 (0) | 2024.01.23 |
spark 기초 실습(colab) (0) | 2024.01.19 |
spark 데이터 구조 (0) | 2024.01.19 |
spark 데이터 처리 (0) | 2024.01.18 |