PySpark Practice Problems
3 min read · Jan 2, 2022
1. How to transform an array of arrays into columns?
import pyspark.sql.functions as F

# A single row holding an array of arrays; the inner arrays have ragged lengths.
nested = spark.createDataFrame(
    [([["a","b","c"], ["d","e","f"], ["g","h","i", "j"]],)],
    ["data"]
)
nested.show(20, False)

# One output row per inner array.
exploded = nested.withColumn("data1", F.explode("data"))
exploded.select('data1').show()

# The widest inner array decides how many columns are needed.
# Row(max(size(data1))=4) ---> 4
widest = exploded.select(F.max(F.size('data1'))).collect()[0][0]

# Element i of every array becomes column col_i; short arrays yield null.
exploded.select(
    *[F.col("data1")[idx].alias(f"col_{idx}") for idx in range(widest)]
).show()
+------------------------------------+
|data |
+------------------------------------+
|[[a, b, c], [d, e, f], [g, h, i, j]]|
+------------------------------------+
+------------+
| data1|
+------------+
| [a, b, c]|
| [d, e, f]|
|[g, h, i, j]|
+------------+
+-----+-----+-----+-----+
|col_0|col_1|col_2|col_3|
+-----+-----+-----+-----+
| a| b| c| null|
| d| e| f| null|
| g| h| i| j|
+-----+-----+-----+-----+
2. How to create a new dataframe from two time series dataframes?
# Dataset 1
cust_id    pt_dt
985XFT82Y4 20200824
985XFT82Y4 20200826
985XFT82Y4 20200902
985XFT82Y4 20200918
985XFT82Y4 20200930
985XFT82Y4 20201016
985XFT82Y4 20201021
985XFT82Y4 20201102
985XFT82Y4 20201111
985XFT82Y4 20201112
985XFT82Y4 20201208
985XFT82Y4 20210111
985XFT82Y4 20210202
985XFT82Y4 20210303
985XFT82Y4 20210309
985XFT82Y4 20210311
# Dataset 2
cust_id    chg_date                   ins_status
985XFT82Y4 2020-08-24 22:12:34.332000 subscribed
985XFT82Y4 2020-11-11 14:45:31.152000 installed
985XFT82Y4 2021-02-02 01:26:34.500000 migration
985XFT82Y4 2021-03-09 08:11:57.790000 setup done
# Expected output
cust_id    pt_dt    ins_status
985XFT82Y4 20200824 subscribed
985XFT82Y4 20200826 subscribed
985XFT82Y4 20200902 subscribed
985XFT82Y4 20200918 subscribed
985XFT82Y4 20200930 subscribed
985XFT82Y4 20201016 subscribed
985XFT82Y4 20201021 subscribed
985XFT82Y4 20201102 subscribed
985XFT82Y4 20201111 installed
985XFT82Y4 20201112 installed
985XFT82Y4 20201208 installed
985XFT82Y4 20210111 installed
985XFT82Y4 20210202 migration
985XFT82Y4 20210303 migration
985XFT82Y4 20210309 setup done
985XFT82Y4 20210311 setup doneimport pyspark.sql.functions as F
from pyspark.sql import Window
from datetime import datetimedata = [("985XFT82Y4", "20200824"),
("985XFT82Y4", "20200826"),
("985XFT82Y4", "20200902"),
("985XFT82Y4", "20200918"),
("985XFT82Y4", "20200930"),
("985XFT82Y4", "20201016"),
("985XFT82Y4", "20201021"),
("985XFT82Y4", "20201102"),
("985XFT82Y4", "20201111"),
("985XFT82Y4", "20201112"),
("985XFT82Y4", "20201208"),
("985XFT82Y4", "20210111"),
("985XFT82Y4", "20210202"),
("985XFT82Y4", "20210303"),
("985XFT82Y4", "20210309"),
("985XFT82Y4", "20210311")]df1 = spark.createDataFrame(data, ["cust_id", "pt_dt"]).withColumn("pt_dt", F.to_timestamp("pt_dt", "yyyyMMdd")).withColumn("pt_dt", F.date_format(F.col('pt_dt'),"yyyy-MM-dd"))
df1.show()data1 = [("985XFT82Y4", "2020-08-24 22:12:34.332000", "subscribed"),
("985XFT82Y4", "2020-11-11 14:45:31.152000", "installed"),
("985XFT82Y4", "2021-02-02 01:26:34.500000", "migration"),
("985XFT82Y4", "2021-03-09 08:11:57.790000", "setup done")]
ts_pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"df2 = spark.createDataFrame(data1, ["cust_id", "chg_date", "ins_status"]).withColumn("chg_date", F.to_timestamp("chg_date", ts_pattern)).withColumn("chg_date", F.date_format(F.col('chg_date'),"yyyy-MM-dd"))
df2.show()window_spec = Window.partitionBy("cust_id").orderBy("chg_date")
df2 = df2.withColumn("end_chg_date", F.lead("chg_date").over(window_spec))
df2.show()cond = [df1["cust_id"] == df2["cust_id"], df1["pt_dt"] >= df2["chg_date"], df2["end_chg_date"].isNull() | (df1["pt_dt"] < df2["end_chg_date"])]
df3 = df1.join(df2, cond, "left").select(df1["cust_id"], df1["pt_dt"], "ins_status").orderBy("pt_dt")
# use df1 in select to resolve same column name conflict
df3.show()
3. Group DataFrame by hour of the Day:
https://stackoverflow.com/questions/66647892/group-df-by-hour-of-day/66649736#66649736
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.expressions.Window

// Sample pickups: one row per (location, pickup timestamp, pickup count, borough, zone).
val df = Seq(
(75, "2019-01-19 02:13:00", 5 , "Brooklyn", "Williamsburg"),
(255, "2019-01-19 12:05:00", 8 , "Brooklyn", "Williamsburg"),
(99, "2019-01-20 12:05:00", 3 , "Brooklyn", "DUMBO"),
(102, "2019-01-01 02:05:00", 1 , "Brooklyn", "DUBMO"),
(10, "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
(75, "2019-01-01 11:05:00", 2 , "Brooklyn", "Williamsburg"),
(12, "2019-01-11 01:05:00", 1 , "Brooklyn", "Park Slope"),
(98, "2019-01-28 01:05:00", 8 , "Brooklyn", "DUMBO"),
(75, "2019-01-10 00:05:00", 8 , "Brooklyn", "Williamsburg"),
(255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO"),
).toDF("PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone")
df.show()

// Parse the timestamp string and derive the hour-of-day bucket (0-23).
val df1 = df.
withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"),"yyyy-MM-dd HH:mm:ss")).
withColumn("hour", F.hour(F.col("pickup_datetime")))
df1.show()
df1.printSchema()

// Rank zones within each hour, busiest first; rn == 1 is the top zone.
val windowSpec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))
val df2 = df1.withColumn("rn", F.row_number.over(windowSpec))

// select() already excludes "rn", so the drop(F.col("rn")) the original
// chained in before it was redundant and has been removed.
df2.filter(F.col("rn") === 1).select("hour", "Zone", "number_of_pickups").show()
`