PySpark Practice Problems

1. How to transform an array of arrays into columns?
import pyspark.sql.functions as F

df = spark.createDataFrame(
    [([["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i", "j"]],)],
    ["data"]
)

df.show(20, False)

df = df.withColumn("data1", F.explode("data"))
df.select('data1').show()

# The widest inner array decides how many columns we need: Row(max(size(data1))=4) ---> 4
max_size = df.select(F.max(F.size('data1'))).collect()[0][0]


df.select(
    *[F.col("data1")[i].alias(f"col_{i}") for i in range(max_size)]
).show()



+------------------------------------+
|data                                |
+------------------------------------+
|[[a, b, c], [d, e, f], [g, h, i, j]]|
+------------------------------------+

+------------+
|       data1|
+------------+
|   [a, b, c]|
|   [d, e, f]|
|[g, h, i, j]|
+------------+

+-----+-----+-----+-----+
|col_0|col_1|col_2|col_3|
+-----+-----+-----+-----+
|    a|    b|    c| null|
|    d|    e|    f| null|
|    g|    h|    i|    j|
+-----+-----+-----+-----+
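
If this comes up often, the same steps can be wrapped in a small helper. This is only a sketch: arrays_to_columns and its prefix argument are hypothetical names, and it assumes an active spark session just like the snippet above.

import pyspark.sql.functions as F

def arrays_to_columns(df, array_col, prefix="col"):
    """Explode an array-of-arrays column and fan each inner array out to columns."""
    exploded = df.withColumn("_inner", F.explode(array_col))
    # The widest inner array decides how many output columns are needed
    max_size = exploded.select(F.max(F.size("_inner"))).collect()[0][0]
    return exploded.select(
        *[F.col("_inner")[i].alias(f"{prefix}_{i}") for i in range(max_size)]
    )

# Applied to a fresh copy of the one-row DataFrame from this problem,
# it reproduces the col_0 ... col_3 table above
fresh = spark.createDataFrame(
    [([["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i", "j"]],)],
    ["data"]
)
arrays_to_columns(fresh, "data").show()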

2. How to create a new dataframe from two time series dataframes?

# Dataset 1
cust_id pt_dt
985XFT82Y4 20200824
985XFT82Y4 20200826
985XFT82Y4 20200902
985XFT82Y4 20200918
985XFT82Y4 20200930
985XFT82Y4 20201016
985XFT82Y4 20201021
985XFT82Y4 20201102
985XFT82Y4 20201111
985XFT82Y4 20201112
985XFT82Y4 20201208
985XFT82Y4 20210111
985XFT82Y4 20210202
985XFT82Y4 20210303
985XFT82Y4 20210309
985XFT82Y4 20210311

# Dataset 2
cust_id chg_date ins_status
985XFT82Y4 2020-08-24 22:12:34.332000 subscribed
985XFT82Y4 2020-11-11 14:45:31.152000 installed
985XFT82Y4 2021-02-02 01:26:34.500000 migration
985XFT82Y4 2021-03-09 08:11:57.790000 setup done

# Expected output
cust_id pt_dt ins_status
985XFT82Y4 20200824 subscribed
985XFT82Y4 20200826 subscribed
985XFT82Y4 20200902 subscribed
985XFT82Y4 20200918 subscribed
985XFT82Y4 20200930 subscribed
985XFT82Y4 20201016 subscribed
985XFT82Y4 20201021 subscribed
985XFT82Y4 20201102 subscribed
985XFT82Y4 20201111 installed
985XFT82Y4 20201112 installed
985XFT82Y4 20201208 installed
985XFT82Y4 20210111 installed
985XFT82Y4 20210202 migration
985XFT82Y4 20210303 migration
985XFT82Y4 20210309 setup done
985XFT82Y4 20210311 setup done

import pyspark.sql.functions as F
from pyspark.sql import Window

data = [("985XFT82Y4", "20200824"),
        ("985XFT82Y4", "20200826"),
        ("985XFT82Y4", "20200902"),
        ("985XFT82Y4", "20200918"),
        ("985XFT82Y4", "20200930"),
        ("985XFT82Y4", "20201016"),
        ("985XFT82Y4", "20201021"),
        ("985XFT82Y4", "20201102"),
        ("985XFT82Y4", "20201111"),
        ("985XFT82Y4", "20201112"),
        ("985XFT82Y4", "20201208"),
        ("985XFT82Y4", "20210111"),
        ("985XFT82Y4", "20210202"),
        ("985XFT82Y4", "20210303"),
        ("985XFT82Y4", "20210309"),
        ("985XFT82Y4", "20210311")]

# Normalize pt_dt (yyyyMMdd) to a yyyy-MM-dd string so it can be compared with chg_date
df1 = (spark.createDataFrame(data, ["cust_id", "pt_dt"])
       .withColumn("pt_dt", F.to_timestamp("pt_dt", "yyyyMMdd"))
       .withColumn("pt_dt", F.date_format(F.col("pt_dt"), "yyyy-MM-dd")))
df1.show()

data1 = [("985XFT82Y4", "2020-08-24 22:12:34.332000", "subscribed"),
         ("985XFT82Y4", "2020-11-11 14:45:31.152000", "installed"),
         ("985XFT82Y4", "2021-02-02 01:26:34.500000", "migration"),
         ("985XFT82Y4", "2021-03-09 08:11:57.790000", "setup done")]

ts_pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"
df2 = (spark.createDataFrame(data1, ["cust_id", "chg_date", "ins_status"])
       .withColumn("chg_date", F.to_timestamp("chg_date", ts_pattern))
       .withColumn("chg_date", F.date_format(F.col("chg_date"), "yyyy-MM-dd")))
df2.show()

# Each status is valid from its chg_date until the next status change (end_chg_date)
window_spec = Window.partitionBy("cust_id").orderBy("chg_date")
df2 = df2.withColumn("end_chg_date", F.lead("chg_date").over(window_spec))
df2.show()

cond = [df1["cust_id"] == df2["cust_id"],
        df1["pt_dt"] >= df2["chg_date"],
        df2["end_chg_date"].isNull() | (df1["pt_dt"] < df2["end_chg_date"])]

# Use df1 in the select to resolve the duplicate column name conflict
df3 = (df1.join(df2, cond, "left")
       .select(df1["cust_id"], df1["pt_dt"], "ins_status")
       .orderBy("pt_dt"))
df3.show()
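
The same output can also be produced without the range join: union the two datasets and forward-fill the status with last(..., ignorenulls=True) over a window. This is only a sketch reusing the df1/df2 built above; dt and is_event are illustrative helper column names, not anything from the original solution.

import pyspark.sql.functions as F
from pyspark.sql import Window

# Tag each row so that, on a day with both a status change and a snapshot date,
# the status-change row sorts first and is picked up by last()
events = df2.select("cust_id", F.col("chg_date").alias("dt"),
                    "ins_status", F.lit(1).alias("is_event"))
dates = df1.select("cust_id", F.col("pt_dt").alias("dt"),
                   F.lit(None).cast("string").alias("ins_status"),
                   F.lit(0).alias("is_event"))

w = (Window.partitionBy("cust_id")
     .orderBy("dt", F.desc("is_event"))
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

result = (dates.unionByName(events)
          .withColumn("ins_status", F.last("ins_status", ignorenulls=True).over(w))
          .filter("is_event = 0")
          .select("cust_id", F.col("dt").alias("pt_dt"), "ins_status")
          .orderBy("pt_dt"))
result.show()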

3. How to group a DataFrame by hour of the day?

https://stackoverflow.com/questions/66647892/group-df-by-hour-of-day/66649736#66649736

import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window

val df = Seq(
  (75,  "2019-01-19 02:13:00", 5,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-19 12:05:00", 8,  "Brooklyn", "Williamsburg"),
  (99,  "2019-01-20 12:05:00", 3,  "Brooklyn", "DUMBO"),
  (102, "2019-01-01 02:05:00", 1,  "Brooklyn", "DUMBO"),
  (10,  "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
  (75,  "2019-01-01 11:05:00", 2,  "Brooklyn", "Williamsburg"),
  (12,  "2019-01-11 01:05:00", 1,  "Brooklyn", "Park Slope"),
  (98,  "2019-01-28 01:05:00", 8,  "Brooklyn", "DUMBO"),
  (75,  "2019-01-10 00:05:00", 8,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO")
).toDF("PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone")
df.show()

val df1 = df.
  withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")).
  withColumn("hour", F.hour(F.col("pickup_datetime")))
df1.show()
df1.printSchema()

val windowSpec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))
val df2 = df1.withColumn("rn", F.row_number().over(windowSpec))

df2.filter(F.col("rn") === 1).drop(F.col("rn")).select("hour", "Zone", "number_of_pickups").show()
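
Since the rest of the post uses PySpark, here is roughly the same solution in Python for reference. It is a sketch that mirrors the Scala snippet above step by step and assumes the same sample data and an active spark session.

import pyspark.sql.functions as F
from pyspark.sql import Window

data = [(75, "2019-01-19 02:13:00", 5, "Brooklyn", "Williamsburg"),
        (255, "2019-01-19 12:05:00", 8, "Brooklyn", "Williamsburg"),
        (99, "2019-01-20 12:05:00", 3, "Brooklyn", "DUMBO"),
        (102, "2019-01-01 02:05:00", 1, "Brooklyn", "DUMBO"),
        (10, "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
        (75, "2019-01-01 11:05:00", 2, "Brooklyn", "Williamsburg"),
        (12, "2019-01-11 01:05:00", 1, "Brooklyn", "Park Slope"),
        (98, "2019-01-28 01:05:00", 8, "Brooklyn", "DUMBO"),
        (75, "2019-01-10 00:05:00", 8, "Brooklyn", "Williamsburg"),
        (255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO")]
cols = ["PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone"]

# Parse the timestamp and derive the hour of day
df = (spark.createDataFrame(data, cols)
      .withColumn("pickup_datetime", F.to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
      .withColumn("hour", F.hour("pickup_datetime")))

# Rank zones within each hour by number of pickups and keep the top one
window_spec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))

(df.withColumn("rn", F.row_number().over(window_spec))
   .filter(F.col("rn") == 1)
   .select("hour", "Zone", "number_of_pickups")
   .show())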
