Apache Spark — Interview Questions

Dive into this blog to get pointers to answer these kinds of questions and more!

1. Basics

- In `groupByKey`, the values are grouped but not reduced, i.e. no operation is performed on the grouped data
- Whereas in `reduceByKey`, the data is grouped and merged locally within each partition first, and only then merged with the other partitions (see the sketch after this list)
- Slowness of application
- Executor heartbeat timeout
- GC overhead limit exceeded error
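
A minimal sketch of the `groupByKey` vs `reduceByKey` difference, assuming a simple pair RDD and an existing SparkContext `sc`:

```
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)])

# groupByKey: all values for a key are shuffled to one place, then summed there
grouped = pairs.groupByKey().mapValues(sum).collect()

# reduceByKey: values are pre-aggregated within each partition (map-side combine)
# before the shuffle, so far less data moves across the network
reduced = pairs.reduceByKey(lambda a, b: a + b).collect()

print(grouped)  # [('a', 3), ('b', 1)] (order may vary)
print(reduced)  # [('a', 3), ('b', 1)] (order may vary)
```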

2. Intermediate

- https://kontext.tech/column/spark/296/data-partitioning-in-spark-pyspark-in-depth-walkthrough
- Data is split into chunks; chunks are defined by file size, number of records, or the key used for the split
- HDFS vs S3 partitioning: while HDFS has its own partition-size config (defaults to 128MB), for AWS S3 the partition size depends on what is defined in the config
- Shuffle partitions: the number of partitions created while doing shuffle exchanges. The default is 200 (`spark.sql.shuffle.partitions`)
- Range Partition
- Hash partition
- RDD -> `spark.default.parallelism`, DataFrame -> `spark.sql.shuffle.partitions`
- Both help in filtering the data while reading, by scanning only the necessary files for downstream SQL tasks
- Partitioning by column is good, but multi-level partitioning on high-cardinality columns will lead to many small files
- Bucketing on high-cardinality columns allows us to split the data into a specified number of buckets
- With bucketing we can specify the number of buckets we need
- For each partition there will be the specified number of buckets, i.e. total files = number of partitions * number of buckets
- Number of buckets = size of data / 128MB
- Config: `spark.sql.sources.bucketing.enabled`
```
df.write \
    .bucketBy(16, 'key') \
    .sortBy('value') \
    .saveAsTable('bucketed', format='parquet')

t2 = spark.table('bucketed')
t3 = spark.table('bucketed')

# bucketed - bucketed join.
# Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, 'key').explain()

#----------------------------------------------------------------------
== Physical Plan ==
*(3) Project [key#14L, value#15, value#30]
+- *(3) SortMergeJoin [key#14L], [key#29L], Inner
:- *(1) Sort [key#14L ASC NULLS FIRST], false, 0
: +- *(1) Project [key#14L, value#15]
: +- *(1) Filter isnotnull(key#14L)
: +- *(1) FileScan parquet default.bucketed[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
+- *(2) Sort [key#29L ASC NULLS FIRST], false, 0
+- *(2) Project [key#29L, value#30]
+- *(2) Filter isnotnull(key#29L)
+- *(2) FileScan parquet default.bucketed[key#29L,value#30] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

```
- While scanning parquet files, it is best to project only the columns needed for the query and push filters down as early as possible, i.e. “filter early and often” (see the snippet below)
- More map jobs
- Less disk I/O
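
A small illustration of column pruning and predicate pushdown when reading parquet (the path and column names here are hypothetical):

```
from pyspark.sql import functions as F

# Only 'key' and 'value' are read from the parquet files, and the comparison
# shows up under PushedFilters in the FileScan node of the physical plan
df = spark.read.parquet('/data/events')   # hypothetical path
df.select('key', 'value') \
  .filter(F.col('value') > 100) \
  .explain()
```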
- Driver OOM : collect() & broadcast()
- Collect operation
- Broadcast join: when the driver tries to read an entire table into driver memory and then broadcast it to the cluster
- Executor OOM
- Incorrect usage of Spark
- High concurrency
- Each task reads about 128MB of data for each HDFS partition
- Inefficient queries
- Incorrect configuration
- External shuffle service on node manager
- It depends on the data dependency: if it is a narrow (one-to-one) dependency, one of the other nodes can recompute the missing data partition
- If a wide (one-to-all) dependency is encountered, then as per Spark's recovery policy the whole stage will be recomputed
- PushDownPredicates
- CombineFilters
- InferFiltersFromConstraints : for inner joins, adds a filter that the join column is not null
- PruneFilters
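
A quick way to see these rules in action on a hypothetical two-table inner join; the pushed-down predicate and the inferred `isnotnull` filters on the join keys show up in the optimized logical plan:

```
orders = spark.range(100).withColumnRenamed('id', 'order_id')
users = spark.range(100).withColumnRenamed('id', 'user_id')

joined = orders.join(users, orders.order_id == users.user_id, 'inner') \
               .filter(orders.order_id > 50)

# extended=True prints the parsed, analyzed, optimized and physical plans
joined.explain(True)
```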

3. Advanced

- Make sure the data doesn’t have duplicates
- No null values on joining column (https://blog.clairvoyantsoft.com/optimize-the-skew-in-spark-e523c6ee18ac)
- When joining a small table with a very big table, what care should be taken? How should the partitioning of the tables be handled?
- Key salting (https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8)
from pyspark.sql import functions as F

# Add a random salt (1-5) to the join key on the big, skewed side
df_big = df_big.withColumn('city', F.concat(df_big['city'], F.lit('_'), (F.floor(F.rand(seed=17) * 5) + 1).cast('string')))
# Explode the matching salt values on the other side so every original key still joins
df_medium = df_medium.withColumn('salt', F.explode(F.array([F.lit(str(i)) for i in range(1, 6)])))
df_medium = df_medium.withColumn('city_salted', F.concat(df_medium['city'], F.lit('_'), df_medium['salt'])) \
    .drop('city', 'salt').withColumnRenamed('city_salted', 'city')
# Join on the salted key
df_join = df_big.join(df_medium, on=['city'], how='inner')
# Inspect records per partition to verify the skew is gone
data_distribution = df_join.rdd.glom().map(len).collect()
- While Spark partitioning chunks the data for processing by Spark executors, Hive partitioning is a hierarchical storage layout based on a column, for fast analysis
- 1.3GB : Input Spark Executor memory
- 300 MB : Reserved Memory
- 25% of (1.3GB - 300MB) = 250MB User memory : to store data objects and data structures
- 75% of (1.3GB - 300MB) = 750MB Spark Memory Fraction
- Storage Memory : Cache memory
- Execution Memory: Temp memory Eg. Aggregation results
- Yarn Memory Overhead : 10% of Executor memory `spark.yarn.executor.memoryOverhead`
- The memory overhead is used to store the runtime class objects and strings
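
A rough sketch of how the regions above map to configuration for this 1.3GB example (the 75/25 split here matches the figures above; note that newer Spark versions default `spark.memory.fraction` to 0.6):

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('memory-layout-example')
    .config('spark.executor.memory', '1300m')               # heap: 300MB reserved + user + Spark memory
    .config('spark.yarn.executor.memoryOverhead', '384m')   # off-heap overhead, ~10% of executor memory (384MB minimum)
    .config('spark.memory.fraction', '0.75')                # Spark (execution + storage) share of (heap - 300MB)
    .config('spark.memory.storageFraction', '0.5')          # portion of Spark memory protected for cached data
    .getOrCreate()
)
```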
- High Concurrency
- When the number of cores per executor is greater than 5, the metadata handling overhead shoots up, leaving little memory for processing the data
- An executor getting big partitions due to data skew, which then take a lot of time to process, or caching can go wrong
- There are situations where each of the above pools of memory, namely execution and storage, may borrow from each other if the other pool is free. Also, storage memory can be evicted to a limit if it has borrowed memory from execution. However, without going into those complexities, we can configure our program such that our cached data which fits in storage memory should not cause a problem for execution.
- If we don’t want all our cached data to sit in memory, then we can configure “spark.memory.storageFraction” to a lower value so that extra data would get evicted and execution would not face memory pressure.
- Check the input data size and output data size, correlate them to the operating cluster memory, and adjust the memory accordingly
- Check the input partition size, output partition size and number of partitions, along with the shuffle partitions, and decide the number of cores
- Check for disk and memory spills during stage execution to spot memory issues
- Check number of executors used for given cluster size, and optimize accordingly
- Available cluster memory and memory in use by the application/job
- Check the average run time of all stages in the job, and try to identify any stage that takes a lot longer to execute than the others, which can indicate data skew
- Check whether the table is partitioned by column or bucketed
- https://blog.clairvoyantsoft.com/spark-logical-and-physical-plans-469a0c061d9e
- https://towardsdatascience.com/should-i-repartition-836f7842298c TODO read
- `df.explain(True)` : Parsed -> Analyzed -> Optimized -> Physical
- Parsed/Unresolved Logical Plan : syntactically correct
- Analyzed/Resolved Logical Plan : semantic analysis against the Spark catalog (metastore)
- Optimized Logical Plan : whole-stage codegen / order of execution / filter clause prioritizing
- `explain(mode="simple")` shows the physical plan.
- `explain(mode="extended")` presents both the logical and physical plans.
- `explain(mode="codegen")` shows the Java code planned to be executed.
- `explain(mode="cost")` presents the optimized logical plan and related statistics (if they exist).
- `explain(mode="formatted")` shows a split output: an outline of the physical plan plus a section with every node's details.

- https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

- `Exchange` nodes are the shuffle tasks
t1 = spark.table('unbucketed1')
t2 = spark.table('unbucketed2')

t1.join(t2, 'key').explain()

#----------------------------------------------------
== Physical Plan ==
*(5) Project [key#10L, value#11, value#15]
+- *(5) SortMergeJoin [key#10L], [key#14L], Inner
:- *(2) Sort [key#10L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#10L, 200)
: +- *(1) Project [key#10L, value#11]
: +- *(1) Filter isnotnull(key#10L)
: +- *(1) FileScan parquet default.unbucketed1[key#10L,value#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
+- *(4) Sort [key#14L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#14L, 200)
+- *(3) Project [key#14L, value#15]
+- *(3) Filter isnotnull(key#14L)
+- *(3) FileScan parquet default.unbucketed2[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
- `--conf spark.memory.offHeap.enabled=true`
- `--conf spark.memory.offHeap.size=Xgb`
- Columns that will be selected/projected when `select` / `withColumn` / `drop` is used
- **ColumnPruning** — this is a rule we already mentioned above, it prunes the columns that are not needed to reduce the data volume that will be scanned.
- **CollapseProject** — it combines neighboring Project operators into one.
- **PushProjectionThroughUnion** — this rule will push the Project through both sides of the Union operator.
- The Exchange operator represents shuffle, which is a physical data movement on the cluster. 
- RoundRobinPartitioning: `df.repartition(n)`
- SinglePartitioning: `Window.partitionBy()`
- RangePartitioning: `orderBy / sort`
https://observablehq.com/@robinl/understanding-the-spark-ui-by-example-sorting-data
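
A small sketch that makes these partitioning types visible as `Exchange` nodes in the plan (`key` is a hypothetical column):

```
df = spark.range(1000).withColumnRenamed('id', 'key')

df.repartition(10).explain()                # Exchange RoundRobinPartitioning(10)
df.repartition(10, 'key').explain()         # Exchange hashpartitioning(key, 10)
df.repartitionByRange(10, 'key').explain()  # Exchange rangepartitioning(key ASC, 10)
df.orderBy('key').explain()                 # Exchange rangepartitioning(key ASC, spark.sql.shuffle.partitions)
```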
- Dynamically coalescing shuffle partitions
- Combines many small partitions into fewer partitions based on the configured target partition size
- Dynamically switching join strategies
- A broadcast join is preferred over a sort-merge join if one of the tables is found to be smaller than the specified broadcast-join size threshold
- Dynamically optimizing skew joins
- Checks the average partition size, figures out any skewed partitions and splits them into smaller partitions, so that the sort-merge join can handle the split skewed partitions
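
These behaviours are controlled by the AQE settings introduced in Spark 3.0 (a sketch; thresholds and defaults vary by version):

```
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.coalescePartitions.enabled', 'true')  # coalesce small shuffle partitions
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')            # split skewed partitions for sort-merge joins
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', str(10 * 1024 * 1024))  # broadcast side size threshold in bytes
```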
- https://stackoverflow.com/questions/65809909/spark-what-is-the-difference-between-repartition-and-repartitionbyrange
- https://www.robinlinacre.com/spark_sort/
- https://stackoverflow.com/questions/32887595/how-does-spark-achieve-sort-order/32888236#32888236

4. PySpark Syntax

- https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
- https://medium.com/@sergey.ivanchuk/practical-pyspark-window-function-examples-cb5c7e1a3c41

from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_spec = Window.partitionBy('department').orderBy('salary')

from pyspark.sql.functions import row_number
df.withColumn("row_number", row_number().over(window_spec)).show()

from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(window_spec)).show()

window_spec_agg = Window.partitionBy("department")
df.withColumn("row",row_number().over(window_spec)) \
.withColumn("avg", F.avg(F.col("salary")).over(window_spec_agg)) \
.withColumn("sum", F.sum(F.col("salary")).over(window_spec_agg)) \
.withColumn("min", F.min(F.col("salary")).over(window_spec_agg)) \
.withColumn("max", F.max(F.col("salary")).over(window_spec_agg)) \
.show()
# Rolling 7-day average: define a range window over the timestamp cast to seconds
days = lambda i: i * 86400  # days to seconds
w = Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)

df = df.withColumn('rolling_average', F.avg("dollars").over(w))
# Broadcast variable: read-only value cached on every executor
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
# Accumulator: counter that tasks can only add to; the driver reads the total
num = sc.accumulator(0)
df.withColumn(date_col, F.from_unixtime(F.col("location_at"), "yyyy-MM-dd HH:mm:ss"))
F.year(), F.month(), F.dayofmonth(), F.hour()
df1.join(df2, df1.col1 == df2.cola, 'inner')  # or 'outer', 'left', 'right', 'left_anti'
df.filter(df.age > 3).collect()
df.where(df.age == 2).collect()
li=["OH","CA","DE"]
df.filter(df.state.isin(li)).show()
df.filter(~df.state.isin(li)).show()
df.filter(df.state.startswith("N")).show()
df.filter(df.state.endswith("H")).show()
df.filter(df.state.contains("H")).show()
df2.filter(df2.name.like("%rose%")).show()
df2.filter(df2.name.rlike("(?i)rose")).show()  # case-insensitive regex match
from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages,"Java")) \
.show(truncate=False)
count(),mean(),max(),min(),sum(),avg(),agg(),pivot()

df.groupBy("department").sum("salary").show(truncate=False)

df.groupBy("department") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("bonus").alias("sum_bonus"), \
max("bonus").alias("max_bonus") \
) \
.show(truncate=False)


df.groupBy("department") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("bonus").alias("sum_bonus"), \
max("bonus").alias("max_bonus")) \
.where(col("sum_bonus") >= 50000) \
.show(truncate=False)
df.orderBy(F.col('col1').desc())

5. System Design

- Airflow
- AWS Step Functions
