Spark Jargon for Starters

  1. You still can’t understand the different processes in the Spark Standalone cluster and the parallelism.
  2. You ran the bin\start-slave.sh and found that it spawned the worker, which is actually a JVM. Is worker a JVM process or not?
  3. As per the above link, an executor is a process launched for an application on a worker node that runs tasks. Executor is also a JVM.
  4. Executors are per application. Then what is role of a worker? Does it co-ordinate with the executor and communicate the result back to the driver? or does the driver directly talks to the executor? If so, what is worker’s purpose then?
  5. How to control the number of executors for an application?
  6. Can the tasks be made to run in parallel inside the executor? If so, how to configure the number of threads for an executor?
  7. What is the relation between worker, executors and executor cores ( — total-executor-cores)?
  8. what does it mean to have more workers per node?

DRIVER

EXECUTORS

  • For example, consider a standalone cluster with 5 worker nodes (each node having 8 cores)
  • When i start an application with default settings, Spark will greedily acquire as many cores and executors as are offered by the scheduler. So in the end you will get 5 executors with 8 cores each.
  • Does 2 worker instance mean one worker node with 2 worker processes?
  • Does every worker instance hold an executor for specific application (which manages storage, task) or one worker node holds one executor?
  • BTW, Number of executors in a worker node at a given point of time is entirely depends on work load on the cluster and capability of the node to run how many executors.

APPLICATION EXECUTION FLOW

  1. A standalone application starts and instantiates a SparkContext/SparkSession instance (and it is only then when you can call the application a driver).
  2. The driver program ask for resources to the cluster manager to launch executors.
  3. The cluster manager launches executors.
  4. The driver process runs through the user application. Depending on the actions and transformations over RDDs task are sent to executors.
  5. Executors run the tasks and save the results.
  6. If any worker crashes, its tasks will be sent to different executors to be processed again. In the book “Learning Spark: Lightning-Fast Big Data Analysis” they talk about Spark and Fault Tolerance:
  1. With SparkContext.stop() from the driver or if the main method exits/crashes all the executors will be terminated and the cluster resources will be released by the cluster manager.

Following list captures some recommendations to keep in mind while configuring them:

  • Hadoop/Yarn/OS Deamons: When we run spark application using a cluster manager like Yarn, there’ll be several daemons that’ll run in the background like NameNode, Secondary NameNode, DataNode, JobTracker and TaskTracker. So, while specifying num-executors, we need to make sure that we leave aside enough cores (~1 core per node) for these daemons to run smoothly.
  • Yarn ApplicationMaster (AM): ApplicationMaster is responsible for negotiating resources from the ResourceManager and working with the NodeManagers to execute and monitor the containers and their resource consumption. If we are running spark on yarn, then we need to budget in the resources that AM would need (~1024MB and 1 Executor).
  • HDFS Throughput: HDFS client has trouble with tons of concurrent threads. It was observed that HDFS achieves full write throughput with ~5 tasks per executor . So it’s good to keep the number of cores per executor below that number.
  • MemoryOverhead: Following picture depicts spark-yarn-memory-usage.
Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)
  • Running executors with too much memory often results in excessive garbage collection delays.
  • Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.

Enough theory.. Let’s go hands-on..

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

First Approach: Tiny executors [One Executor per core]:

--num-executors = 16 (In this approach, we'll assign one executor per core total-cores-in-cluster num-cores-per-node * total-nodes-in-cluster 16 x 10 = 160)
--executor-cores = 1 (one executor per core)
--executor-memory = 4GB (amount of memory per executor mem-per-node/num-executors-per-node 64GB/16 = 4GB)

Second Approach: Fat executors (One Executor per node):

--num-executors = 10 (In this approach, we'll assign one executor per node total-nodes-in-cluster 10)
--executor-cores = 16 (one executor per node means all the cores of the node are assigned to one executor total-cores-in-a-node 16)
--executor-memory = 64GB (amount of memory per executor mem-per-node/num-executors-per-node 64GB/1 = 64GB)

Third Approach: Balance between Fat (vs) Tiny

  • Based on the recommendations mentioned above, Let’s assign 5 core per executors => — executor-cores = 5 (for good HDFS throughput)
  • Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16–1 = 15
  • So, Total available of cores in cluster = 15 x 10 = 150
  • Number of available executors = (total cores / num-cores-per-executor) = 150 / 5 = 30
  • Leaving 1 executor for ApplicationManager => — num-executors = 29
  • Number of executors per node = 30/10 = 3
  • Memory per executor = 64GB / 3 = 21GB
  • Counting off heap overhead = 7% of 21GB = 3GB. So, actual — executor-memory = 21–3 = 18GB
  • Overhead : 1.4GB
  • Executor memory: 14GB
  • Reserved memory : 300 MB
  • Usable memory: 14–0.3 = 13.7GB
  • User memory: 13.7 * (1–0.6) = 5.48GB
  • Storage/Execution Memory: 13.7–5.48 = 8.22 GB
  • yarn.scheduler.capacity.resource-calculator If you are using YARN as cluster manager, then this is a must change configuration to DominantResourceCalculator to get correct container information on YARN UI
  • org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator only uses Memory and ignores the cores requested for!
  • org.apache.hadoop.yarn.util.resource.DominantResourceCalculator uses Dominant-resource to compare multi-dimensional resources such as Memory, CPU etc.

Conclusion:

  • Couple of recommendations to keep in mind which configuring these params for a spark-application like:
  • Budget in the resources that Yarn’s Application Manager would need
  • How we should spare some cores for Hadoop/Yarn/OS deamon processes
  • Learnt about spark-yarn-memory-usage
  • Also, checked out and analyzed three different approaches to configure these params:
  1. Tiny Executors — One Executor per Core
  2. Fat Executors — One executor per Node
  3. Recommended approach — Right balance between Tiny (Vs) Fat coupled with the recommendations.

--

--

--

A simple guy in pursuit of of AI and Deep Learning with Big Data tools :) @ https://www.linkedin.com/in/mageswaran1989/

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Simple solution structure for Azure Functions & Table Storage

UPDATE FOR MINERS❗️

How to Use CURL to Send API Requests?

Installing JupyterLab

A Close-Up Look into Alibaba’s New Generation of Database Technologies

Implementing hierarchical categories in Rail

Most asked questions for writers in the software industry

Glip Compliance Exports API for eDiscovery, GDPR and Analytics

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Mageswaran D

Mageswaran D

A simple guy in pursuit of of AI and Deep Learning with Big Data tools :) @ https://www.linkedin.com/in/mageswaran1989/

More from Medium

Leet Code Problem : Reserve consecutive available seats : Using Snowflake and pyspark

Spark Structured Streaming Simplified

Packaging PySpark application using pex and whl.

Apache Spark 3.0 Exciting Capabilities