The RevoScaleR package of Microsoft R Server provides functionality for both High Performance Analytics (HPA) and High Performance Computing (HPC). HPA provides ‘big data’ capabilities by running distributed analytics across cores and nodes. HPC capabilities allow distributed execution of any R function and deliver the results back. Most RevoScaleR functions are HPA; the exception is rxExec, which is HPC.
Some Basic Spark Knowledge
In Spark, the unit of processing is the executor. An executor consumes two resources, cores and memory, to execute jobs. Cores control task parallelism, while memory is used for job execution and caching. In this blog, I will focus on YARN, but the ideas can be applied to Mesos accordingly.
Spark acquires executors, which are processes that run computations and store data, from the resource manager (YARN).
Let’s say one executor is configured to use C cores (spark.executor.cores) and each task is configured to use T cores (spark.task.cpus); then the number of tasks running in parallel in that executor is C / T. In most cases we don’t change T, which defaults to 1.
An executor is basically a JVM process whose on-heap memory is controlled by spark.executor.memory. The executor also uses off-heap memory, whose size is determined by spark.yarn.executor.memoryOverhead. The sum of on-heap and off-heap memory is the total memory that one executor requests from the resource manager (YARN).
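As a rough illustration of the two formulas above, the following Python sketch computes the number of task slots in one executor and the total memory it requests from YARN. The helper names (to_mb, task_slots, container_request_mb) are hypothetical, the example values are illustrative rather than recommended, and any rounding the YARN scheduler may apply to the request is ignored.

```python
def to_mb(size: str) -> int:
    """Convert a size string such as '4g' or '512m' to megabytes."""
    units = {"m": 1, "g": 1024}
    return int(size[:-1]) * units[size[-1].lower()]


def task_slots(executor_cores: int, task_cpus: int = 1) -> int:
    """Tasks that can run in parallel in one executor (C / T, rounded down)."""
    return executor_cores // task_cpus


def container_request_mb(executor_memory: str, memory_overhead: str) -> int:
    """On-heap plus off-heap memory that one executor requests from YARN."""
    return to_mb(executor_memory) + to_mb(memory_overhead)


# Illustrative values only: 4 cores, 1 core per task, 8g on-heap, 2g overhead.
print(task_slots(4))                     # 4
print(container_request_mb("8g", "2g"))  # 10240
```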
Configure Spark in RxSpark
There are 4 parameters in the RxSpark function that control the Spark executors, cores and memory:
- numExecutors: Maximum number of executors Spark will request. The actual number of executors is determined by how many resources YARN has available.
- executorCores: Number of cores to use per executor process.
- executorMem: Amount of on-heap memory to be allocated per executor process.
- executorOverheadMem: Maximum amount of additional (off-heap) memory that can be used per executor.
Remember, YARN itself also has CPU and memory configuration:
- yarn.nodemanager.resource.memory-mb: Amount of physical memory, in MB, that can be allocated for containers.
- yarn.scheduler.maximum-allocation-mb: The maximum allocation for every container request at the RM, in MB. Memory requests higher than this won’t take effect, and will get capped to this value.
- yarn.nodemanager.resource.cpu-vcores: Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of physical cores used by YARN containers.
- yarn.scheduler.maximum-allocation-vcores: The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this won’t take effect, and will get capped to this value.
To create the RxSpark compute context properly, we need to make sure that the following constraints hold:
- Memory constraints
[yarn.nodemanager.resource.memory-mb] ≥ [yarn.scheduler.maximum-allocation-mb] ≥ [executorMem + executorOverheadMem]
- CPU constraints
[yarn.nodemanager.resource.cpu-vcores] ≥ [yarn.scheduler.maximum-allocation-vcores] ≥ [executorCores]
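The two constraint chains above can be checked mechanically before creating the compute context. The sketch below is a minimal, hypothetical helper (check_rxspark_constraints is not part of RevoScaleR or YARN); it assumes all memory values have already been converted to MB and uses made-up cluster values.

```python
def check_rxspark_constraints(node_mem_mb, max_alloc_mb, executor_mem_mb,
                              overhead_mem_mb, node_vcores, max_alloc_vcores,
                              executor_cores):
    """Return the violated constraints as strings; an empty list means all hold."""
    requested_mb = executor_mem_mb + overhead_mem_mb
    checks = [
        (node_mem_mb >= max_alloc_mb,
         "yarn.nodemanager.resource.memory-mb >= yarn.scheduler.maximum-allocation-mb"),
        (max_alloc_mb >= requested_mb,
         "yarn.scheduler.maximum-allocation-mb >= executorMem + executorOverheadMem"),
        (node_vcores >= max_alloc_vcores,
         "yarn.nodemanager.resource.cpu-vcores >= yarn.scheduler.maximum-allocation-vcores"),
        (max_alloc_vcores >= executor_cores,
         "yarn.scheduler.maximum-allocation-vcores >= executorCores"),
    ]
    return [rule for ok, rule in checks if not ok]


# Made-up cluster: 32 GB and 16 vcores per node, containers capped at 8 GB / 8 vcores.
violations = check_rxspark_constraints(
    node_mem_mb=32768, max_alloc_mb=8192,
    executor_mem_mb=4096, overhead_mem_mb=4096,
    node_vcores=16, max_alloc_vcores=8, executor_cores=2)
print(violations or "all constraints hold")
```

With these values the request of 4 GB on-heap plus 4 GB off-heap exactly fits the 8 GB container cap; raising either memory setting would violate the second memory constraint.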
General Spark Configuration Guideline
There are many factors to consider when configuring memory settings for the RxSpark compute context, e.g. the data set, the RevoScaleR algorithms, and the available YARN resources.
Usually, using more cores achieves better task parallelization and performance. Assuming a Spark app has E executors, C cores per executor and T cores per task, the number of parallel tasks is E × C / T.
An executor is created within one node, not across multiple nodes, so understanding the available resources on a single node is very important. It is not always true that the more cores Spark uses, the better the performance: tasks also consume memory during execution, and insufficient memory will slow down performance or even result in an OOM failure.
Executor on-heap memory is used for many purposes, e.g. shuffling data, unrolling data, task execution and storage caching. So increasing on-heap memory helps with both performance and OOM issues.
The off-heap memory is used by the R process and the RevoScaleR analytics engine process. Increasing off-heap memory may not help performance, because R and the RevoScaleR analytics engine consume memory on demand. However, the executor and its process tree will be killed if the memory consumed by R and the RevoScaleR analytics engine exceeds the assigned off-heap memory. So increasing off-heap memory helps mitigate off-heap OOM issues.
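Because an executor must fit inside a single node, a quick estimate of how many executors one node can host shows whether cores or memory is the limiting resource. The following is a simplified sketch: executors_per_node is a hypothetical helper, the node sizes are made up, and it ignores other containers on the node (such as the application master) and any scheduler rounding.

```python
def executors_per_node(node_mem_mb, node_vcores, executor_cores,
                       executor_mem_mb, overhead_mem_mb):
    """Estimate how many executors fit on one node and which resource limits it."""
    by_memory = node_mem_mb // (executor_mem_mb + overhead_mem_mb)
    by_cores = node_vcores // executor_cores
    limit = "memory" if by_memory < by_cores else "cores"
    return min(by_memory, by_cores), limit


# Made-up node: 64 GB of container memory and 16 vcores.
# 2 cores with 12g on-heap + 4g off-heap: memory runs out first.
print(executors_per_node(65536, 16, 2, 12288, 4096))  # (4, 'memory')
# 4 cores with 4g on-heap + 4g off-heap: cores run out first.
print(executors_per_node(65536, 16, 4, 4096, 4096))   # (4, 'cores')
```

In the first case half of the node's vcores stay idle; in the second, half of its memory does. Both are the kind of imbalance the paragraph above warns about.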
Many cores with little memory will fail the job, and few cores with a lot of memory will waste resources, so a balance is very important. The default core and memory settings of RxSpark are:
spark.executorCores = 2
spark.executorMem = "4g"
spark.executorOverheadMem = "4g"
From these defaults we can see that, in general, about 2g of on-heap memory and 2g of off-heap memory is recommended for each task running in an executor, but this is not absolute.
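The per-task figures follow from dividing the default memory settings by the default core count. The sketch below reproduces that arithmetic and then scales it to a different core count; both helper names are hypothetical, one core per task is assumed, and the 2g-per-task ratio is only the rule of thumb derived from the defaults, not a fixed requirement.

```python
def per_task_memory_gb(executor_cores, executor_mem_gb, overhead_mem_gb):
    """On-heap and off-heap memory share per task, assuming one core per task."""
    return executor_mem_gb / executor_cores, overhead_mem_gb / executor_cores


def scaled_settings(executor_cores, on_heap_per_task_gb=2, off_heap_per_task_gb=2):
    """Scale executor memory with the core count, keeping the per-task share fixed."""
    return {
        "executorCores": executor_cores,
        "executorMem": f"{executor_cores * on_heap_per_task_gb}g",
        "executorOverheadMem": f"{executor_cores * off_heap_per_task_gb}g",
    }


# Defaults from the text: 2 cores, 4g on-heap, 4g off-heap.
print(per_task_memory_gb(2, 4, 4))  # (2.0, 2.0)
# Keeping the same per-task share with 4 cores per executor.
print(scaled_settings(4))
```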
HPA Guideline for RxSpark
The HPA functions (rxLinMod, rxLogit, rxDTree, …) of RevoScaleR provide “big data” analytic capabilities. They run distributed jobs as tasks across executor cores on cluster nodes, using on-heap memory for data shuffling and caching and off-heap memory for execution, since the actual data analysis runs in the RevoScaleR analytics engine process.
To tune Spark memory for HPA with a fixed total budget of cores and memory, we can consider two approaches:
- Use small executors and increase the number of executors.
This reduces the cores and memory used per executor but increases the number of executors.
Pros:
a) With fewer cores, fewer tasks run in parallel in each executor. This better task isolation reduces the memory impact of other tasks within the same executor.
b) The data cached within one executor is small, so the rebuilding cost is low if an executor is lost.
c) Smaller executors can help maximize YARN resource usage when the available resources differ between nodes.
Cons:
a) High data shuffling cost, because tasks are in different executors, where data cannot be shared.
b) A small on-heap memory setting may result in an OOM error when the input data partition is big.
- Use big executors and decrease the number of executors.
This increases the cores and memory used per executor but reduces the number of executors.
Pros:
a) Low data shuffling cost, because the more tasks run in one executor, the more data can be shared.
b) With more on-heap memory, the OOM error rate will decrease.
Cons:
a) The data rebuilding cost is high if an executor is lost.
b) YARN resources on each node may not be fully used.
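To make the two options concrete, the sketch below enumerates executor layouts for a fixed budget of cores and memory. The function name, the budget of 16 cores and 64 GB, and the memPerExecutorGb and memPerTaskGb keys are all hypothetical; memory here means on-heap plus off-heap combined, and one core per task is assumed.

```python
def executor_layouts(total_cores, total_mem_gb, cores_per_executor_options):
    """List ways to split a fixed core/memory budget into equally sized executors."""
    layouts = []
    for cores in cores_per_executor_options:
        if total_cores % cores != 0:
            continue  # skip options that do not divide the core budget evenly
        num_executors = total_cores // cores
        mem_per_executor = total_mem_gb / num_executors
        layouts.append({
            "numExecutors": num_executors,
            "executorCores": cores,
            "memPerExecutorGb": mem_per_executor,
            "memPerTaskGb": mem_per_executor / cores,
        })
    return layouts


for layout in executor_layouts(16, 64, [1, 2, 4, 8]):
    print(layout)
```

Every layout gives each task the same average memory share; what changes is how many tasks share one JVM, which is exactly the trade-off between isolation and rebuild cost on one side and shuffle cost and OOM headroom on the other.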
HPC Guideline for RxSpark
The HPC function (rxExec) in the RevoScaleR package is quite different from HPA. It uses Spark only for task scheduling; there is no data unrolling or caching, so it doesn’t need much on-heap memory.
The actual computation of the user-defined function runs in an R process and uses off-heap memory, and the number of parallel tasks is determined by the total number of cores and the number of cores per task (E × C / T). So more cores and more off-heap memory should achieve better performance.
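One way to see the effect of cores on an HPC job is to count how many scheduling rounds a batch of tasks needs. The sketch below is a back-of-the-envelope model, not a description of how rxExec is implemented: it assumes the job consists of independent tasks of similar duration, and the helper names and example numbers are hypothetical.

```python
import math


def parallel_tasks(num_executors, executor_cores, task_cpus=1):
    """Total tasks that can run at the same time (E x C / T)."""
    return num_executors * (executor_cores // task_cpus)


def scheduling_rounds(num_tasks, num_executors, executor_cores, task_cpus=1):
    """Rounds needed to run all tasks if each round fills every task slot."""
    return math.ceil(num_tasks / parallel_tasks(num_executors, executor_cores, task_cpus))


# 100 tasks on 4 executors with 2 cores each: 8 slots, 13 rounds.
print(scheduling_rounds(100, 4, 2))  # 13
# The same tasks on 8 executors with 4 cores each: 32 slots, 4 rounds.
print(scheduling_rounds(100, 8, 4))  # 4
```

Adding cores only pays off if each R process still has enough off-heap memory, so executorOverheadMem should grow along with executorCores.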