Tuning Spark Jobs on EMR with YARN - Lessons Learnt
Apache Spark is a distributed processing system that can process data at very large scale. Even though Spark's memory model is optimized to handle large amounts of data, it is no magic, and there are several settings that can help you get the most out of your cluster. I am summarizing the tips and gotchas that I have gathered while working in Apache Spark land, with help from Cloudera's blogs. Some of them are only applicable on AWS EMR, and everything is written from the perspective of Spark in YARN cluster mode.
The properties that require the most frequent tuning are:
- spark.driver.memory
- spark.driver.cores
- spark.executor.memory
- spark.executor.cores
- spark.default.parallelism
- spark.executor.instances (maybe)
There are several other properties that you can tweak but usually the above have the most impact.
Optimizations in EMR and Spark
EMR automatically sets spark.executor.cores based on the slave instance type. While the documentation is not clear on how these are calculated, I have noticed that they are usually set very low to increase the number of executors.
maximizeResourceAllocation - EMR calculates the maximum compute and memory resources available for an executor on a node and sets the corresponding values in Spark. The maximizeResourceAllocation property can be enabled during the EMR creation step. See the AWS EMR documentation to learn how the values are calculated.
spark.dynamicAllocation.enabled - This is not enabled in Spark by default BUT is automatically enabled on EMR clusters with release 4.4.0 or later. EMR sets up a default shuffle service. The purpose of this property is to scale the number of executor instances as needed; executors are killed and scaled up by YARN. It can be enabled on a non-EMR cluster by also setting spark.shuffle.service.enabled. See the Spark documentation for more info on dynamic allocation.
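For a non-EMR cluster, the properties involved can be collected in one place and passed to spark-submit or SparkConf. A minimal sketch; the property names are from the Spark configuration docs, but the min/max executor values below are purely illustrative assumptions:

```python
# Illustrative settings for enabling dynamic allocation on a non-EMR
# cluster. spark.shuffle.service.enabled requires the external shuffle
# service to be running on each node; the min/max executor counts here
# are assumed example values, not recommendations.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "20",
}

# These pairs can be passed to SparkConf().setAll(...) in code, or as
# --conf key=value flags on the spark-submit command line.
for key, value in sorted(dynamic_allocation_conf.items()):
    print(f"--conf {key}={value}")
```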
What doesn't work with the above optimizations?
You may be wondering why one would need to tune jobs when all these optimizations are already done at the Spark and/or EMR level. Here are some of the reasons:
- Dynamic allocation can cause jobs to hang when the number of executors is too high and they all try communicating with the driver. The issue is documented at https://issues.apache.org/jira/browse/SPARK-16441. A workaround is to set spark.dynamicAllocation.maxExecutors to a lower number. This is problematic for two reasons: firstly, you will need to figure out this optimal number through a lot of trial and error, and secondly, you could be wasting your cluster resources with an upper limit like this.
- maximizeResourceAllocation in an EMR cluster could also potentially waste your cluster resources, as it sets the default parallelism to only 2 x the number of CPU cores available to YARN containers. Spark could be made much more parallel than that.
- Dynamic allocation only scales the number of executors up and down. You would still need to calculate the executor cores and memory to fit your data, or repartition your data to fit the defaults.
How can I set my own values?
Before you go down this route, I highly suggest using the tools provided by Spark and EMR (if you are using it for your cluster). It is very easy to get tangled in the details of Spark, YARN and EMR properties. Remember, complex code is not always the right answer.
When dynamic allocation is not enabled, Spark requests all executors at load time and keeps them all alive throughout the life of the application.
I usually set my driver memory and driver cores the same as the executor's, as the driver could be running as an executor too in yarn-cluster mode. You may need bigger driver memory if you call functions such as take often.
Consider an EMR cluster of 2 r3.4xlarge nodes (excluding the master), then:
1 Node = 32 YARN vcores, 122gb memory
YARN vcores are not the same as EC2 vCPUs. They are usually double the EC2 vCPUs. See the Amazon EMR console for the right value.
Let's reserve 1 core and 1gb RAM per node for the OS and Hadoop daemons, then:
Resources available on each node: 31 cores and 121gb RAM
But the total memory (yarn.nodemanager.resource.memory-mb) that is available on each node for YARN containers, which will ultimately be used to allocate memory to executors, is set by EMR. For r3.4xlarge, the EMR documentation says it's ~116gb. The YARN UI, however, indicates it to be 114gb, which shows that there are some additional overheads subtracted as well.
Hence, these are the new values that we will be working with:
1 Node = 31 cores, 114gb
Total cores = 62
Total memory = 228gb
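The arithmetic above can be captured in a few lines. A sketch of the example: the 114gb per-node figure is what the YARN UI reported for r3.4xlarge, and the 1-core reservation matches the reasoning above:

```python
# Per-node resources after reserving 1 core for the OS/Hadoop daemons,
# using the YARN container memory EMR actually exposes (114gb observed
# in the YARN UI for r3.4xlarge).
nodes = 2                                   # core/task nodes, excluding master
yarn_vcores_per_node = 32
cores_per_node = yarn_vcores_per_node - 1   # reserve 1 core -> 31
yarn_memory_per_node_gb = 114               # yarn.nodemanager.resource.memory-mb (observed)

total_cores = nodes * cores_per_node
total_memory_gb = nodes * yarn_memory_per_node_gb
print(total_cores, total_memory_gb)  # 62 228
```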
In yarn-cluster mode, your driver will be running on one of the executors.
Let's say we assign 5 cores per executor, meaning each executor can process 5 tasks simultaneously. Then:
Number of cores per executor = 5
Number of executors per node = 31 / 5 ≈ 6
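In code, with the per-node numbers from the example above:

```python
# Executors per node: integer division of usable cores by the cores
# assigned to each executor (numbers from the r3.4xlarge example).
cores_per_node = 31
cores_per_executor = 5
executors_per_node = cores_per_node // cores_per_executor
print(executors_per_node)  # 6
```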
How much memory to give to each executor?
executor_memory = memory_available - yarn_container_overhead
Container overhead is controlled by spark.yarn.executor.memoryOverhead, which can be set in the Spark config. Its default value is 0.1, i.e. 10% of executor memory (with a minimum of 384mb).
overhead_fraction = 0.1
memory_available = total_memory_per_node/num_executors_per_node = 114/6 = 19
yarn_container_overhead = 0.1 * memory_available = 0.1 * 19 = ~2
executor_memory = 19 - 2 = 17gb
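The calculation above can be sketched as a small helper. The 10% overhead and the 114gb node figure are from the example; the rounding to whole gigabytes is my own simplification:

```python
def executor_memory_gb(node_memory_gb, executors_per_node, overhead_fraction=0.1):
    """Split a node's YARN container memory among its executors, then
    subtract the YARN container overhead (spark.yarn.executor.memoryOverhead,
    10% of executor memory by default)."""
    memory_available = node_memory_gb // executors_per_node   # 114 // 6 = 19
    overhead = round(overhead_fraction * memory_available)    # ~2gb here
    return memory_available - overhead

print(executor_memory_gb(114, 6))  # 17
```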
What should the default parallelism be?
Suggested formula:
default_parallelism = total_executor_cores * 2 = (11 * 5) * 2 = 110
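As a helper, with the rule of thumb of 2 tasks per executor core used above:

```python
def default_parallelism(executor_instances, cores_per_executor, factor=2):
    # Rule of thumb: roughly 2-3 tasks per executor core; 2 is used here.
    return executor_instances * cores_per_executor * factor

print(default_parallelism(11, 5))  # 110
```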
How much memory would each task have?
Task Memory = (spark.executor.memory * spark.memory.fraction * spark.memory.storageFraction)/spark.executor.cores
With the defaults spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5:
task_memory = (17 * 0.6 * 0.5)/5 = 1.02 gb
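The same per-task estimate as a function, using the formula and default fractions above:

```python
def task_memory_gb(executor_memory_gb,
                   memory_fraction=0.6,    # spark.memory.fraction default
                   storage_fraction=0.5,   # spark.memory.storageFraction default
                   executor_cores=5):
    """Rough per-task memory estimate, dividing the executor's usable
    memory region evenly across its concurrent tasks."""
    return (executor_memory_gb * memory_fraction * storage_fraction) / executor_cores

print(round(task_memory_gb(17), 2))  # 1.02
```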
Final values to set:
- spark.driver.memory = 17g
- spark.driver.cores = 5
- spark.executor.memory = 17g
- spark.executor.cores = 5
- spark.executor.instances = 11 - reduced from 12 (2 nodes x 6 executors) so that the driver gets the same memory and cores as an executor.
- spark.default.parallelism = 110
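Putting the final values together in one place, e.g. as a dict that can be fed to SparkConf().setAll(...) or turned into spark-submit flags (a sketch; either mechanism works):

```python
# Final values from the r3.4xlarge worked example above.
final_conf = {
    "spark.driver.memory": "17g",
    "spark.driver.cores": "5",
    "spark.executor.memory": "17g",
    "spark.executor.cores": "5",
    "spark.executor.instances": "11",
    "spark.default.parallelism": "110",
}

for key, value in final_conf.items():
    print(f"--conf {key}={value}")
```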
A few more things:
- Genie - You may need to set driver memory and driver cores during job submission rather than in the code if you are using Genie for job submission.
- Repartitioning the data will increase parallelism, but it also causes a shuffle and possible spill. More partitions are usually better than fewer.
- Use tools like Ganglia, Spark UI, Dr. Elephant etc. generously for analyzing your jobs' performance.
- Spark UI may show a lower value for executor memory than what you assigned in the application. I haven't figured out a good reason for this. Maybe there are more overheads that get subtracted.
- YARN UI doesn't always show the right number of vcores that are being used. See Spark UI Executor page to see how many tasks are being executed in parallel.
- Cache the datasets that are used more than once especially the JSON and Text ones.
- Your executors will sit idle if you make driver-intensive calls like take, collect etc. often. Avoid those calls if possible.