Apache Spark Performance Tuning and Optimizations for Big Datasets
Contents
- RDD
- Partitions
- Joins
- Serialization
- UDF
- Analyse Execution plan
- Data Skew
- Cache
- Storage
- JVM
- Monitoring
- Executor
- Memory
- Streaming
When I started learning Apache Spark almost three years back, it was mostly about setting it up on a local machine and running code on a single machine. Later it was writing Scala code, building it, and running it with spark-submit. Then came PySpark and Jupyter notebooks.
Most newcomers find it easy to get started with PySpark, but struggle when dealing with production use cases and projects. The same configuration, for the same data set, may not work alike across pipeline modules. For example, compaction needs more nodes with less compute power and is almost independent of memory, as it simply packs the data, whereas an Access stage (algorithm stage) needs more memory and compute power.
The team needs a good understanding of the Apache Spark tuning parameters for a given bottleneck scenario.
I always wanted to put up an online quick reference for me and my team, and here it is!
There are hundreds of blogs that talk about this topic; this is a quick-reference cheat sheet for my day-to-day work needs, consolidated from different sources, so it will get updated as I come across new stuff that aids my work :)
For those who want an understanding of the Spark internals, hit this link:
1. RDD
- Minimize shuffles on join() by either broadcasting the smaller collection or by hash partitioning both RDDs by keys.
- Use narrow transformations instead of wide ones as much as possible. In narrow transformations (e.g., map() and filter()), the data to be processed resides in one partition, whereas in wide transformations (e.g., groupByKey(), reduceByKey(), and join()), the data to be processed resides in multiple partitions and so needs to be shuffled.
- Avoid using groupByKey() for associative reductive operations; always use reduceByKey() instead. With reduceByKey, Spark combines output with common keys on each partition before shuffling the data.
- To join two grouped datasets and keep them grouped, use cogroup() rather than the flatMap-join-groupBy pattern.
- To repartition the data and have your records sorted in each partition, use repartitionAndSortWithinPartitions() instead of calling repartition and then sorting within each partition, because it can push the sorting down into the shuffle machinery.
a) groupByKey
groupByKey can cause out-of-disk problems as data is sent over the network and collected on the reducing workers.
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map { case (word, counts) => (word, counts.sum) }
Eg:
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.groupByKey().mapValues(_.mkString("")).collect()
// Array[(Int, String)] = Array((3,dogcatowlgnuant))
b) reduceByKey
Data is combined at each partition, so only one output per key at each partition is sent over the network. reduceByKey requires combining all your values into another value with the exact same type.
Syntax:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y) => (x+y))
Eg:
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
// equivalent: b.groupByKey().mapValues(_.mkString("")).collect()
// Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
c) aggregateByKey:
Same as reduceByKey, but it takes an initial (zero) value.
3 parameters as input:
i. initial value
ii. sequence op logic (merges a value into the accumulator within a partition)
iii. combiner logic (merges accumulators across partitions)
Eg:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
//output: Aggregate By Key sum results: bar -> 3, foo -> 5
d) combineByKey:
3 functions as input:
i. createCombiner: unlike aggregateByKey, you need not always pass a constant; you can pass a function that returns a new (initial) value from the first value seen for a key
ii. merging function: merges a value into the accumulator within a partition
iii. combine function: merges accumulators across partitions
Eg:
// rdd is assumed to be an RDD[(String, Int)], e.g. sc.parallelize(List(("A", 3), ("A", 9), ("B", 4)))
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                                             // createCombiner: (sum, count)
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),                          // merging function (within a partition)
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // combine function (across partitions)
).map { case (k, v) => (k, v._1 / v._2.toDouble) }                                // per-key average
result.collect.foreach(println)
2. Partitions — these determine the parallelism
number of tasks = (number of stages) * (number of partitions)
- A good lower bound for the number of partitions is at least 2 * (# cores in the cluster). For example, if you have 100 cores in the cluster, a good starting point is around 200 partitions. From there, you can keep increasing the number of partitions until you see a good balance of concurrency and task execution time.
- A powerful way to control Spark shuffles is to partition your data intelligently. Partitioning on the right column (or set of columns) helps to balance the amount of data that has to be mapped across the cluster network in order to perform actions. Partitioning on a unique ID is generally a good strategy, but don't partition on sparsely filled columns or columns that over-represent particular values.
- Avoid spills and utilize all the cores
- It is recommended to have 2–3 tasks per CPU core in your cluster.
- Even though Spark attempts to infer a sensible number of partitions for your collections, sometimes you may need to tune the level of parallelism by optimizing the number of partitions.
- RDDs produced by textFile or jsonFile are partitioned based on the underlying MapReduce InputFormat that's used. HDFS input RDDs are typically partitioned based on the number of HDFS blocks the data resides on.
- RDDs that are derived from other collections simply inherit the number of partitions from the parent collection.
- Minimize the number of shuffles by using partitioning mechanisms that let the program know that a set of keys will appear together on the same node (see the sketch after this list).
- Hash partitioning: the keys that have the same hash value will appear on the same node.
- Range partitioning: the keys within same range appear on the same node.
- Spark operations that involve shuffling data by key benefit from partitioning: cogroup(), groupWith(), join(), groupByKey(), combineByKey(), reduceByKey(), and lookup().
- Repartitioning (repartition()) is an expensive operation because it moves the data around, but you can use coalesce() instead if you are only decreasing the number of partitions.
- If a collection is used only once there is no point in partitioning it; repartitioning pays off only when the collection is used multiple times in key-oriented operations.
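A minimal sketch of key-based partitioning before a join, under assumed names (usersRdd and eventsRdd are hypothetical pair RDDs, 200 partitions is illustrative): both sides are hash-partitioned on the key, so the join of the persisted side does not need another shuffle.
import org.apache.spark.HashPartitioner
val partitioner = new HashPartitioner(200)
val usersByKey = usersRdd.partitionBy(partitioner).persist()   // hash-partitioned once and reused
val eventsByKey = eventsRdd.partitionBy(partitioner)           // same partitioner => co-partitioned
val joined = usersByKey.join(eventsByKey)                      // joins without reshuffling usersByKey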
a) At input level
spark.default.parallelism (don't use)
spark.sql.files.maxPartitionBytes
Go with the default partition size of 128 MB, unless you want to:
- Increase the parallelism
- Handle heavily nested/repeated data
- Generate data, i.e. explode data
- Work around a source structure that is not optimal
- Use UDFs
- Configure the right partition size and increase the processing speed
spark.sql.files.maxPartitionBytes
Default: 134217728 (128 MB)
The maximum number of bytes to pack into a single partition when reading files.
For example, say we want to write data of size 14.7 GB.
If we store it as 1500 MB / 1.5 GB partitions, then the number of partitions will be 10.
So, if we have 96 cores in our cluster, we would be using only 10 out of 96 available cores, leaving 86 cores idle.
If we want to use all 96 cores, we repartition the data into 96 partitions, that is 14.7 GB / 96 cores = ~160 MB per partition:
spark.sql.files.maxPartitionBytes = 160 * 1024 * 1024
So now we use all 96 cores.
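A minimal sketch of the example above (the input path is hypothetical, the sizes come from the worked example): set maxPartitionBytes before reading so the scan produces roughly one ~160 MB partition per core.
spark.conf.set("spark.sql.files.maxPartitionBytes", 160L * 1024 * 1024)  // ~160 MB per input partition
val df = spark.read.parquet("/path/to/input")                            // hypothetical input of ~14.7 GB
println(df.rdd.getNumPartitions)                                         // expect roughly dataSize / maxPartitionBytes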
b) At shuffle level
A must-read blog on shuffle:
spark.sql.shuffle.partitions
Default: 200
Configures the number of partitions to use when shuffling data for joins or aggregations.
Say you have a data size of 54 GB.
Number of intermediate partitions = 54000 MB / 200 parts = 270 MB per partition.
To avoid spills we can change the number of shuffle partitions as follows:
54 GB / 100 MB = 540 parts, where each partition will hold 100 MB of data
540 parts / 96 cores = 5.625 partitions (part files) per core
96 cores * 5 = 480 parts
spark.sql.shuffle.partitions = 480
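A minimal sketch of applying the sizing above (480 comes from the worked example with 54 GB and 96 cores, not a universal value):
spark.conf.set("spark.sql.shuffle.partitions", 480)  // ~100 MB per shuffle partition, a multiple of the core count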
Shuffle Memory
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 3g
Tune Shuffle file buffer
Disk access is slower than memory access so we can amortize disk I/O cost by doing buffered read/write.
# Size of the in-memory buffer for each shuffle file output stream.
# These buffers reduce the number of disk seeks and system calls made
# in creating intermediate shuffle files. [Shuffle behavior]
spark.shuffle.file.buffer = 1 MB
spark.unsafe.sorter.spill.reader.buffer.size = 1 MB
Optimize spill files merging [Spark-20014]
Use the mergeSpillsWithFileStream method by turning off transferTo and using buffered file read/write to improve the IO throughput.
spark.file.transferTo = false
spark.shuffle.file.buffer = 1 MB
spark.shuffle.unsafe.file.output.buffer = 5 MB
Tune compression block size
The default compression block is 32 kb, which is not optimal for large datasets. If you go to the slides, you will find up to a 20% reduction in shuffle/spill file size by increasing the block size.
# Block size used in LZ4 compression, in the case when the LZ4
# compression codec is used. Lowering this block size will also lower
# shuffle memory usage when LZ4 is used. [Compression and Serialization]
spark.io.compression.lz4.blockSize = 512KB
# Note that the default compression codec is LZ4; you can change it using
spark.io.compression.codec
Cache Index files on Shuffle Server
The issue is that for each shuffle fetch, we reopen the same index file and read it again. It would be much more efficient if we could avoid opening the same file multiple times and cache the data. We can use an LRU cache to save the index file information. This way we can also limit the number of entries in the cache so that we don't blow up the memory indefinitely. [Spark-15074]
#Cache entries limited to the specified memory footprint.
spark.shuffle.service.index.cache.size = 2048
Configurable shuffle registration timeout and retry
This is especially recommended for a big cluster (e.g. more than 50 nodes), where a node failure is more likely to happen.
spark.shuffle.registration.timeout = 2m
spark.shuffle.registration.maxAttempts = 5
c) At output level
- Coalesce to shrink number of partitions
- Repartition to increase the number of partitions
In the literature, it's often mentioned that coalesce should be preferred over repartition to reduce the number of partitions, because it avoids a shuffle step in some cases.
But coalesce has some limitations (outside the scope of this article): it cannot increase the number of partitions and may generate skewed partitions.
Here is one case where a repartition should be preferred.
In this case, we filter most of a dataset.
df.doSomething().coalesce(10).write(…)
The good point about coalesce is that it avoids a shuffle. However, all the computation is done by only 10 tasks.
This is due to the fact that the number of tasks depends on the number of partitions of the output of the stage, each one computing a big bunch of data. So a maximum of 10 nodes will perform the computation.
df.doSomething().repartition(10).write(…)
Using repartition, we can see that the total duration is way shorter (a few seconds instead of 31 minutes). The filtering is done by 200 tasks, each one working on a small subset of data. It's also way smoother from a memory point of view.
- Some other possible ways…
df.coalesce(n).write…
df.repartition(n).write…
df.repartition(n, [colA, …]).write…
spark.sql.shuffle.partitions(n)
df.localCheckpoint(…).repartition(n).write…
df.localCheckpoint(…).coalesce(n).write…
- When saving DataFrames to disk (i.e. in Parquet format), pay particular attention to the partition sizes. Spark will output one file per task (i.e. one file per partition) on writes, and will read at least one file in a task on reads. The issue here is that if the cluster/setup in which the DataFrame was saved had a larger amount of aggregate memory, and thus could handle larger partition sizes without error, a smaller cluster/setup may have trouble reading this saved DataFrame. Unfortunately, the partition sizes become a leaky abstraction even when saved to disk. A possible use case here is a large preprocessing cluster, and a smaller, leaner serving cluster. In this situation, a remedy would be to repartition the DataFrame into a larger number of partitions before writing.
- If you are aware of the record size or the number of records, then you can control the size of the output files with maxRecordsPerFile
df.write.option("maxRecordsPerFile", 10000).save(....)
d) Hive Partitions
- Hive bucketing: consider it when a column has a large number of distinct (hashable) values
spark.sql.sources.bucketing.enabled=True
hive.enforce.bucketing=false
hive.enforce.sorting=false
df.write
  .bucketBy(numBuckets, "col1", …)
  .sortBy("col1", …)
  .saveAsTable("/path/table_name")
create table table_name(col1 INT, …)
using parquet
CLUSTERED By (col1, …)
SORTED BY(col1, …)
INTO `numBuckets` BUCKETS
3. Joins
a) SortMergeJoin: when both sides are large
b) Broadcast DataFrame join: when one side is small
leftDF.join(broadcast(rightDF))
- Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side data is below spark.sql.autoBroadcastJoinThreshold.
spark.sql.autoBroadcastJoinThreshold
Default: 10485760 (10 MB)
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command
ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan
has been run.
- Broadcasting sends a read-only variable, cached on each node, once, rather than sending a copy with every task.
- Automatic if one side < spark.sql.autoBroadcastJoinThreshold (default 10 MB)
- Risks
Not Enough Driver Memory
DF > spark.driver.maxResultSize
DF > Single Executor Available Working Memory
When a broadcast join is used, the entire table is read by the cluster nodes, pulled into driver memory, and then sent to each of the nodes as a full copy of the table to be cached in the node's memory.
That's why this has a major impact when big tables are involved.
Requires Hive and its metastore for auto-detection of the table size.
c) Join
- When you’re joining together two datasets where one is smaller than the other, put the larger dataset on the “Left”
- When Spark shuffles data for the join, it keeps the data you specified on the left static on the executors and transfers the data you designated on the right between the executors. If the data on the right, the one being transferred, is larger, then the serialization and transfer of the data will take longer.
val joinedDF = largerDF.join(smallerDF, largerDF("id") === smallerDF("some_id"), "left")
d) Null values
Consider that we have tables with the following counts…
- Now consider a sample query where we join on highly null columns
select * from order_tbl orders left join customer_tbl customer
on orders.customer_id = customer.customer_id
left join delivery_tbl delivery
on orders.delivery_id = delivery.delivery_id
- In the above query, 199 out of 200 tasks would complete quite fast, and then the job probably gets stuck on the last task.
- The reason for this behavior: let's say that we have asked Spark to join two DataFrames, TABLE1 and TABLE2. When Spark executes this code, internally it performs the default shuffle join (Exchange hashpartitioning).
+- Exchange hashpartitioning(delivery_id#22L, 400)
+- *(6) Project [delivery_id#22L]
- In this process, Spark hashes the join column and sorts it. It then tries to keep the records with the same hashes in both partitions on the same executor, so all the null values of the table go to one executor, and Spark gets into a continuous loop of shuffling and garbage collection with no success.
- The solution is to split the table into two parts, one with null values in the join column and one without. Use the table without null values for the join, and then union it with the null part. This way null values won't participate in the join (see the DataFrame sketch after the rewritten query below).
CREATE TABLE order_tbl_customer_id_not_null as select * from order_tbl where customer_id is not null;
CREATE TABLE order_tbl_customer_id_null as select * from order_tbl where customer_id is null;
--Rewrite query
select orders.customer_id from order_tbl_customer_id_not_null orders
left join customer_tbl customer
on orders.customer_id = customer.customer_id
union all
select ord.customer_id from order_tbl_customer_id_null ord;
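A minimal DataFrame-API sketch of the same rewrite, under assumed names (ordersDF and customerDF are hypothetical DataFrames that share a customer_id column):
import org.apache.spark.sql.functions.col
val notNullPart = ordersDF.filter(col("customer_id").isNotNull)   // join only rows with a usable key
  .join(customerDF, Seq("customer_id"), "left")
  .select("customer_id")
val nullPart = ordersDF.filter(col("customer_id").isNull).select("customer_id")
val result = notNullPart.union(nullPart)                          // null keys never enter the join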
4. Serialization
It is the process of converting the in-memory object to another format that can be used to store in a file or send over the network.
Two options
- Java serialization
- Kryo serialization
Set "spark.serializer" to "org.apache.spark.serializer.KryoSerializer"
val conf = new SparkConf().setAppName(…).setMaster(…)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryoserializer.buffer", "64m")
  .registerKryoClasses(
    Array(classOf[ArrayBuffer[String]], classOf[ListBuffer[String]])
  )
5. UDF
- Avoid User Defined Functions (UDFs) and User Defined Aggregate Functions (UDAFs)
- Prefer built-in SQL functions wherever possible (see the sketch after this list)
- User-written UDFs cannot be optimized by Tungsten
- Use `org.apache.spark.sql.functions`
- Use pandas UDFs (vectorized UDFs) in PySpark
- https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
- https://fr.slideshare.net/cfregly/advanced-apache-spark-meetup-project-tungsten-nov-12-2015
- Check this medium post for Scala UDF vs PySpark UDF and their notebook here.
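A minimal sketch of the built-in-vs-UDF point, under assumed names (df and its name column are hypothetical): the UDF is a black box to Catalyst/Tungsten, while the built-in upper() can be optimized.
import org.apache.spark.sql.functions._
val toUpperUdf = udf((s: String) => if (s == null) null else s.toUpperCase)
val viaUdf = df.withColumn("name_upper", toUpperUdf(col("name")))   // works, but opaque to the optimizer
val viaBuiltin = df.withColumn("name_upper", upper(col("name")))    // preferred: built-in function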
6. Analyse Execution plan
- Stages are created when there is a need for shuffle for operations like `join` or `groupBy`
- Though the Spark engine does a pretty good job of optimizing the DAGs for execution, it is also the developer's responsibility to keep the number of stages to a reasonable number. This requires a good amount of understanding of the APIs and SQL query optimization.
- We can use the `.explain(true)` command to show the execution plan detailing all the steps (stages) involved in a job. Here is an example:
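A minimal sketch of reading a plan (the input path and columns are hypothetical):
val df = spark.read.parquet("/path/to/events")
df.groupBy("country").count().explain(true)
// Prints the parsed, analyzed, and optimized logical plans plus the physical plan;
// Exchange nodes in the physical plan mark the shuffles that create stage boundaries.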
7. Data Skew
- Common symptoms of data skew are:
Frozen stages and tasks, especially the last couple of tasks in a stage.
Low utilization of CPU.
Out of memory errors.
- Broadcasting data during the join operation can help minimize the data-skew side effects.
- Increase `spark.sql.autoBroadcastJoinThreshold` for Spark to consider broadcasting tables of bigger size. Make sure enough memory is available on the driver and executors.
- Salting
— In a SQL join operation, the join key is changed to redistribute the data in an even manner so that processing a partition does not take more time than the others. This technique is called salting.
— Add a column to each side with a random int between 0 and `spark.sql.shuffle.partitions` - 1
— Add a join clause so that the join also matches on the generated salt column
— Drop the temporary salt columns from the result (see the sketch after the code below)
// naive way
df.groupBy("city", "state").agg(<f(x)>).orderBy(col.desc)

// salting
val saltVal = random(0, spark.conf.get(org...shuffle.partitions) - 1)
df.withColumn("salt", lit(saltVal))
  .groupBy("city", "state", "salt")
  .agg(<f(x)>)
  .drop("salt")
  .orderBy(col.desc)
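A runnable salting sketch for an aggregation, under assumed names (df, city, state, and an amount column are hypothetical; 32 salt buckets is illustrative): aggregate per (key, salt) first, then aggregate again without the salt.
import org.apache.spark.sql.functions._
val numSalts = 32
val partial = df
  .withColumn("salt", (rand() * numSalts).cast("int"))    // spread each hot key over numSalts buckets
  .groupBy("city", "state", "salt")
  .agg(sum("amount").as("partial_sum"))
val result = partial
  .groupBy("city", "state")
  .agg(sum("partial_sum").as("total"))                    // second, much smaller aggregation removes the salt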
- To quickly check if everything is ok we review the execution duration of each task and look for heterogeneous process time. If one of the tasks is significantly slower than the others it will extend the overall job duration and waste the resources of the fastest executors.
- Before querying a series of tables, it can be helpful to tell spark to Compute the Statistics of those tables so that the Catalyst Optimizer can come up with a better plan on how to process the tables.
spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS")
In some cases, Spark doesn’t get everything it needs from just the above broad COMPUTE STATISTICS call. It also helps to tell Spark to check specific columns so the Catalyst Optimizer can better check those columns. It’s recommended to COMPUTE STATISTICS for any columns that are involved in filtering and joining.
spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS FOR COLUMNS joinColumn, filterColumn")
- Avoid calling DataFrame/RDD `count` API unless it is absolutely necessary.
8. Cache
- There is no universal answer when choosing what should be cached. Caching an intermediate result can dramatically improve performance, and it's tempting to cache a lot of things. However, due to Spark's caching strategy (in-memory, then swap to disk), the cache can end up in slightly slower storage. Also, using that storage space for caching means that it's not available for processing. In the end, caching might cost more than simply re-reading the DataFrame (see the sketch after this list).
- In the Storage tab of the UI we verify the Fraction Cached and also look at the Size in Memory and Size on Disk distribution.
- When allocating memory on workers, be sure to leave enough memory for other running processes. A JVM can be started with more memory than available, however, it will fail when it approaches the upper bound, leading to “worker lost” errors. Ex. If a machine has 128GB RAM, in reality only 124GB will be available to the OS. Furthermore, several GB will be used by the OS and other processes, so a good upper limit on the Worker memory may be 100GB.
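A minimal caching sketch, under assumed names (ordersDF, customersDF and the aggregations are hypothetical): persist only the intermediate result that is reused, and release it when done.
import org.apache.spark.storage.StorageLevel
val enrichedDF = ordersDF.join(customersDF, "customer_id")   // reused by two aggregations below
enrichedDF.persist(StorageLevel.MEMORY_AND_DISK)
val byCountry = enrichedDF.groupBy("country").count()
val byMonth = enrichedDF.groupBy("month").count()
byCountry.show(); byMonth.show()                             // both actions reuse the cached data
enrichedDF.unpersist()                                       // give the storage memory back to execution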
9. Storage
- In AWS S3, a simple rename actually needs to copy and then delete the original file.
- The first workaround when using Spark with S3 as an output is to use this specific configuration:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false
More details here.
Parquet
- Columnar storage format
- Spark can leverage **predicate push down** (i.e. pushing down the filtering closer to the data).
- However, predicate pushdown should be used with extra care. Even if it appears in the execution plan, it will not always be effective. For example, a filter push-down does not work on String and Decimal data types PARQUET-281
10. JVM
Read this blog for more details on garbage collection.
spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20
(or, with the older CMS collector: -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled)
- When Java evicts old objects to make room for new ones, it needs to trace through all objects and discard the old ones. Thus, the cost of JVM GC is proportional to the number of Java objects.
- Data structures with fewer objects will lower this cost (Array of Int vs. LinkedList).
- Persist objects in serialized form so that there is only one object (byte array) per RDD.
- You can measure GC frequency and time spent by adding -verbose:gc -XX:+PrintGCDetails and -XX:+PrintGCTimeStamps to the Java options (see the sketch after this list).
- Too many or long-lasting GCs imply that there isn't enough memory left for the execution of GC. For efficient memory usage, you can clean up cached/persisted collections after they are no longer needed.
- Advanced: adjust JVM Young (Eden, Survivor1, Survivor2) and Old Heap spaces.
- When using RDDs in PySpark, make sure to save enough memory on worker nodes for Python processes, as the “executor memory” is only for the JVM.
- Analyse the logs for the memory usage, most likely the problem would be with the data partitions.
- If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight. You can add these options in spark-env.sh.
- https://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning
- EMR : https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-debugging.html
- Tool to analyse the logs: https://gceasy.io/
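A minimal sketch of wiring the GC logging flags above into an application (the app name is illustrative):
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("gc-logging-example")
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")  // GC details appear in each executor's stdout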
11. Monitoring spark applications
- Spark includes a configurable metrics system based on the dropwizard.metrics library.
- https://github.com/qubole/sparklens
- https://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/
- https://github.com/hammerlab/grafana-spark-dashboards
12. Executor
- Dynamic Executor Allocation is a Spark feature that allows for adding and removing Spark executors dynamically to match the work load.
- Enabling this configuration is highly recommended if you share cluster resources with other teams, so your Spark application only uses what it actually needs. It can scale the number of executors based on the workload.
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.executorIdleTimeout = 2m
spark.dynamicAllocation.minExecutors = 1
spark.dynamicAllocation.maxExecutors = 2000
The four parameters are self-describing; maybe more details are needed for the second one: executorIdleTimeout controls how long an executor can sit idle before it is removed.
Better Fetch Failure handling
The number of consecutive stage attempts allowed before a stage is aborted (by default is 4).
spark.stage.maxConsecutiveAttempts = 10
Tune RPC Server threads
Increase the number of RPC server threads to fix out-of-memory errors (actually I cannot find more details in the official Spark documentation; a good explanation is here):
spark.rpc.io.serverThreads = 64
13. Memory
- Avoid nested structures with a lot of small objects and pointers when possible.
- Consider using numeric IDs or enumeration objects instead of strings for keys.
- Execution memory is used for computation (e.g., shuffles, joins, sorts, and agg).
- Storage memory is used for caching and propagating internal data.
- Jobs that do not use cache can use all space for execution, and avoid disk spills.
- Applications that use caching reserve minimum storage space where the data cannot be evicted by execution requirements.
- Set spark.memory.fraction to determine what fraction of the JVM heap space is used for Spark execution/storage memory. The default is 60%.
- Set the JVM flag -XX:+UseCompressedOops if you have less than 32 GB of RAM to make pointers four bytes instead of eight.
- Avoid using executors with too much memory, as it would delay the JVM garbage collection process.
- Avoid using a lot of very small executors to be able to still benefit from running multiple tasks in a single JVM.
14. Streaming
- Batch Interval and Block Interval
The batch interval refers to the time interval during which the data is collected and buffered by the receiver before being sent to Spark. The receiver sends the data to the executor, which manages the data in blocks, and each block becomes a partition of the RDD produced during each batch interval. The number of partitions created per consumer in each batch is based on the block interval, which is the interval at which data received by Spark Streaming receivers is formed into blocks of data. This interval can be set with this property:
spark.streaming.blockInterval
Default: 200ms
Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended: 50 ms. See the performance tuning section in the Spark Streaming programming guide for more details.
So, the number of partitions created per consumer can be calculated with this expression:
number of partitions per consumer = batchInterval / blockInterval
Now, the total number of partitions per job is
number of partitions per application = (#consumers) * (batchInterval / blockInterval)
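A minimal sketch of the arithmetic above (the 10-second batch interval is illustrative): with the default 200 ms block interval, each receiver produces 10000 ms / 200 ms = 50 partitions per batch.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("streaming-partitioning")
  .set("spark.streaming.blockInterval", "200ms")
val ssc = new StreamingContext(conf, Seconds(10))   // 10 s batches => 50 blocks (partitions) per receiver per batch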
Misc
Try alternatives for AWS EMR with plain EC2
- https://github.com/nchammas/flintrock
- https://heather.miller.am/blog/launching-a-spark-cluster-part-1.html
- To use a custom python package on worker nodes
To use a custom Python package sitting in the current directory (i.e. the main script imports from this package), it will need to be zipped up and shipped out to the worker nodes before usage.
Given a local package mypackage, the following can be used in the main script to zip up the package and ship it to workers before usage of the package:
# Ship a fresh copy of the `mypackage` package to the Spark workers.
import shutil
dirname = "mypackage"
zipname = dirname + ".zip"
shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
spark.sparkContext.addPyFile(zipname)
Note: The zip must include the mypackage directory itself, as well as all files within it for addPyFile to work correctly.
This is equivalent to :
zip -r mypackage.zip mypackage.
Error Messages Don’t Mean What They Say
It took quite a while to get used to the fact that Spark complains about one thing, when the problem is really somewhere else.
- “Connection reset by peer” often implies you have skewed data and one particular worker has run out of memory.
- “java.net.SocketTimeoutException: Write timed out” might mean you have set your number of partitions too high, and the filesystem is too slow at handling the number of simultaneous writes Spark is attempting to execute.
- “Total size of serialized results… is bigger than spark.driver.maxResultSize” could mean you’ve set your number of partitions too high and results can’t fit onto a particular worker.
- “Column x is not a member of table y”: You ran half your pipeline just to discover this sql join error. Front-load your run-time execution with validation to avoid having to reverse engineer these errors.
- Sometimes you will get a real out of memory error, but the forensic work will be to understand why: Yes, you can increase the size of your individual workers to make this problem disappear, but before you do that, you should always ask yourself, “is the data well distributed?”
References:
- https://medium.com/teads-engineering/lessons-learned-while-optimizing-spark-aggregation-jobs-f93107f7867f
- https://www.slideshare.net/databricks/apache-spark-coredeep-diveproper-optimization
- https://michalsenkyr.github.io/2018/01/spark-performance
Read List: