In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5
Collecting pyspark==3.3.1
Downloading pyspark-3.3.1.tar.gz (281.4 MB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 4.3 MB/s eta 0:00:00
Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 kB 10.0 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
Building wheel for pyspark (setup.py) ... done
Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845494 sha256=c19df499e9516fd46ec606aaf56c06a396aada6a7de75a905ee68ba89b2586c1
Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Attempting uninstall: py4j
Found existing installation: py4j 0.10.9.7
Uninstalling py4j-0.10.9.7:
Successfully uninstalled py4j-0.10.9.7
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
# SparkConf is the class used to set the configuration of an Apache Spark application
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
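To confirm that the configuration actually took effect, the settings can be read back from the running session. A small verification sketch (not part of the original notebook):
# Read the applied settings back from the active session (verification only)
print(spark.version)
print(spark.sparkContext.getConf().get("spark.app.name"))
print(spark.sparkContext.getConf().get("spark.master"))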
In [4]:
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
--2024-01-23 04:53:37-- https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
Resolving s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)... 52.92.233.10, 52.92.177.2, 52.92.138.18, ...
Connecting to s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)|52.92.233.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 286779 (280K) [text/plain]
Saving to: ‘transfer_cost.txt’
transfer_cost.txt 100%[===================>] 280.06K 1.71MB/s in 0.2s
2024-01-23 04:53:37 (1.71 MB/s) - ‘transfer_cost.txt’ saved [286779/286779]
In [6]:
!ls -tl
total 288
drwxr-xr-x 1 root root 4096 Jan 19 14:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24 2022 transfer_cost.txt
In [7]:
!head -5 transfer_cost.txt
On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today
In [16]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
# Define the schema with StructType: a single column named "text"
# of type StringType() that allows null values (True).
schema = StructType([ StructField("text", StringType(), True)])
# "transfer_cost.txt" 파일을 읽어와서 스키마를 적용한 DataFrame 생성
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")
In [17]:
# Display the contents of the PySpark DataFrame transfer_cost_df
transfer_cost_df.show(truncate=False)
+---------------------------------------------------------------------------+
|text |
+---------------------------------------------------------------------------+
|On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today |
|On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today |
|On 2021-01-04 the cost per ton from 85001 to 85012 is $18.98 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85013 is 26.64 at Haul Today |
|On 2021-01-04 the cost per ton from 85001 to 85020 is 26.34 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85021 is $20.15 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85002 to 85001 is 21.57 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85004 is 21.40 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85007 is 25.93 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85010 is 19.80 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85012 is 21.66 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85013 is $25.90 at Haul Today |
|On 2021-01-04 the cost per ton from 85002 to 85020 is 19.15 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85002 to 85021 is $27.13 at Haul Today |
|On 2021-01-04 the cost per ton from 85004 to 85001 is 23.88 at Haul Today |
|On 2021-01-04 the cost per ton from 85004 to 85002 is 26.40 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85004 to 85007 is 26.03 at ABC Hauling |
+---------------------------------------------------------------------------+
only showing top 20 rows
In [10]:
# Import the PySpark SQL functions
from pyspark.sql.functions import *
# Define the regular expression pattern for parsing each line.
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
# Extract each piece of information into a new column:
# 'week': the date captured by "On (\S+)".
# 'departure_zipcode': the origin ZIP code captured by "from (\d+)".
# 'arrival_zipcode': the destination ZIP code captured by "to (\d+)".
# 'cost': the transfer cost captured by "is (\S+)".
# 'vendor': the hauling vendor captured by "at (.*)".
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
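Note that the extracted cost column is still a string, and some rows keep the leading "$" (visible in the raw lines above). A minimal sketch of how it could be normalized to a numeric column, assuming stripping the "$" and casting to double is the desired behavior (the df_numeric_cost name is hypothetical):
# Hypothetical follow-up: strip the optional "$" and cast the cost to double
df_numeric_cost = df_with_new_columns.withColumn(
    'cost', F.regexp_replace(col('cost'), r'^\$', '').cast('double')
)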
In [11]:
df_with_new_columns.printSchema()
root
|-- text: string (nullable = true)
|-- week: string (nullable = true)
|-- departure_zipcode: string (nullable = true)
|-- arrival_zipcode: string (nullable = true)
|-- cost: string (nullable = true)
|-- vendor: string (nullable = true)
In [12]:
# text" 열이 제거된 새로운 D.drop("text")ataFrame
final_df = df_with_new_columns
In [13]:
final_df.write.csv("extracted.csv")
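The write above produces one or more header-less part files. If a header row and a single output file are wanted, one possible variant is the following sketch (the output path extracted_with_header.csv is an assumption, and coalesce(1) is only reasonable for small data):
# Hypothetical variant: single part file with a header row
final_df.coalesce(1).write.option("header", True).csv("extracted_with_header.csv")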
In [14]:
!ls -tl
total 292
drwxr-xr-x 2 root root 4096 Jan 23 04:55 extracted.csv
drwxr-xr-x 1 root root 4096 Jan 19 14:20 sample_data
-rw-r--r-- 1 root root 286779 Apr 24 2022 transfer_cost.txt
In [15]:
!ls -tl extracted.csv/
total 156
-rw-r--r-- 1 root root 0 Jan 23 04:55 _SUCCESS
-rw-r--r-- 1 root root 156423 Jan 23 04:55 part-00000-c19163dc-d8a7-488f-ac25-58c2b14105f9-c000.csv
In [19]:
!head -5 extracted.csv/part-00000-c19163dc-d8a7-488f-ac25-58c2b14105f9-c000.csv
2021-01-04,85001,85002,$28.32,ABC Hauling
2021-01-04,85001,85004,$25.68,ABC Hauling
2021-01-04,85001,85007,19.86,ABC Hauling
2021-01-04,85001,85007,20.52,Haul Today
2021-01-04,85001,85010,20.72,Haul Today
In [22]:
# Specify the format in which the DataFrame should be saved
final_df.write.format("json").save("extracted.json")
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-22-ba2b7f373793> in <cell line: 2>()
1 # Specify the format in which the DataFrame should be saved
----> 2 final_df.write.format("json").save("extracted.json")
/usr/local/lib/python3.10/dist-packages/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
966 self._jwrite.save()
967 else:
--> 968 self._jwrite.save(path)
969
970 @since(1.4)
/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: path file:/content/extracted.json already exists.
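The exception only means the output directory was left over from an earlier run. One way the same write could succeed is by choosing a save mode; a sketch assuming overwriting the existing directory is acceptable:
# Overwrite the existing output directory instead of failing (save mode chosen as an example)
final_df.write.mode("overwrite").format("json").save("extracted.json")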
In [21]:
!ls -tl extracted.json/
total 428
-rw-r--r-- 1 root root 0 Jan 23 05:25 _SUCCESS
-rw-r--r-- 1 root root 436305 Jan 23 05:25 part-00000-ae03f8cc-5b87-46ca-8541-e3fbcc533bd1-c000.json
In [25]:
!head -1 extracted.json/part-00000-ae03f8cc-5b87-46ca-8541-e3fbcc533bd1-c000.json
{"week":"2021-01-04","departure_zipcode":"85001","arrival_zipcode":"85002","cost":"$28.32","vendor":"ABC Hauling"}