Hadoop, Spark
Spark basics hands-on (Colab)
데이터왕
2024. 1. 19. 10:50
Rather than installing PySpark on a local machine and running a notebook there, we do the hands-on work in Google Colab, which already has a wide range of machine-learning libraries installed and provides decent hardware. For this we install the pyspark and Py4J packages. Py4J lets a Python program access objects running on the Java Virtual Machine. We use Spark in Local Standalone mode.
Installing PySpark
In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5
Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 2.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 kB 10.9 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845494 sha256=f9b1d3e7975d64d196c53d43f9b839d65e3d70206313d83750dc23c138b05a11
  Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Spark Session: Since Spark 2.0, SparkSession has been the entry point to Spark. You use a SparkSession to create RDDs, DataFrames, and so on. A SparkSession is created by calling SparkSession.builder, and the builder's methods allow detailed configuration.
- local[*]: Spark runs in a single JVM, and inside it as many threads as the machine has CPU cores act as executors.
In [2]:
from pyspark.sql import SparkSession

# Create a Spark Session.
# - master: local[*] runs Spark in local standalone mode;
#   the asterisk (*) tells Spark to use all CPU cores on the machine.
# - appName: sets the application name to 'PySpark Tutorial'.
# - getOrCreate() returns an existing SparkSession or creates a new one.
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark Tutorial')\
        .getOrCreate()
In [3]:
spark
Out[3]:
SparkSession - in-memory
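As a quick sanity check (not in the original notebook), you can inspect the session directly; version, master, and appName are standard SparkSession/SparkContext attributes:

# Confirm the Spark version, cluster manager, and application name.
print(spark.version)               # e.g. 3.3.1
print(spark.sparkContext.master)   # local[*]
print(spark.sparkContext.appName)  # PySpark Tutorial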
In [6]:
# CPU(s): 2 (so up to two threads can run in parallel)
!lscpu
Architecture:             x86_64
  CPU op-mode(s):         32-bit, 64-bit
  Address sizes:          46 bits physical, 48 bits virtual
  Byte Order:             Little Endian
CPU(s):                   2
  On-line CPU(s) list:    0,1
Vendor ID:                GenuineIntel
  Model name:             Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:           6
    Model:                79
    Thread(s) per core:   2
    Core(s) per socket:   1
    Socket(s):            1
    Stepping:             0
    BogoMIPS:             4399.99
    Flags:                fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
Virtualization features:
  Hypervisor vendor:      KVM
  Virtualization type:    full
Caches (sum of all):
  L1d:                    32 KiB (1 instance)
  L1i:                    32 KiB (1 instance)
  L2:                     256 KiB (1 instance)
  L3:                     55 MiB (1 instance)
NUMA:
  NUMA node(s):           1
  NUMA node0 CPU(s):      0,1
(Vulnerability-mitigation details omitted.)
In [8]:
# Check available memory
!grep MemTotal /proc/meminfo
MemTotal: 13290480 kB
Python <> RDD <> DataFrame
Converting Python objects to an RDD
1> Create a Python list
In [9]:
# A list of JSON-formatted strings
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
In [10]:
for n in name_list_json:
    print(n)
{"name": "keeyong"} {"name": "benjamin"} {"name": "claire"}
2> Convert the Python list to an RDD
- The moment the list becomes an RDD, the data is split and stored across the servers of the Spark cluster (partitions); a sketch after the count() output below shows how to check this.
In [11]:
# spark.sparkContext: get the SparkContext from the SparkSession.
# parallelize(name_list_json): use the SparkContext's parallelize() to
# distribute the name_list_json list and create an RDD.
rdd = spark.sparkContext.parallelize(name_list_json)
In [13]:
rdd
Out[13]:
ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
In [14]:
rdd.count()
Out[14]:
3
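As mentioned above, parallelize() splits the data into partitions. A small sketch (not part of the original notebook) that checks the partition count and sets it explicitly; getNumPartitions() and the optional numSlices argument are standard RDD/SparkContext APIs:

# How many partitions did parallelize() create by default?
print(rdd.getNumPartitions())

# parallelize() also accepts an explicit number of slices (partitions).
rdd_two = spark.sparkContext.parallelize(name_list_json, 2)
print(rdd_two.getNumPartitions())  # 2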
In [16]:
import json
# Parse each JSON string into a Python dict
parsed_rdd = rdd.map(lambda el: json.loads(el))
In [17]:
parsed_rdd
Out[17]:
PythonRDD[3] at RDD at PythonRDD.scala:53
In [18]:
# Unlike before, the elements are now Python dicts rather than raw strings
parsed_rdd.collect()
Out[18]:
[{'name': 'keeyong'}, {'name': 'benjamin'}, {'name': 'claire'}]
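Note that map() is a lazy transformation: the PythonRDD shown in Out[17] is only an execution plan, and nothing is parsed until an action such as collect() or count() runs. A minimal sketch of this behavior (bad_rdd is a hypothetical variable, not from the notebook):

# Transformations are only recorded; errors surface when an action runs.
bad_rdd = spark.sparkContext.parallelize(['{"name": "ok"}', 'not json'])
lazy_rdd = bad_rdd.map(json.loads)   # no error yet - only a plan is built
# lazy_rdd.collect()                 # the JSONDecodeError would only appear here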
In [22]:
# From the RDD of JSON strings, extract the "name" field to create a new RDD
parsed_name_rdd = rdd.map(lambda el: json.loads(el)["name"])
In [23]:
parsed_name_rdd.collect()
Out[23]:
['keeyong', 'benjamin', 'claire']
Converting a Python list to a DataFrame
In [25]:
from pyspark.sql.types import StringType
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
df = spark.createDataFrame(name_list_json, StringType())
In [26]:
df.count()
Out[26]:
3
In [30]:
# printSchema() prints the DataFrame's schema, i.e. the structure showing
# each column's name and data type
df.printSchema()
root
 |-- value: string (nullable = true)
In [31]:
# Select every column of the DataFrame (* means all columns) and collect()
# the data back to the driver as a local list of Rows
df.select('*').collect()
Out[31]:
[Row(value='{"name": "keeyong"}'), Row(value='{"name": "benjamin"}'), Row(value='{"name": "claire"}')]
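Here the DataFrame has a single string column named value. To pull structured fields out of that JSON string on the DataFrame side, one option is get_json_object from pyspark.sql.functions; a brief sketch (names_df is a hypothetical variable name):

from pyspark.sql.functions import get_json_object

# Extract the "name" field from the JSON string stored in the 'value' column.
names_df = df.select(get_json_object("value", "$.name").alias("name"))
names_df.show()  # shows keeyong, benjamin, claire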
Example of converting an RDD to a DataFrame: let's convert the parsed_rdd from above into a DataFrame.
In [32]:
df_parsed_rdd = parsed_rdd.toDF()
In [33]:
df_parsed_rdd.printSchema()
root
 |-- name: string (nullable = true)
In [34]:
df_parsed_rdd.select('name').collect()
Out[34]:
[Row(name='keeyong'), Row(name='benjamin'), Row(name='claire')]
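toDF() infers the column name and type from the dict keys. If you prefer an explicit schema (for example, to control nullability), you can go through spark.createDataFrame instead; a sketch under that assumption:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# Map each parsed dict to a Row, then apply an explicit schema.
schema = StructType([StructField("name", StringType(), True)])
row_rdd = parsed_rdd.map(lambda d: Row(name=d["name"]))
df_explicit = spark.createDataFrame(row_rdd, schema=schema)
df_explicit.printSchema()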
Loading data into a Spark DataFrame
In [35]:
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
--2024-01-19 01:14:15--  https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
Resolving s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)... 3.5.85.14, 52.92.138.106, 52.92.237.194, ...
Connecting to s3-geospatial.s3-us-west-2.amazonaws.com (s3-geospatial.s3-us-west-2.amazonaws.com)|3.5.85.14|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 997 [text/csv]
Saving to: ‘name_gender.csv’

name_gender.csv     100%[===================>]     997  --.-KB/s    in 0s

2024-01-19 01:14:15 (14.3 MB/s) - ‘name_gender.csv’ saved [997/997]
In [36]:
df = spark.read.csv("name_gender.csv")
df.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
In [37]:
df = spark.read.option("header", True).csv("name_gender.csv")
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
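Both columns come back as strings because read.csv does not infer types by default. For files that contain numeric columns, the standard inferSchema reader option can be added; a quick sketch:

# Sample the file and infer column types instead of defaulting to string.
df_typed = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("name_gender.csv")
df_typed.printSchema()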
In [38]:
df.show()
+----------+------+
|      name|gender|
+----------+------+
|  Adaleigh|     F|
|     Amryn|Unisex|
|    Apurva|Unisex|
|    Aryion|     M|
|    Alixia|     F|
|Alyssarose|     F|
|    Arvell|     M|
|     Aibel|     M|
|   Atiyyah|     F|
|     Adlie|     F|
|    Anyely|     F|
|    Aamoni|     F|
|     Ahman|     M|
|    Arlane|     F|
|   Armoney|     F|
|   Atzhiry|     F|
| Antonette|     F|
|   Akeelah|     F|
| Abdikadir|     M|
|    Arinze|     M|
+----------+------+
only showing top 20 rows
In [39]:
df.head(5)
Out[39]:
[Row(name='Adaleigh', gender='F'), Row(name='Amryn', gender='Unisex'), Row(name='Apurva', gender='Unisex'), Row(name='Aryion', gender='M'), Row(name='Alixia', gender='F')]
In [40]:
df.groupby(["gender"]).count().collect()
Out[40]:
[Row(gender='F', count=65), Row(gender='M', count=28), Row(gender='Unisex', count=7)]
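The same aggregation can be shown as a table, with the count column renamed and the rows sorted; groupBy, withColumnRenamed, and orderBy are all standard DataFrame methods. A sketch:

# Group by gender, rename the count column, and sort by it before displaying.
(df.groupBy("gender")
   .count()
   .withColumnRenamed("count", "cnt")
   .orderBy("cnt", ascending=False)
   .show())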
In [42]:
# Number of partitions
df.rdd.getNumPartitions()
Out[42]:
1
Registering the DataFrame as a temporary view and querying it with Spark SQL
In [43]:
df.createOrReplaceTempView("namegender")
In [44]:
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")
In [45]:
namegender_group_df.collect()
Out[45]:
[Row(gender='F', count(1)=65), Row(gender='M', count(1)=28), Row(gender='Unisex', count(1)=7)]
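The result column is literally named count(1) because the query did not alias the aggregate. Adding an alias (plain SQL, nothing Spark-specific) gives the Rows a friendlier field name; a sketch:

# Alias the aggregate so the result column gets a usable name.
namegender_group_df2 = spark.sql(
    "SELECT gender, COUNT(1) AS cnt FROM namegender GROUP BY gender")
namegender_group_df2.collect()
# [Row(gender='F', cnt=65), Row(gender='M', cnt=28), Row(gender='Unisex', cnt=7)]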
In [46]:
spark.catalog.listTables()
Out[46]:
[Table(name='namegender', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
Checking the number of partitions
In [47]:
namegender_group_df.rdd.getNumPartitions()
Out[47]:
1
In [48]:
two_namegender_group_df = namegender_group_df.repartition(2)
In [49]:
two_namegender_group_df.rdd.getNumPartitions()
Out[49]:
2
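repartition(2) performs a full shuffle to rebalance the data into two partitions. When you only want to reduce the number of partitions, coalesce (also a standard DataFrame method) avoids the full shuffle; a quick sketch:

# Reduce the two-partition DataFrame back to one partition without a full shuffle.
one_df = two_namegender_group_df.coalesce(1)
print(one_df.rdd.getNumPartitions())  # 1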
In [52]:
from google.colab import drive
drive.mount('/content/drive')
Mounted at /content/drive
In [55]:
!jupyter nbconvert --to markdown "/content/drive/MyDrive/Colab Notebooks/notebook_test.ipynb"
[NbConvertApp] WARNING | pattern '/content/drive/MyDrive/Colab Notebooks/notebook_test.ipynb' matched no files
(Since no notebook matched the pattern, nbconvert printed its full usage/help text, omitted here.)
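The warning above means the path given to nbconvert did not match any file. One way to troubleshoot (the folder path is simply the one used above and may differ on your Drive) is to list the mounted folder first and copy the exact notebook filename from the output:

import os

# List the mounted Drive folder to find the notebook's exact filename.
notebook_dir = "/content/drive/MyDrive/Colab Notebooks"
print(os.listdir(notebook_dir))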