PySpark Practice Problems

Mageswaran D
3 min read · Jan 2, 2022
1. How to transform an array of arrays into columns?
import pyspark.sql.functions as F

df = spark.createDataFrame(
    [([["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i", "j"]],)],
    ["data"]
)

df.show(20, False)

# Explode the outer array so each inner array becomes its own row
df = df.withColumn("data1", F.explode("data"))
df.select("data1").show()

# The longest inner array decides how many columns we need
# Row(max(size(data1))=4) ---> 4
max_size = df.select(F.max(F.size("data1"))).collect()[0][0]

# Index into each inner array to build one column per position;
# shorter arrays yield null for the missing positions
df.select(
    *[F.col("data1")[i].alias(f"col_{i}") for i in range(max_size)]
).show()



+------------------------------------+
|data |
+------------------------------------+
|[[a, b, c], [d, e, f], [g, h, i, j]]|
+------------------------------------+

+------------+
| data1|
+------------+
| [a, b, c]|
| [d, e, f]|
|[g, h, i, j]|
+------------+

+-----+-----+-----+-----+
|col_0|col_1|col_2|col_3|
+-----+-----+-----+-----+
| a| b| c| null|
| d| e| f| null|
| g| h| i| j|
+-----+-----+-----+-----+
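
If you also need to know which inner array each row came from, posexplode is a handy variation: it returns each element together with its position in the outer array. The sketch below is my addition on top of the original snippet and rebuilds the one-row DataFrame, since the code above reassigns df.

src = spark.createDataFrame(
    [([["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i", "j"]],)],
    ["data"]
)

# posexplode emits (pos, value) pairs: pos is the index of each inner array
exploded = src.select(F.posexplode("data").alias("pos", "data1"))
max_size = exploded.select(F.max(F.size("data1"))).collect()[0][0]

exploded.select(
    "pos",
    *[F.col("data1")[i].alias(f"col_{i}") for i in range(max_size)]
).show()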

2. How to create a new dataframe from two time series dataframes?

# Dataset 1
cust_id    pt_dt
985XFT82Y4 20200824
985XFT82Y4 20200826
985XFT82Y4 20200902
985XFT82Y4 20200918
985XFT82Y4 20200930
985XFT82Y4 20201016
985XFT82Y4 20201021
985XFT82Y4 20201102
985XFT82Y4 20201111
985XFT82Y4 20201112
985XFT82Y4 20201208
985XFT82Y4 20210111
985XFT82Y4 20210202
985XFT82Y4 20210303
985XFT82Y4 20210309
985XFT82Y4 20210311

# Dataset 2
cust_id chg_date ins_status
985XFT82Y4 2020-08-24 22:12:34.332000 subscribed
985XFT82Y4 2020-11-11 14:45:31.152000 installed
985XFT82Y4 2021-02-02 01:26:34.500000 migration
985XFT82Y4 2021-03-09 08:11:57.790000 setup done

# Expected output
cust_id pt_dt ins_status
985XFT82Y4 20200824 subscribed
985XFT82Y4 20200826 subscribed
985XFT82Y4 20200902 subscribed
985XFT82Y4 20200918 subscribed
985XFT82Y4 20200930 subscribed
985XFT82Y4 20201016 subscribed
985XFT82Y4 20201021 subscribed
985XFT82Y4 20201102 subscribed
985XFT82Y4 20201111 installed
985XFT82Y4 20201112 installed
985XFT82Y4 20201208 installed
985XFT82Y4 20210111 installed
985XFT82Y4 20210202 migration
985XFT82Y4 20210303 migration
985XFT82Y4 20210309 setup done
985XFT82Y4 20210311 setup done
import pyspark.sql.functions as F
from pyspark.sql import Window

data = [("985XFT82Y4", "20200824"),
        ("985XFT82Y4", "20200826"),
        ("985XFT82Y4", "20200902"),
        ("985XFT82Y4", "20200918"),
        ("985XFT82Y4", "20200930"),
        ("985XFT82Y4", "20201016"),
        ("985XFT82Y4", "20201021"),
        ("985XFT82Y4", "20201102"),
        ("985XFT82Y4", "20201111"),
        ("985XFT82Y4", "20201112"),
        ("985XFT82Y4", "20201208"),
        ("985XFT82Y4", "20210111"),
        ("985XFT82Y4", "20210202"),
        ("985XFT82Y4", "20210303"),
        ("985XFT82Y4", "20210309"),
        ("985XFT82Y4", "20210311")]

# Normalise pt_dt (yyyyMMdd) to a yyyy-MM-dd string so it can be compared with chg_date
df1 = (spark.createDataFrame(data, ["cust_id", "pt_dt"])
       .withColumn("pt_dt", F.to_timestamp("pt_dt", "yyyyMMdd"))
       .withColumn("pt_dt", F.date_format(F.col("pt_dt"), "yyyy-MM-dd")))
df1.show()

data1 = [("985XFT82Y4", "2020-08-24 22:12:34.332000", "subscribed"),
         ("985XFT82Y4", "2020-11-11 14:45:31.152000", "installed"),
         ("985XFT82Y4", "2021-02-02 01:26:34.500000", "migration"),
         ("985XFT82Y4", "2021-03-09 08:11:57.790000", "setup done")]

ts_pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"
df2 = (spark.createDataFrame(data1, ["cust_id", "chg_date", "ins_status"])
       .withColumn("chg_date", F.to_timestamp("chg_date", ts_pattern))
       .withColumn("chg_date", F.date_format(F.col("chg_date"), "yyyy-MM-dd")))
df2.show()

# Each status is valid from its chg_date until the next chg_date (open-ended for the latest one)
window_spec = Window.partitionBy("cust_id").orderBy("chg_date")
df2 = df2.withColumn("end_chg_date", F.lead("chg_date").over(window_spec))
df2.show()

# Range join: pick the status whose validity window contains pt_dt
cond = [df1["cust_id"] == df2["cust_id"],
        df1["pt_dt"] >= df2["chg_date"],
        df2["end_chg_date"].isNull() | (df1["pt_dt"] < df2["end_chg_date"])]

# Select through df1 to resolve the duplicate column names after the join
df3 = (df1.join(df2, cond, "left")
       .select(df1["cust_id"], df1["pt_dt"], "ins_status")
       .orderBy("pt_dt"))
df3.show()
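
A different way to get the same result (a sketch of my own, not part of the original solution): union the two timelines and forward-fill the latest status with last(..., ignorenulls=True) over an ordered window. It assumes df1 and df2 exactly as built above, with both date columns already formatted as yyyy-MM-dd strings.

events = (df2.select("cust_id", F.col("chg_date").alias("pt_dt"), "ins_status")
          .withColumn("is_orig", F.lit(False)))
timeline = (df1.withColumn("ins_status", F.lit(None).cast("string"))
            .withColumn("is_orig", F.lit(True))
            .unionByName(events))

# On ties (a status change on the same day as a pt_dt), order the status row first
w = (Window.partitionBy("cust_id")
     .orderBy("pt_dt", "is_orig")
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

filled = timeline.withColumn("ins_status", F.last("ins_status", ignorenulls=True).over(w))

# Keep only the original pt_dt rows
filled.filter("is_orig").select("cust_id", "pt_dt", "ins_status").orderBy("pt_dt").show()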

3. How to group a DataFrame by hour of the day?

https://stackoverflow.com/questions/66647892/group-df-by-hour-of-day/66649736#66649736

import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window

val df = Seq(
  (75,  "2019-01-19 02:13:00", 5,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-19 12:05:00", 8,  "Brooklyn", "Williamsburg"),
  (99,  "2019-01-20 12:05:00", 3,  "Brooklyn", "DUMBO"),
  (102, "2019-01-01 02:05:00", 1,  "Brooklyn", "DUMBO"),
  (10,  "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
  (75,  "2019-01-01 11:05:00", 2,  "Brooklyn", "Williamsburg"),
  (12,  "2019-01-11 01:05:00", 1,  "Brooklyn", "Park Slope"),
  (98,  "2019-01-28 01:05:00", 8,  "Brooklyn", "DUMBO"),
  (75,  "2019-01-10 00:05:00", 8,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO")
).toDF("PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone")
df.show()

// Parse the timestamp and derive the hour of day
val df1 = df.
  withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")).
  withColumn("hour", F.hour(F.col("pickup_datetime")))
df1.show()
df1.printSchema()

// Rank zones within each hour by number of pickups, then keep the busiest zone per hour
val windowSpec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))
val df2 = df1.withColumn("rn", F.row_number().over(windowSpec))

df2.filter(F.col("rn") === 1).drop("rn").select("hour", "Zone", "number_of_pickups").show()
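
Since the rest of the post is PySpark, here is a rough Python translation of the same approach (my addition, assuming a DataFrame df with the same columns as the Scala Seq above):

import pyspark.sql.functions as F
from pyspark.sql import Window

df1 = (df
       .withColumn("pickup_datetime", F.to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
       .withColumn("hour", F.hour("pickup_datetime")))

# Rank zones within each hour by pickups and keep the busiest zone per hour
w = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))

(df1.withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .select("hour", "Zone", "number_of_pickups")
    .show())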
