Apache Spark — Interview Questions

Dive into this blog to get pointers for answering these kinds of questions and more!
  • Distributed Computing Framework
  • Distributed SQL Framework with Dataframe and Datasets
  • Distributed Machine Learning
  • Distributed Graph processing
  • Distributed Stream Processing

1. Basics

  • What is the difference between groupByKey and reduceByKey?
- In groupByKey the values are grouped but not reduced, i.e., no operation is applied to the grouped data before it is shuffled
- In reduceByKey the values are combined locally within each partition first, and the partial results are then merged across partitions, so far less data is shuffled
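A minimal RDD sketch of the difference (assuming an active SparkContext `sc`); both give the same result, but `reduceByKey` shuffles much less data thanks to the map-side combine:
```
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# groupByKey: every value is shuffled, then summed on the reducer side
print(rdd.groupByKey().mapValues(sum).collect())        # e.g. [('a', 2), ('b', 1)]

# reduceByKey: values are combined inside each partition before the shuffle
print(rdd.reduceByKey(lambda x, y: x + y).collect())    # e.g. [('a', 2), ('b', 1)]
```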
  • What are common symptoms of excessive GC in Spark?
- Slowness of application
- Executor heartbeat timeout
- GC overhead limit exceeded error

2. Intermediate

  • What types of joins are available in Spark?
    - Sort merge join
    - Broadcast hash join
    - Shuffle hash join
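As a small illustration (not from the original post), a broadcast hash join can be requested explicitly with a hint when one side is known to be small; `large_df` and `small_df` are placeholder names:
```
from pyspark.sql.functions import broadcast

joined = large_df.join(broadcast(small_df), "key")
joined.explain()   # look for BroadcastHashJoin in the physical plan
```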
  • What is partitioning in Spark?
- https://kontext.tech/column/spark/296/data-partitioning-in-spark-pyspark-in-depth-walkthrough
- Data is split into chunks; chunks are defined by file size, number of records, or the key used for the split
- HDFS vs S3 partitioning: HDFS has its own block size config (defaults to 128MB), while for AWS S3 the split size depends on the partition size defined in the config
- Shuffle partitions: the number of partitions created during shuffle exchanges; the default is 200 (`spark.sql.shuffle.partitions`)
- Range partitioning
- Hash partitioning
- RDD -> `spark.default.parallelism` and DataFrame -> `spark.sql.shuffle.partitions`
- Both help in filtering the data at read time by scanning only the necessary files for downstream SQL tasks
- Partitioning by column is good, but multi-level partitioning on high-cardinality columns leads to many small files
- Bucketing on high-cardinality columns allows us to split the data into a specified number of buckets
- With bucketing we can specify the number of buckets we need
- For each partition there will be the specified number of bucket files, i.e., total files = number of partitions × number of buckets
- Number of buckets = size of data / 128MB
- Config: `spark.sql.sources.bucketing.enabled`
```
df.write \
    .bucketBy(16, 'key') \
    .sortBy('value') \
    .saveAsTable('bucketed', format='parquet')

t2 = spark.table('bucketed')
t3 = spark.table('bucketed')

# bucketed - bucketed join.
# Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, 'key').explain()

#----------------------------------------------------------------------
== Physical Plan ==
*(3) Project [key#14L, value#15, value#30]
+- *(3) SortMergeJoin [key#14L], [key#29L], Inner
   :- *(1) Sort [key#14L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#14L, value#15]
   :     +- *(1) Filter isnotnull(key#14L)
   :        +- *(1) FileScan parquet default.bucketed[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
   +- *(2) Sort [key#29L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#29L, value#30]
         +- *(2) Filter isnotnull(key#29L)
            +- *(2) FileScan parquet default.bucketed[key#29L,value#30] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16

```
While scanning Parquet files, it's best to select only the columns needed for the query and to push filters down as early as possible, i.e., "filter early and often".
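To make the disk-partitioning point concrete, here is a minimal sketch (the `country` column and output path are illustrative): writing with `partitionBy` creates one directory per column value, so a filter on that column scans only the matching directories (partition pruning).
```
df.write.partitionBy("country").parquet("/tmp/events_by_country")

# Only the country=IN directory is scanned for this query;
# the plan shows PartitionFilters on the country column
spark.read.parquet("/tmp/events_by_country") \
    .filter("country = 'IN'") \
    .explain()
```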
  • How does the Spark engine make a job run faster?
- More map tasks running in parallel
- Less disk I/O
  • What triggers Spark OOM?
- Driver OOM: `collect()` & `broadcast()`
- Collect operation: pulling a large result set back to the driver
- Broadcast join: when the driver tries to read an entire table into its memory and broadcast it to the cluster
- Executor OOM
- Incorrect usage of Spark
- High concurrency
- Each task reads ~128MB of data for each HDFS partition
- Inefficient queries
- Incorrect configuration
- External shuffle service on the node manager
  • How does coalesce work internally? How is it able to reduce 100 partitions to 10? What is the downside? (see the sketch below)
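One way to see the difference (a sketch, not from the original post): `coalesce` merges existing partitions without a shuffle, which is cheap but reduces parallelism for the stages feeding into it and can leave uneven partitions, while `repartition` does a full shuffle:
```
df100 = spark.range(1000).repartition(100)

df100.coalesce(10).explain()      # plan shows Coalesce 10 (no Exchange, i.e. no shuffle)
df100.repartition(10).explain()   # plan shows Exchange RoundRobinPartitioning(10), a full shuffle
```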
  • Custom partitioning with a user defined partitioner?
  • What happens if one of the nodes where data is cached goes down?
- It depends on the data dependency: for a one-to-one (narrow) dependency, another node can recompute the missing data from its lineage
- If a one-to-all (wide) dependency is encountered, then as per Spark's recomputation policy the whole stage is recomputed
  • Types of filter optimizations (Catalyst rules)?
- PushDownPredicates
- CombineFilters
- InferFiltersFromConstraints: for inner joins, infers an is-not-null filter on the join columns
- PruneFilters

3. Advanced

  • What is Sort merge join?
  • Can we use a broadcast variable in a UDF? How do we pass a dictionary to a UDF? (a minimal sketch follows the skew notes below)
- Make sure the data doesn't have duplicates
- No null values on the joining column (https://blog.clairvoyantsoft.com/optimize-the-skew-in-spark-e523c6ee18ac)
- When joining a small table with a very big table, what care should be taken? How should the partitioning of the tables be handled?
- Key salting (https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8)
from pyspark.sql import functions as F

# Key salting: append a random suffix (1..5) to the join key on the big table
df_big = df_big.withColumn('city', F.concat(
    df_big['city'], F.lit('_'), (F.floor(F.rand(seed=17) * 5) + 1).cast('string')))
# Explode the matching suffixes on the other table so every key matches all salted variants
df_medium = df_medium.withColumn('city_exploded', F.explode(F.array([F.lit(i) for i in range(1, 6)])))
df_medium = df_medium.withColumn('city_exploded', F.concat(
    df_medium['city'], F.lit('_'), df_medium['city_exploded'].cast('string'))) \
    .drop('city').withColumnRenamed('city_exploded', 'city')
# Join on the salted key
df_join = df_big.join(df_medium, on=['city'], how='inner')
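And for the broadcast-variable-in-UDF question above, a minimal sketch (the lookup dictionary and column names are made up): broadcast the dictionary once and read it inside the UDF via `.value`.
```
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

country_names = spark.sparkContext.broadcast({"IN": "India", "US": "United States"})

@F.udf(returnType=StringType())
def to_country_name(code):
    # executors read the broadcast copy instead of a per-task serialized dict
    return country_names.value.get(code, "unknown")

df = spark.createDataFrame([("IN",), ("US",), ("DE",)], ["code"])
df.withColumn("country", to_country_name(F.col("code"))).show()
```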
  • Find the data distribution across partitions with:
data_distribution = df.rdd.glom().map(len).collect()
- While Spark partitioning chunks the data for processing by Spark executors, Hive partitioning is hierarchical storage of data by column values for fast analysis
  • How is executor memory managed in Spark?
- 1.3GB: input Spark executor memory (example)
- 300MB: reserved memory
- 25% of (1.3GB - 300MB) = 250MB user memory: stores user data objects and data structures
- 75% of (1.3GB - 300MB) = 750MB Spark memory fraction
- Storage memory: cache memory
- Execution memory: temporary memory, e.g., aggregation results
- YARN memory overhead: 10% of executor memory (`spark.yarn.executor.memoryOverhead`)
- The overhead is used to store runtime class objects and strings
- High concurrency
- When the number of cores per executor is greater than 5, metadata handling shoots up, leaving little memory for processing the data
- An executor getting big partitions due to data skew then takes a long time to process them, or caching can go wrong
- There are situations where each of the above pools of memory, namely execution and storage, may borrow from each other if the other pool is free. Also, storage memory can be evicted to a limit if it has borrowed memory from execution. However, without going into those complexities, we can configure our program such that our cached data which fits in storage memory should not cause a problem for execution.
- If we don’t want all our cached data to sit in memory, then we can configure “spark.memory.storageFraction” to a lower value so that extra data would get evicted and execution would not face memory pressure.
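A hedged configuration sketch tying the knobs above together (the values are illustrative only; note that `spark.memory.fraction` defaults to 0.6 in recent Spark releases, and `spark.executor.memoryOverhead` is the newer name for the YARN overhead setting):
```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "1300m")          # on-heap executor memory
         .config("spark.memory.fraction", "0.75")           # execution + storage share of (heap - 300MB)
         .config("spark.memory.storageFraction", "0.5")     # part of Spark memory protected for caching
         .config("spark.executor.memoryOverhead", "384m")   # off-heap overhead (YARN/K8s)
         .getOrCreate())
```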
  • How do you optimize a job?
- Check the input and output data sizes, correlate them to the operating cluster memory, and adjust memory accordingly
- Check the input partition size, output partition size, and number of partitions along with the shuffle partitions, and decide the number of cores
- Check for disk spills during stage execution to spot memory issues
- Check the number of executors used for the given cluster size, and optimize accordingly
- Check available cluster memory versus memory in use by the application/job
- Check the average run time of all stages in the job, and try to identify any stage that takes much longer to execute than the others, which can indicate data skew
- Check whether the table is partitioned by column and/or bucketed
  • What is a physical plan and logical plan?
- https://blog.clairvoyantsoft.com/spark-logical-and-physical-plans-469a0c061d9e
- https://towardsdatascience.com/should-i-repartition-836f7842298c TODO read
- `df.explain(True)`: Parsed -> Analyzed -> Optimized -> Physical
- Parsed/Unresolved Logical Plan: syntactically correct
- Analyzed/Resolved Logical Plan: semantic analysis against the Spark catalog (metastore)
- Optimized Logical Plan: whole-stage codegen / order of execution / filter-clause prioritization
explain(mode="simple") shows the physical plan.
explain(mode="extended") presents both physical and logical plans.
explain(mode="codegen") shows the Java code planned to be executed.
explain(mode="cost") presents the optimized logical plan and related statistics (if they exist).
explain(mode="formatted") shows a split output: an optimized physical plan outline plus a section with details of every node.
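In PySpark (3.0+) the mode is passed as a keyword argument on any DataFrame, for example:
```
df.explain()                  # same as mode="simple"
df.explain(mode="extended")   # parsed, analyzed and optimized logical plans + physical plan
df.explain(mode="formatted")  # plan outline plus a per-node details section
```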

- https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

- Exchange nodes are the shuffle tasks
```
t1 = spark.table('unbucketed1')
t2 = spark.table('unbucketed2')

t1.join(t2, 'key').explain()

#----------------------------------------------------
== Physical Plan ==
*(5) Project [key#10L, value#11, value#15]
+- *(5) SortMergeJoin [key#10L], [key#14L], Inner
   :- *(2) Sort [key#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#10L, 200)
   :     +- *(1) Project [key#10L, value#11]
   :        +- *(1) Filter isnotnull(key#10L)
   :           +- *(1) FileScan parquet default.unbucketed1[key#10L,value#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>
   +- *(4) Sort [key#14L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#14L, 200)
         +- *(3) Project [key#14L, value#15]
            +- *(3) Filter isnotnull(key#14L)
               +- *(3) FileScan parquet default.unbucketed2[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,value:double>, SelectedBucketsCount: 16 out of 16
```
  • Storing data off-heap: Be careful when using off-heap storage as it does not impact on-heap memory size i.e.
    it won’t shrink heap memory. So to define an overall memory limit, assign a smaller heap size.
- `conf spark.memory.offHeap.enabled = true`
- `conf spark.memory.offHeap.size = Xgb`
- Only the columns referenced by `select` / `withColumn` / `drop` will be selected/projected, thanks to the projection rules below:
- **ColumnPruning** — this is a rule we already mentioned above, it prunes the columns that are not needed to reduce the data volume that will be scanned.
- **CollapseProject** — it combines neighboring Project operators into one.
- **PushProjectionThroughUnion** — this rule will push the Project through both sides of the Union operator.
  • What is Exchange?
- The Exchange operator represents a shuffle, i.e., physical data movement across the cluster.
- RoundRobinPartitioning: `df.repartition(n)`
- SinglePartition: `Window.partitionBy()` with no partition columns
- RangePartitioning: `orderBy` / `sort`
https://observablehq.com/@robinl/understanding-the-spark-ui-by-example-sorting-data
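A quick sketch (illustrative, not from the original post) of how each operation shows up as an Exchange in the physical plan:
```
df.repartition(8).explain()          # Exchange RoundRobinPartitioning(8)
df.repartition(8, "key").explain()   # Exchange hashpartitioning(key, 8)
df.orderBy("key").explain()          # Exchange rangepartitioning(key ASC NULLS FIRST, ...)
```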
  • Adaptive Query Execution (AQE) is query re-optimization that occurs during query execution, based on runtime statistics. AQE in Spark 3.0 includes 3 main features:
- Dynamically coalescing shuffle partitions
- Combines lots of small partitions into fewer partitions, based on a target partition size
- Dynamically switching join strategies
- A broadcast join is used in place of a sort merge join if one of the tables turns out to be smaller than the specified broadcast join threshold
- Dynamically optimizing skew joins
- Checks the average partition size, detects skewed partitions, and splits them into smaller partitions so that the sort merge join can handle the split skewed partitions
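These three features map onto real Spark 3.x configuration keys; a minimal sketch (the threshold shown is the usual default, treat it as illustrative):
```
spark.conf.set("spark.sql.adaptive.enabled", "true")                            # turn AQE on
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")         # coalesce small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")                   # split skewed partitions
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))   # 10MB broadcast threshold
```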
- https://stackoverflow.com/questions/65809909/spark-what-is-the-difference-between-repartition-and-repartitionbyrange
- https://www.robinlinacre.com/spark_sort/
- https://stackoverflow.com/questions/32887595/how-does-spark-achieve-sort-order/32888236#32888236

4. PySpark Syntax

- https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
- https://medium.com/@sergey.ivanchuk/practical-pyspark-window-function-examples-cb5c7e1a3c41

from pyspark.sql import functions as F
from pyspark.sql.functions import row_number, rank
from pyspark.sql.window import Window

window_spec = Window.partitionBy('department').orderBy('salary')

df.withColumn("row_number", row_number().over(window_spec)).show()
df.withColumn("rank", rank().over(window_spec)).show()

window_spec_agg = Window.partitionBy("department")
df.withColumn("row", row_number().over(window_spec)) \
    .withColumn("avg", F.avg(F.col("salary")).over(window_spec_agg)) \
    .withColumn("sum", F.sum(F.col("salary")).over(window_spec_agg)) \
    .withColumn("min", F.min(F.col("salary")).over(window_spec_agg)) \
    .withColumn("max", F.max(F.col("salary")).over(window_spec_agg)) \
    .show()
  • Moving average
from pyspark.sql.window import Window

# window over the previous 7 days, built by casting the timestamp to long (number of seconds)
days = lambda i: i * 86400   # convert days to seconds for the range frame
w = Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)

df = df.withColumn('rolling_average', F.avg("dollars").over(w))
  • Broadcast & Accumulator
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
num = sc.accumulator(0)
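A minimal usage sketch for the two shared variables above (the accumulator value is only reliably read on the driver, after an action):
```
print(words_new.value)                                      # broadcast value, read on the driver
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: num.add(x))  # tasks add to the accumulator
print(num.value)                                            # 10
```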
  • Date & time functions
df.withColumn(date_col, F.from_unixtime(F.col("location_at"), "yyyy-MM-dd HH:mm:ss"))
F.year(), F.month(), F.dayofmonth(), F.hour()
  • Joins
df1.join(df2, df1.col1 == df2.cola, 'inner')   # or 'outer', 'left', 'right', 'left_anti'
  • Filters
df.filter(df.age > 3).collect()
df.where(df.age == 2).collect()
li=["OH","CA","DE"]
df.filter(df.state.isin(li)).show()
df.filter(~df.state.isin(li)).show()
df.filter(df.state.startswith("N")).show()
df.filter(df.state.endswith("H")).show()
df.filter(df.state.contains("H")).show()
df2.filter(df2.name.like("%rose%")).show()
df2.filter(df2.name.rlike("(?i)^.*rose$")).show()   # case-insensitive regex match
from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages,"Java")) \
.show(truncate=False)
  • GroupBy
count(), mean(), max(), min(), sum(), avg(), agg(), pivot()

df.groupBy("department").sum("salary").show(truncate=False)

df.groupBy("department") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("bonus").alias("sum_bonus"), \
max("bonus").alias("max_bonus") \
) \
.show(truncate=False)


df.groupBy("department") \
.agg(sum("salary").alias("sum_salary"), \
avg("salary").alias("avg_salary"), \
sum("bonus").alias("sum_bonus"), \
max("bonus").alias("max_bonus")) \
.where(col("sum_bonus") >= 50000) \
.show(truncate=False)
  • Sort
df.orderBy(F.col('col1').desc())

5. System Design

  • Can you explain how RDD sorting works in Apache Spark? How is it different from a traditional in-memory sort? [SO]
  • How do you schedule Spark jobs?
- Airflow
- AWS Step Functions
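For Airflow, a minimal DAG sketch (assumes the `apache-airflow-providers-apache-spark` package and a configured `spark_default` connection; the application path is hypothetical):
```
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("daily_spark_etl", start_date=datetime(2022, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    run_job = SparkSubmitOperator(
        task_id="run_spark_job",
        application="/opt/jobs/etl_job.py",   # hypothetical PySpark script
        conn_id="spark_default",
    )
```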
