Practice #2 - Processing a CSV File Without a Header
Input data: a CSV file with no header row
- Attach a schema to the data (add cust_id, item_id, and amount_spent as the columns)
- Compute the sum of amount_spent per cust_id
In [ ]:
!pip install pyspark==3.3.1 py4j==0.10.9.5
In [2]:
from pyspark.sql import SparkSession

# SparkSession is the entry point for reading and processing data in PySpark.
# .master("local[*]"): run the Spark application in local mode;
#   [*] means use all available cores.
# .appName('PySpark DataFrame #2'): name the application;
#   this is how it is identified in the Spark UI.
# .getOrCreate(): return the existing SparkSession, or create one if none exists.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('PySpark DataFrame #2') \
    .getOrCreate()
In [3]:
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/customer-orders.csv
--2024-01-23 02:11:55--  https://s3-geospatial.s3-us-west-2.amazonaws.com/customer-orders.csv
Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 52.218.237.73, 52.92.241.234, 52.92.241.114, ...
Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|52.218.237.73|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 146855 (143K) [text/csv]
Saving to: ‘customer-orders.csv’

customer-orders.csv 100%[===================>] 143.41K  320KB/s    in 0.4s

2024-01-23 02:11:57 (320 KB/s) - ‘customer-orders.csv’ saved [146855/146855]
In [4]:
!ls -tl
total 148
drwxr-xr-x 1 root root   4096 Jan 19 14:20 sample_data
-rw-r--r-- 1 root root 146855 Apr 10  2022 customer-orders.csv
In [5]:
!head -5 customer-orders.csv
44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
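A quick aside: if you read this file without supplying a schema, Spark falls back to the default column names _c0, _c1, _c2 and treats every column as a string (unless inferSchema is enabled). A minimal check:

# Sketch: reading without a schema yields default names and string types
df_default = spark.read.csv("customer-orders.csv")
df_default.printSchema()
# root
#  |-- _c0: string (nullable = true)
#  |-- _c1: string (nullable = true)
#  |-- _c2: string (nullable = true)

This is why the next cell defines an explicit schema.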
In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Define the schema explicitly: (column name, type, nullable)
schema = StructType([
    StructField("cust_id", StringType(), True),
    StructField("item_id", StringType(), True),
    StructField("amount_spent", FloatType(), True)])
In [7]:
# Read the CSV file using the schema defined above
# (cust_id, item_id, amount_spent; the file has no header row).
df = spark.read.schema(schema).csv("customer-orders.csv")
# Print the DataFrame's schema.
df.printSchema()
root
 |-- cust_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- amount_spent: float (nullable = true)
In [8]:
# "cust_id"를 기준으로 그룹화하고 "amount_spent" 열에 대해 합계를 계산하는 작업
df_ca = df.groupBy("cust_id").sum("amount_spent")
In [9]:
df_ca.show()
+-------+------------------+
|cust_id| sum(amount_spent)|
+-------+------------------+
|     51| 4975.219970226288|
|      7| 4755.070008277893|
|     15| 5413.510010659695|
|     54| 6065.390002984554|
|     11| 5152.289969373494|
|     29|5032.5300433933735|
|     69| 5123.010002791882|
|     42| 5696.840004444122|
|     73| 6206.199985742569|
|     87| 5206.400022745132|
|     64| 5288.690012812614|
|      3| 4659.629958629608|
|     30| 4990.720004022121|
|     34|5330.8000039458275|
|     59| 5642.890004396439|
|      8|5517.2399980425835|
|     22| 5019.449993014336|
|     28|  5000.71000123024|
|     85|  5503.42998456955|
|     35|  5155.41999566555|
+-------+------------------+
only showing top 20 rows
In [10]:
# Rename the auto-generated "sum(amount_spent)" column to "sum".
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
In [11]:
df_ca.show(10)
+-------+------------------+
|cust_id|               sum|
+-------+------------------+
|     51| 4975.219970226288|
|      7| 4755.070008277893|
|     15| 5413.510010659695|
|     54| 6065.390002984554|
|     11| 5152.289969373494|
|     29|5032.5300433933735|
|     69| 5123.010002791882|
|     42| 5696.840004444122|
|     73| 6206.199985742569|
|     87| 5206.400022745132|
+-------+------------------+
only showing top 10 rows
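Note the long decimal tails (e.g. 4975.219970226288): amount_spent was declared as FloatType, so the sums accumulate 32-bit floating-point rounding error. For display you could round the renamed column; a sketch using the func alias imported above:

# Sketch: round the totals for display only
df_ca.select("cust_id", func.round("sum", 2).alias("sum_rounded")).show(5)

Declaring the column as DoubleType (or DecimalType) in the schema would reduce the error at the source.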
In [12]:
import pyspark.sql.functions as f
# Group the DataFrame by "cust_id" and compute the sum of "amount_spent".
# f.sum('amount_spent') computes the sum of the "amount_spent" column,
# and .alias('sum') names the resulting column "sum".
df_ca = df.groupBy("cust_id") \
    .agg(f.sum('amount_spent').alias('sum'))
In [13]:
df_ca.show(5)
+-------+-----------------+
|cust_id|              sum|
+-------+-----------------+
|     51|4975.219970226288|
|      7|4755.070008277893|
|     15|5413.510010659695|
|     54|6065.390002984554|
|     11|5152.289969373494|
+-------+-----------------+
only showing top 5 rows
In [14]:
# Use several aggregate functions inside agg() to compute multiple statistics at once:
# the sum of the column as "sum",
# the maximum of the column as "max",
# and the average of the column as "avg".
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
Out[14]:
[Row(cust_id='51', sum=4975.219970226288, max=97.61000061035156, avg=48.77666637476753),
 Row(cust_id='7', sum=4755.070008277893, max=98.5999984741211, avg=50.58585115189248),
 Row(cust_id='15', sum=5413.510010659695, max=99.56999969482422, avg=52.05298087172783),
 Row(cust_id='54', sum=6065.390002984554, max=99.2300033569336, avg=49.31211384540288),
 Row(cust_id='11', sum=5152.289969373494, max=99.11000061035156, avg=47.70638860531013),
 Row(cust_id='29', sum=5032.5300433933735, max=99.87000274658203, avg=45.75027312175794),
 Row(cust_id='69', sum=5123.010002791882, max=98.91999816894531, avg=51.230100027918816),
 Row(cust_id='42', sum=5696.840004444122, max=99.05999755859375, avg=56.968400044441225),
 Row(cust_id='73', sum=6206.199985742569, max=99.98999786376953, avg=52.594915133411604),
 Row(cust_id='87', sum=5206.400022745132, max=99.97000122070312, avg=54.2333335702618),
 Row(cust_id='64', sum=5288.690012812614, max=99.55999755859375, avg=49.427009465538454),
 Row(cust_id='3', sum=4659.629958629608, max=99.83000183105469, avg=51.20472482010558),
 Row(cust_id='30', sum=4990.720004022121, max=98.76000213623047, avg=47.08226418888794),
 Row(cust_id='34', sum=5330.8000039458275, max=99.68000030517578, avg=50.769523847103116),
 Row(cust_id='59', sum=5642.890004396439, max=99.66000366210938, avg=51.29900003996762),
 Row(cust_id='8', sum=5517.2399980425835, max=98.70999908447266, avg=49.70486484723048),
 Row(cust_id='22', sum=5019.449993014336, max=99.06999969482422, avg=53.39840418100357),
 Row(cust_id='28', sum=5000.71000123024, max=99.68000030517578, avg=51.553711352889074),
 Row(cust_id='85', sum=5503.42998456955, max=98.68000030517578, avg=52.41361890066238),
 Row(cust_id='35', sum=5155.41999566555, max=99.2699966430664, avg=46.86745450605046),
 Row(cust_id='16', sum=4979.059988319874, max=99.94999694824219, avg=45.679449434127285),
 Row(cust_id='52', sum=5245.05999673903, max=99.4000015258789, avg=53.52102037488806),
 Row(cust_id='0', sum=5524.950008839369, max=99.44999694824219, avg=47.22179494734503),
 Row(cust_id='71', sum=5995.659991919994, max=98.25, avg=55.00605497174307),
 Row(cust_id='98', sum=4297.259994864464, max=98.6500015258789, avg=48.832499941641636),
 Row(cust_id='47', sum=4316.299998342991, max=99.73999786376953, avg=50.189534864453385),
 Row(cust_id='99', sum=4172.290024012327, max=99.73999786376953, avg=45.350978521873124),
 Row(cust_id='96', sum=3924.2299877405167, max=99.83999633789062, avg=43.602555419339076),
 Row(cust_id='43', sum=5368.830015063286, max=99.19999694824219, avg=51.623365529454674),
 Row(cust_id='5', sum=4561.0700044333935, max=99.94000244140625, avg=49.04376348853111),
 Row(cust_id='31', sum=4765.050008416176, max=98.58000183105469, avg=47.650500084161756),
 Row(cust_id='18', sum=4921.2700062170625, max=99.55000305175781, avg=47.77932044870934),
 Row(cust_id='70', sum=5368.249986544251, max=99.83999633789062, avg=49.706018393928254),
 Row(cust_id='61', sum=5497.479998707771, max=99.94999694824219, avg=53.89686273242913),
 Row(cust_id='27', sum=4915.890009522438, max=99.62999725341797, avg=47.26817316848498),
 Row(cust_id='75', sum=4178.499995291233, max=97.87999725341797, avg=46.42777772545814),
 Row(cust_id='17', sum=5032.680001735687, max=99.70999908447266, avg=52.97557896563881),
 Row(cust_id='26', sum=5250.399979650974, max=97.33999633789062, avg=46.05614017237697),
 Row(cust_id='46', sum=5963.110011339188, max=99.22000122070312, avg=51.85313053338424),
 Row(cust_id='78', sum=4524.510001778603, max=98.4000015258789, avg=51.41488638384776),
 Row(cust_id='77', sum=4327.730022907257, max=99.94000244140625, avg=55.483718242400734),
 Row(cust_id='89', sum=4851.480018436909, max=97.06999969482422, avg=48.0344556280882),
 Row(cust_id='6', sum=5397.880012750626, max=99.6500015258789, avg=51.408381073815484),
 Row(cust_id='60', sum=5040.710016489029, max=99.80999755859375, avg=53.06010543672662),
 Row(cust_id='90', sum=5290.410029888153, max=98.36000061035156, avg=51.36320417367139),
 Row(cust_id='68', sum=6375.450028181076, max=99.79000091552734, avg=55.43869589722675),
 Row(cust_id='19', sum=5059.429994106293, max=99.61000061035156, avg=51.626836674554006),
 Row(cust_id='23', sum=4042.650001913309, max=98.73999786376953, avg=48.1267857370632),
 Row(cust_id='41', sum=5637.619991332293, max=99.5999984741211, avg=53.185094257851816),
 Row(cust_id='55', sum=5298.090022087097, max=97.95999908447266, avg=50.458000210353305),
 Row(cust_id='95', sum=4876.840013846755, max=99.88999938964844, avg=50.276701173677885),
 Row(cust_id='93', sum=5265.7500213086605, max=98.66999816894531, avg=49.212617021576264),
 Row(cust_id='40', sum=5186.430004000664, max=99.30000305175781, avg=45.49500003509354),
 Row(cust_id='38', sum=4898.459992894903, max=99.94000244140625, avg=49.479393867625284),
 Row(cust_id='25', sum=5057.6099898815155, max=99.8499984741211, avg=51.60826520287261),
 Row(cust_id='44', sum=4756.890008449554, max=99.87999725341797, avg=51.7053261787995),
 Row(cust_id='82', sum=4812.490003585815, max=99.61000061035156, avg=49.107040852916484),
 Row(cust_id='53', sum=4945.300026416779, max=99.51000213623047, avg=52.609574749114664),
 Row(cust_id='92', sum=5379.280023932457, max=99.83000183105469, avg=54.8906124891067),
 Row(cust_id='86', sum=4908.810004234314, max=98.97000122070312, avg=48.125588276807),
 Row(cust_id='58', sum=5437.730004191399, max=99.4800033569336, avg=52.2858654249173),
 Row(cust_id='81', sum=5112.7100045681, max=99.08000183105469, avg=50.12460788792255),
 Row(cust_id='33', sum=5254.659994900227, max=99.97000122070312, avg=50.52557687404064),
 Row(cust_id='48', sum=4384.329996109009, max=99.62999725341797, avg=51.580352895400104),
 Row(cust_id='97', sum=5977.190007060766, max=99.5999984741211, avg=53.367767920185415),
 Row(cust_id='67', sum=4505.78999453038, max=99.12000274658203, avg=47.42936836347768),
 Row(cust_id='84', sum=4652.939991295338, max=99.16999816894531, avg=48.468124909326434),
 Row(cust_id='79', sum=3790.569982469082, max=99.81999969482422, avg=46.797160277396074),
 Row(cust_id='9', sum=5322.64999294281, max=98.97000122070312, avg=51.676213523716605),
 Row(cust_id='24', sum=5259.92001748085, max=98.62999725341797, avg=51.06718463573641),
 Row(cust_id='32', sum=5496.049998283386, max=98.94999694824219, avg=53.359708721197926),
 Row(cust_id='88', sum=4830.549984455109, max=99.66999816894531, avg=47.82722756886246),
 Row(cust_id='1', sum=4958.599974133074, max=96.80000305175781, avg=44.672071839036704),
 Row(cust_id='20', sum=4836.860020637512, max=98.9800033569336, avg=51.45595766635651),
 Row(cust_id='56', sum=4701.020000040531, max=99.27999877929688, avg=47.01020000040531),
 Row(cust_id='36', sum=4278.049998521805, max=96.45999908447266, avg=51.54277106652777),
 Row(cust_id='10', sum=4819.699996152893, max=99.43000030517578, avg=49.68762882631848),
 Row(cust_id='37', sum=4735.199992477894, max=99.83999633789062, avg=45.97281546095043),
 Row(cust_id='49', sum=4394.599998474121, max=99.8499984741211, avg=45.77708331743876),
 Row(cust_id='63', sum=5415.150004655123, max=99.44000244140625, avg=47.92168145712498),
 Row(cust_id='65', sum=5140.349995829165, max=98.52999877929688, avg=48.493867885180805),
 Row(cust_id='4', sum=4815.050017774105, max=96.23999786376953, avg=49.63969090488768),
 Row(cust_id='39', sum=6193.109993815422, max=99.7699966430664, avg=52.04294112449934),
 Row(cust_id='62', sum=5253.320019304752, max=99.87000274658203, avg=52.013069498066855),
 Row(cust_id='12', sum=4664.589988231659, max=98.73999786376953, avg=50.70206508947455),
 Row(cust_id='83', sum=4635.799997210503, max=99.23999786376953, avg=47.79175254856188),
 Row(cust_id='13', sum=4367.619992315769, max=98.13999938964844, avg=50.202528647307695),
 Row(cust_id='14', sum=4735.0300142765045, max=99.73999786376953, avg=47.35030014276504),
 Row(cust_id='21', sum=4707.409991621971, max=98.20999908447266, avg=50.07882969810608),
 Row(cust_id='66', sum=4681.919987842441, max=98.72000122070312, avg=50.343225675725165),
 Row(cust_id='91', sum=4642.259980916977, max=98.38999938964844, avg=51.0138459441426),
 Row(cust_id='94', sum=4475.569978475571, max=99.83999633789062, avg=45.66908141301603),
 Row(cust_id='72', sum=5337.439985603094, max=98.04000091552734, avg=53.37439985603094),
 Row(cust_id='74', sum=4647.129976034164, max=99.33999633789062, avg=49.43755293653366),
 Row(cust_id='76', sum=4904.210003614426, max=97.91000366210938, avg=53.30663047406984),
 Row(cust_id='2', sum=5994.589979887009, max=99.54000091552734, avg=50.80160999904245),
 Row(cust_id='80', sum=4727.860013484955, max=98.87000274658203, avg=54.97511643587157),
 Row(cust_id='50', sum=4517.269991545007, max=98.75, avg=45.628989813585925),
 Row(cust_id='57', sum=4628.399988114834, max=98.97000122070312, avg=50.86153833093224),
 Row(cust_id='45', sum=3309.3800055980682, max=92.20999908447266, avg=44.721351427000926)]
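Since collect() pulls every row back to the driver, show() (or a limited, ordered query) is usually preferable for inspection. For example, a sketch of a top-5 query by total spend, reusing the f alias imported above:

# Sketch: top 5 customers by total amount spent
df.groupBy("cust_id") \
    .agg(f.sum('amount_spent').alias('sum')) \
    .orderBy(f.desc('sum')) \
    .show(5)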
Processing the Same Query with Spark SQL
In [15]:
df.createOrReplaceTempView("customer_orders")
In [16]:
spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY 1""").head(5)
Out[16]:
[Row(cust_id='51', sum=4975.219970226288, max=97.61000061035156, avg=48.77666637476753),
 Row(cust_id='7', sum=4755.070008277893, max=98.5999984741211, avg=50.58585115189248),
 Row(cust_id='15', sum=5413.510010659695, max=99.56999969482422, avg=52.05298087172783),
 Row(cust_id='54', sum=6065.390002984554, max=99.2300033569336, avg=49.31211384540288),
 Row(cust_id='11', sum=5152.289969373494, max=99.11000061035156, avg=47.70638860531013)]
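One convenience of the SQL form is that you can filter on the aggregate itself with HAVING; a sketch (the 6000 threshold is arbitrary, for illustration):

spark.sql("""SELECT cust_id, ROUND(SUM(amount_spent), 2) sum
FROM customer_orders
GROUP BY 1
HAVING SUM(amount_spent) > 6000""").show()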
Spark uses an in-memory catalog by default. If you want a storage-based catalog instead, call enableHiveSupport() when building the SparkSession; this uses a Hive metastore as the catalog and also enables Hive UDFs and Hive file formats. A sketch follows below.
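A minimal sketch of building such a session, assuming a reachable Hive metastore (or the local default):

# Sketch: a session backed by a persistent (Hive metastore) catalog
spark_hive = SparkSession.builder \
    .master("local[*]") \
    .appName('PySpark DataFrame #2') \
    .enableHiveSupport() \
    .getOrCreate()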
In [17]:
spark.catalog.listTables()
Out[17]:
[Table(name='customer_orders', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
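The TEMPORARY entry above lives only in this session's in-memory catalog; it disappears when the session ends, or it can be dropped explicitly:

# Sketch: remove the temp view when it is no longer needed
spark.catalog.dropTempView("customer_orders")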