In [ ]:
!pip install pyspark==3.3.1 py4j==0.10.9.5
In [ ]:
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
--2023-01-16 23:20:12--  https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.97.221, 54.231.197.16, 54.231.233.80, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.97.221|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2413910 (2.3M) [application/java-archive]
Saving to: ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar’

RedshiftJDBC42-no-a 100%[===================>]   2.30M  12.9MB/s    in 0.2s

2023-01-16 23:20:13 (12.9 MB/s) - ‘RedshiftJDBC42-no-awssdk-1.2.20.1043.jar’ saved [2413910/2413910]
In [ ]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .getOrCreate()
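As a side note, instead of downloading the driver into the pyspark jars directory with wget, the same JAR could be handed to Spark at session-creation time through the spark.jars configuration. A minimal sketch, assuming the JAR file sits in the current working directory:

from pyspark.sql import SparkSession

# Sketch: load the Redshift JDBC driver via spark.jars instead of copying it
# into the pyspark installation; the local path below is an assumption.
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .config("spark.jars", "./RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()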
Loading tables from Redshift into DataFrames
In [ ]:
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()
In [ ]:
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()
In [ ]:
df_user_session_channel.printSchema()
root
 |-- userid: integer (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- channel: string (nullable = true)
In [ ]:
# Shows how many partitions the DataFrame is split into
df_user_session_channel.rdd.getNumPartitions()
Out[ ]:
1
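A single partition means the whole table was pulled over one JDBC connection. For a larger table, the read could be parallelized with Spark's standard JDBC partitioning options (numPartitions, partitionColumn, lowerBound, upperBound). A sketch below; the partition column and bounds are assumptions about the data, not values taken from this dataset:

# Sketch: parallel JDBC read split into 4 partitions on the numeric userid column.
# The bounds are assumed and would need to reflect the actual userid range.
df_usc_parallel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.user_session_channel") \
    .option("partitionColumn", "userid") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000") \
    .option("numPartitions", "4") \
    .load()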
In [ ]:
df_session_timestamp.printSchema()
root
 |-- sessionid: string (nullable = true)
 |-- ts: timestamp (nullable = true)
In [ ]:
df_session_timestamp.rdd.getNumPartitions()
Out[ ]:
1
Processing with the DataFrame API
In [ ]:
# Inner-join df_user_session_channel and df_session_timestamp
# on the 'sessionid' column.
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
In [ ]:
session_df.printSchema()
root
 |-- userid: integer (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- ts: timestamp (nullable = true)
In [ ]:
session_df.show(5)
+------+--------------------+--------+--------------------+--------------------+
|userid|           sessionid| channel|           sessionid|                  ts|
+------+--------------------+--------+--------------------+--------------------+
|  1501|0135456d6a3c1051f...|  Google|0135456d6a3c1051f...|2019-09-24 14:49:...|
|   876|01a416a7e28d0d229...|Facebook|01a416a7e28d0d229...|2019-05-26 14:23:...|
|  2133|02ea66ee24d57e285...| Organic|02ea66ee24d57e285...| 2019-10-28 22:50:40|
|  1961|0302915889fa38fe5...| Youtube|0302915889fa38fe5...| 2019-11-29 15:16:49|
|   599|03ce0331cbd983f16...| Organic|03ce0331cbd983f16...|2019-07-18 13:49:...|
+------+--------------------+--------+--------------------+--------------------+
only showing top 5 rows
In [ ]:
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", "sessionid", "channel", "ts"
)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-13-70558d8e0e24> in <module>
----> 1 session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
      2     "userid", "sessionid", "channel", "ts"
      3 )

/usr/local/lib/python3.8/dist-packages/pyspark/sql/dataframe.py in select(self, *cols)
   2021         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   2022         """
-> 2023         jdf = self._jdf.select(self._jcols(*cols))
   2024         return DataFrame(jdf, self.sparkSession)
   2025

/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1319
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323

/usr/local/lib/python3.8/dist-packages/pyspark/sql/utils.py in deco(*a, **kw)
    194             # Hide where the exception came from that shows a non-Pythonic
    195             # JVM exception message.
--> 196             raise converted from None
    197         else:
    198             raise

AnalysisException: Reference 'sessionid' is ambiguous, could be: sessionid, sessionid.
In [ ]:
# To resolve the ambiguity, reference the sessionid column through the
# DataFrame it should come from (df_user_session_channel) instead of by name alone.
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", df_user_session_channel.sessionid, "channel", "ts"
)
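Another way to avoid the ambiguity is to join on the column name itself: when the join key is passed as a string (or a list of strings), Spark keeps only one copy of that column in the result, so it can be selected by name afterwards. A minimal sketch:

# Sketch: joining on the column name deduplicates the join key,
# so select("sessionid", ...) no longer raises an ambiguity error.
session_df_alt = df_user_session_channel.join(
    df_session_timestamp, on="sessionid", how="inner"
).select("userid", "sessionid", "channel", "ts")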
In [ ]:
channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)
In [ ]:
channel_count_df.show()
+---------+-----+
|  channel|count|
+---------+-----+
|  Youtube|17091|
|   Google|16982|
|    Naver|16921|
|  Organic|16904|
|Instagram|16831|
| Facebook|16791|
+---------+-----+
In [ ]:
from pyspark.sql.functions import date_format, asc, countDistinct
# Count distinct userid values per month (MAU: Monthly Active Users)
# and expose the result under the alias 'mau'.
session_df.withColumn('month', date_format('ts', 'yyyy-MM')) \
    .groupby('month') \
    .agg(countDistinct("userid").alias("mau")) \
    .sort(asc('month')) \
    .show()
+-------+---+
|  month|mau|
+-------+---+
|2019-05|281|
|2019-06|459|
|2019-07|623|
|2019-08|662|
|2019-09|639|
|2019-10|763|
|2019-11|721|
+-------+---+
Processing with Spark SQL
In [ ]:
# Register the DataFrame as a temporary view
df_user_session_channel.createOrReplaceTempView("user_session_channel")
In [ ]:
df_session_timestamp.createOrReplaceTempView("session_timestamp")
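Once registered, the temporary views can be verified through the session catalog; a quick sketch:

# Sketch: list the tables/views visible to the current Spark session.
for t in spark.catalog.listTables():
    print(t.name, t.isTemporary)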
In [ ]:
channel_count_df = spark.sql("""
SELECT channel, count(distinct userId) uniqueUsers
FROM session_timestamp st
JOIN user_session_channel usc ON st.sessionID = usc.sessionID
GROUP BY 1
ORDER BY 1
""")
In [ ]:
channel_count_df
Out[ ]:
DataFrame[channel: string, uniqueUsers: bigint]
In [ ]:
channel_count_df.show()
+---------+-----------+
|  channel|uniqueUsers|
+---------+-----------+
| Facebook|        889|
|   Google|        893|
|Instagram|        895|
|    Naver|        882|
|  Organic|        895|
|  Youtube|        889|
+---------+-----------+
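For comparison, the same per-channel distinct-user count can be expressed with the DataFrame API used earlier; a sketch reusing countDistinct on session_df:

# Sketch: DataFrame-API equivalent of the Spark SQL query above.
session_df.groupby("channel") \
    .agg(countDistinct("userid").alias("uniqueUsers")) \
    .orderBy("channel") \
    .show()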
In [ ]:
mau_df = spark.sql("""
SELECT
LEFT(A.ts, 7) AS month,
COUNT(DISTINCT B.userid) AS mau
FROM session_timestamp A
JOIN user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
ORDER BY 1 DESC""")
In [ ]:
mau_df.collect()
Out[ ]:
[Row(month='2019-11', mau=721), Row(month='2019-10', mau=763), Row(month='2019-09', mau=639), Row(month='2019-08', mau=662), Row(month='2019-07', mau=623), Row(month='2019-06', mau=459), Row(month='2019-05', mau=281)]
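If the aggregate needs to leave Spark, for example for plotting, a result this small can be converted to pandas; a sketch (pandas is assumed to be available in the environment):

# Sketch: pull the small aggregated result into a pandas DataFrame.
mau_pdf = mau_df.toPandas()
print(mau_pdf)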