rxExecBy Insights on RxSpark Compute Context

rxExecBy is designed to solve the problem where a user has a very large data set, wants to partition it into many small partitions, and train a model on each partition. This is what we call "small data, many models". rxExecBy has many features and can run in many different compute contexts, e.g. RxSpark, RxInSqlServer, local. In this blog, I'm going to cover the RxSpark compute context and help you understand the details.

Data Preparation

In RxSpark, rxExecBy supports RxTextData, RxXdfData (composite set), RxHiveData, RxParquetData and RxOrcData. I'm going to use AirlineDemoSmall.csv from the RevoScaleR package and convert it into all of these supported data sources.

# Create myDir in hdfs and copy csv file
source <- system.file("SampleData/AirlineDemoSmall.csv", package="RevoScaleR")
myDir <- "/share/AirlineDemoSmall"
rxHadoopMakeDir(myDir)
rxHadoopCopyFromLocal(source, myDir)

cc <- rxSparkConnect(reset = TRUE)

# Summary of data
hdfsFileSystem <- RxHdfsFileSystem()
csvFile <- file.path(myDir, "AirlineDemoSmall.csv")
textData <- RxTextData(file = csvFile, missingValueString = "M", stringsAsFactors = TRUE, fileSystem = hdfsFileSystem)

rxSummary(~., textData)
#Call:
#rxSummary(formula = ~., data = textData)
#Summary Statistics Results for: ~.
#Data: textData (RxTextData Data Source)
#File name: /share/AirlineDemoSmall/AirlineDemoSmall.csv
#Number of valid observations: 6e+05
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   11.31794 40.688536 -86.000000 1490.00000 582628   17372
# CRSDepTime 13.48227  4.697566   0.016667   23.98333 600000       0
#
#Category Counts for DayOfWeek
#Number of categories: 7
#Number of valid observations: 6e+05
#Number of missing observations: 0
#
# DayOfWeek Counts
# Monday    97975
# Tuesday   77725
# Wednesday 78875
# Thursday  81304
# Friday    82987
# Saturday  86159
# Sunday    94975

# Create XDF data
xdfFile <- file.path(myDir, "AirlineDemoSmallXDF")
xdfData <- RxXdfData(file = xdfFile, createCompositeSet = TRUE, fileSystem = hdfsFileSystem)
rxDataStep(textData, xdfData, overwrite = TRUE)

# Remove factor conversion from textData
textData <- RxTextData(file = csvFile, missingValueString = "M", fileSystem = hdfsFileSystem)

# Create Parquet data
parquetFile <- file.path(myDir, "AirlineDemoSmallParquet")
parquetData <- RxParquetData(file = parquetFile, fileSystem = hdfsFileSystem)
rxDataStep(textData, parquetData, overwrite = TRUE)

# Create ORC data
orcFile <- file.path(myDir, "AirlineDemoSmallOrc")
orcData <- RxOrcData(file = orcFile, fileSystem = hdfsFileSystem)
rxDataStep(textData, orcData, overwrite = TRUE)

# Create Hive data
hiveData <- RxHiveData(table = "AirlineDemoSmall")
rxDataStep(textData, hiveData, overwrite = TRUE)

rxSparkDisconnect(cc)

Group By Keys

The first and most important feature of rxExecBy is partitioning. Given a data source, rxExecBy can partition the data by a single key as well as by multiple keys. To do this, simply put the variable names into a vector and pass it to the rxExecBy argument "keys".

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
textData <- RxTextData(file = "/share/AirlineDemoSmall/AirlineDemoSmall.csv", missingValueString = "M", stringsAsFactors = TRUE, fileSystem = hdfsFileSystem)
.Summary <- function(keys, data)
{
    df <- rxImport(data)
    nrow(df)
}

# single key
result <- rxExecBy(inData = textData, keys = c("DayOfWeek"), func = .Summary)

str(result)
#List of 7
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 4
#  ..$ result: int 78875
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 6
#  ..$ result: int 82987
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 5
#  ..$ result: int 81304
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 7
#  ..$ result: int 86159
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 1
#  ..$ result: int 97975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 3
#  ..$ result: int 77725
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 2
#  ..$ result: int 94975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL

# multi keys
result <- rxExecBy(inData = textData, keys = c("DayOfWeek", "ArrDelay"), func = .Summary)

length(result)
#[1] 3233

str(result[[1]])
#List of 3
# $ keys  :List of 2
#  ..$ : Factor w/ 7 levels "Monday","Sunday",..: 4
#  ..$ : int 388
# $ result: int 2
# $ status:List of 3
#  ..$ : chr "OK"
#  ..$ : NULL
#  ..$ : NULL

rxSparkDisconnect(cc)

Return Value

rxExecBy returns a list of results from the partitions, and the length of the list equals the number of partitions. Each partition result is itself a list of "keys", "result" and "status":

  • "keys" gives the key values of that partition.
  • "result" is the return value of the UDF if the run succeeded.
  • "status" shows the run status of the UDF, "OK" or "Error", as well as any runtime warning/error messages.

Take the single key c("DayOfWeek") as an example. The key has 7 values, so the return value is a list of 7 objects, each of which is a list of 3:

  • "keys" is a list of 1, and the object is a factor with values from 1 to 7, mapping to "Monday", "Sunday", ….
  • "result" is the number of rows in the partition, as returned by nrow in the UDF.

In the multi-key c("DayOfWeek", "ArrDelay") example, the length of the return value shows a total of 3233 partitions. From the result of the 1st partition, we can see that "keys" is a list of 2:

  • the 1st object is the factor value 4, which is the value of "DayOfWeek",
  • the 2nd object is the integer 388, which is the value of "ArrDelay".
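
Because the return value is just a nested R list, it is easy to reshape with base R. The snippet below is a minimal sketch, assuming "result" still holds the single-key output above, that flattens the per-partition results into a data.frame.

# Flatten the single-key rxExecBy result into one row per partition.
# Assumes every partition ran successfully, as in the single-key example above.
counts <- do.call(rbind, lapply(result, function(p) {
    data.frame(DayOfWeek = as.character(p$keys[[1]]),  # key value of the partition
               rows      = p$result,                   # value returned by the UDF
               status    = p$status[[1]],              # "OK" or "Error"
               stringsAsFactors = FALSE)
}))
counts[order(counts$rows, decreasing = TRUE), ]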

Another very handy feature of the partition result is "status", which is a list of 3:

  • the 1st object is a character value indicating whether the UDF ran successfully,
  • the 2nd object is a character error message if the UDF ran into a failure,
  • the 3rd object is a character vector containing all the warnings raised while running the UDF.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
xdfData <- RxXdfData(file = "/share/AirlineDemoSmall/AirlineDemoSmallXDF", fileSystem = hdfsFileSystem)
.Message <- function(keys, data)
{
    df <- rxImport(data)
    rows <- nrow(df)
    if (rows > 80000)
    {
        warning("rows > 80000")
    }
    if (rows > 85000)
    {
        warning("rows > 85000")
    }
    if (rows > 90000)
    {
        warning("rows > 90000")
    }
    if (rows > 95000)
    {
        stop("rows > 95000")
    }
    rows
}

result <- rxExecBy(inData = xdfData, keys = c("DayOfWeek"), func = .Message)

str(result)
#List of 7
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 3
#  ..$ result: int 78875
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 5
#  ..$ result: int 82987
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 80000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 4
#  ..$ result: int 81304
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 80000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 6
#  ..$ result: int 86159
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr [1:2] "rows > 80000" "rows > 85000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 1
#  ..$ result: NULL
#  ..$ status:List of 3
#  .. ..$ : chr "Error"
#  .. ..$ : chr "rows > 95000"
#  .. ..$ : chr [1:3] "rows > 80000" "rows > 85000" "rows > 90000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 2
#  ..$ result: int 77725
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 7
#  ..$ result: int 94975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr [1:3] "rows > 80000" "rows > 85000" "rows > 90000"

rxSparkDisconnect(cc)

This example shows how "status" collects warning and error messages.

  • The 1st partition result shows 78875 rows, with no error and no warnings.
  • The 2nd partition result shows 82987 rows, no error, 1 warning message.
  • The 4th partition result shows 86159 rows, no error, 2 warning messages.
  • The 5th partition result is NULL, as the UDF ran into stop:
    • the 1st object of "status" shows "Error",
    • the 2nd object shows the error message,
    • the 3rd object shows all 3 warning messages.
  • The 7th partition result shows 94975 rows, no error, 3 warning messages.
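
With many partitions, it can be handy to pull out just the failures for inspection. A small sketch using the result from the example above:

failed <- Filter(function(p) !identical(p$status[[1]], "OK"), result)
length(failed)             # number of partitions whose UDF stopped with an error
failed[[1]]$status[[2]]    # error message of the first failed partition
failed[[1]]$status[[3]]    # warnings collected before the failure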

RxXdfData as Input Data Source of UDF

The UDF is the R function provided by the user to apply to each partition. It takes "keys" and "data" as two required input parameters, where "keys" holds the partitioning values and "data" is a data source object for the corresponding partition.

In the RxSpark compute context, "data" is an RxXdfData data source:

  • This makes it easy to run any rx function directly, with good performance and without being memory bound, thanks to the nature of RxXdfData: a binary format stored in chunks.
  • The user also has the flexibility to convert it to an R data.frame with an additional rxImport/rxDataStep call.

This example shows how easy it is to use rx functions by consuming the RxXdfData directly.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
colInfo <- list(
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday")))
orcData <- RxOrcData(file = "/share/AirlineDemoSmall/AirlineDemoSmallOrc", fileSystem = hdfsFileSystem, colInfo = colInfo)
.LinMod <- function(keys, data)
{
    rxLinMod(ArrDelay ~ CRSDepTime, data)
}

result <- rxExecBy(inData = orcData, keys = c("DayOfWeek"), func = .LinMod)

result[[1]]
#$keys
#$keys[[1]]
#[1] Wednesday
#Levels: Monday Tuesday Wednesday Thursday Friday Saturday Sunday
#
#
#$result
#Call:
#rxLinMod(formula = ArrDelay ~ CRSDepTime, data = data)
#
##Linear Regression Results for: ArrDelay ~ CRSDepTime
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/3108799476135290757/PXDF0
#Dependent variable(s): ArrDelay
#Total independent variables: 2
#Number of valid observations: 76786
#Number of missing observations: 2089
#
#Coefficients:
#              ArrDelay
#(Intercept) -3.1974586
#CRSDepTime   0.9900705
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

rxSparkDisconnect(cc)

Unroll funcParams to UDF

Besides the two required parameters, the UDF can take additional parameters, which allows the user to pass arbitrary values into the UDF. This is done through funcParams; here is an example.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
colInfo <- list(
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday")))
parquetData <- RxParquetData(file = "/share/AirlineDemoSmall/AirlineDemoSmallParquet", fileSystem = hdfsFileSystem, colInfo = colInfo)
.Param <- function(keys, data, arg, anotherArg)
{
    df <- rxImport(data)
    rows <- nrow(df)
    if (rows > arg)
    {
        warning(paste("rows >", arg))
    }
    paste(anotherArg[[keys[[1]]]], "has", rows, "rows")
}

params <- list(arg = 90000, anotherArg = list("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"))
result <- rxExecBy(inData = parquetData, keys = c("DayOfWeek"), func = .Param, funcParams = params)

str(result)
#List of 7
# ...
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 1
#  ..$ result: chr "Monday has 97975 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 90000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 2
#  ..$ result: chr "Tuesday has 77725 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 7
#  ..$ result: chr "Sunday has 94975 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 90000"

rxSparkDisconnect(cc)

Factor & ColInfo

Factors and colInfo are also well supported by rxExecBy; however, you need to refer to the R help documents to check what is supported for each data source.

This example shows how to read a Hive data source and convert a string column into a factor. It also defines the factor levels, so any string value of that column that does not appear in the level list is treated as a missing value.

cc <- rxSparkConnect(reset = TRUE)
colInfo <- list(
    ArrDelay = list(
        type = "numeric"),
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Sunday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Tuesday")))
hiveData <- RxHiveData(table = "AirlineDemoSmall", colInfo = colInfo)
.Factor <- function(keys, data)
{
    rxSummary(~., data)
}

result <- rxExecBy(inData = hiveData, keys = c("DayOfWeek"), func = .Factor)

# Partition result of "Saturday", which is not in the factor level list
result[[4]]
#$keys
#$keys[[1]]
#[1] <NA>
#Levels: Monday Sunday Wednesday Thursday Friday Tuesday
#
#
#$result
#Call:
#rxSummary(formula = ~., data = data)
#
#Summary Statistics Results for: ~.
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/2729766045047878893/PXDF0
#Number of valid observations: 86159
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   11.87533 45.245402 -73.000000 1370.00000 83851    2308
# CRSDepTime 13.15288  4.598508   0.083333   23.98333 86159       0
#
#Category Counts for DayOfWeek
#Number of categories: 6
#Number of valid observations: 0
#Number of missing observations: 86159
#
# DayOfWeek Counts
# Monday    0
# Sunday    0
# Wednesday 0
# Thursday  0
# Friday    0
# Tuesday   0
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

# Partition result of "Friday"
result[[2]]
#$keys
#$keys[[1]]
#[1] Friday
#Levels: Monday Sunday Wednesday Thursday Friday Tuesday
#
#
#$result
#Call:
#rxSummary(formula = ~., data = data)
#
#Summary Statistics Results for: ~.
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/2397107652729461182/PXDF0
#Number of valid observations: 82987
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   14.80433 41.792601 -78.000000 1490.00000 80142    2845
# CRSDepTime 13.50271  4.739651   0.083333   23.98333 82987       0
#
#Category Counts for DayOfWeek
#Number of categories: 6
#Number of valid observations: 82987
#Number of missing observations: 0
#
# DayOfWeek Counts
# Monday        0
# Sunday        0
# Wednesday     0
# Thursday      0
# Friday    82987
# Tuesday       0
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

rxSparkDisconnect(cc)

Links

Performance: rxExecBy vs gapply on Spark

Microsoft R Blog


Performance: rxExecBy vs gapply on Spark

rxExecBy is a new API in R Server release 9.1; it partitions the input data source by keys and applies a user-defined function to each partition. gapply is a SparkR API that provides similar functionality: it groups a SparkDataFrame using the specified columns and applies an R function to each group.

Prepare

Environment

The performance comparison of rxExecBy and gapply uses a 4-worker-node HDI 3.5 cluster with Spark 2.0. The VM size of each node is Standard D4_v2, which has 8 cores and 28 GB of memory. The Spark application configuration is:

  • Driver memory: 4g
  • Executor number: 4
  • Executor cores: 5
  • Executor memory: 16g
  • Executor overhead memory: 4g
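
These settings roughly map to the RxSpark/rxSparkConnect parameters described later in this post; a hedged sketch of how the compute context might be created (treat driverMem and the exact parameter mapping as assumptions):

cc <- rxSparkConnect(reset = TRUE,
                     driverMem = "4g",            # driver memory (assumed parameter name)
                     numExecutors = 4,            # executor number
                     executorCores = 5,           # executor cores
                     executorMem = "16g",         # executor on-heap memory
                     executorOverheadMem = "4g")  # executor overhead (off-heap) memory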

The data set I'm using for the benchmark is US airline on-time data from 2008 to 2012. I clean the data, leaving only 30 columns (here is a sample csv file containing 1M rows), and preload it into Hive to make life easier for gapply, as gapply can only take a SparkDataFrame as its input data source.

API

Before benchmarking, we need to understand the 2 APIs and define the performance story and the metric we want to collect.

rxExecBy

rxExecBy(inData, keys, func, funcParams = NULL, filterFunc = NULL, computeContext = rxGetOption("computeContext"), ...)

The API is very straightforward, so I will only focus on "func". This user-defined function is applied to each partition after the "group by" operation, and it needs to take "keys" and "data" as two required input arguments, where "keys" is a list of partitioning values and "data" is an RxXdfData (in RxSpark) data source for the partition. There is no restriction on the return value, but returning a big object is not recommended, because all returned values are brought back to the client, and big objects will slow down performance or even cause OOM.
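
For example, instead of returning a whole fitted model from every partition, the UDF can return only the piece that is actually needed; a minimal sketch (the formula and the idea of keeping only coefficients are just for illustration):

.SmallReturn <- function(keys, data)
{
    fit <- rxLinMod(ArrDelay ~ CRSDepTime, data)
    fit$coefficients    # ship back a small named vector instead of the full model object
}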

gapply

gapply(x, cols, func, schema)

"func" accepts a user-defined function, and the differences compared to rxExecBy are:

  • the input data is an R data.frame,
  • the return value has to be an R data.frame.
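
For comparison, a hedged sketch of a gapply call that counts rows per DayOfWeek (airlineSdf is a hypothetical SparkDataFrame over the same airline data):

library(SparkR)

schema <- structType(structField("DayOfWeek", "string"),
                     structField("rows", "integer"))

counts <- gapply(airlineSdf,
                 "DayOfWeek",
                 function(key, x) {
                     # key: grouping value(s); x: an R data.frame for this group
                     data.frame(DayOfWeek = as.character(key[[1]]), rows = nrow(x),
                                stringsAsFactors = FALSE)
                 },
                 schema)
head(collect(counts))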

What to Test

Unlike other machine learning algorithms, rxExecBy and gapply only partition the data and apply a UDF to each partition. So the performance test mainly focuses on how the data is partitioned, what the UDF is, and the input data size.

Group By Keys with Different Values

  • DayOfWeek (7 keys)
  • Dest (About 300 keys)
  • Origin & Dest (About 100K keys)

UDF

The input data of the UDF is RxXdfData for rxExecBy, and an R data.frame for gapply. We need to define some UDFs that can take both RxXdfData and R data.frame as input, and some other UDFs that can only take an R data.frame as input. In the second case, an additional rxDataStep call is required in the rxExecBy UDF to convert the RxXdfData into an R data.frame. Here are the details of the predefined UDFs:

  • rxSummary: takes RxXdfData or R data.frame as input, returns summary info for the data set.
  • rxLogit: takes RxXdfData or R data.frame as input, returns a logistic regression model. This is a multi-iteration algorithm, compared to the single iteration of rxSummary.
  • mean: takes an R data.frame as input, returns the mean of each column of the input partition data.
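
As a rough illustration (not the exact benchmark script; the formulas and the ArrDel15 column are assumptions about the airline schema), the UDFs look something like:

# rxSummary and rxLogit can consume the partition RxXdfData directly.
.SummaryUDF <- function(keys, data)
{
    rxSummary(~ ArrDelay + CRSDepTime, data)
}

.LogitUDF <- function(keys, data)
{
    rxLogit(ArrDel15 ~ CRSDepTime, data)
}

# mean only works on an R data.frame, so the partition is converted first.
.MeanUDF <- function(keys, data)
{
    df <- rxImport(data)
    colMeans(df[sapply(df, is.numeric)], na.rm = TRUE)
}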

Data Set Size

The US airline on-time data for 2008-2012 has over 100M rows. I cut it at 1M, 2M, 5M and 10M rows to get data sets of different sizes.

Metric

We define the performance metric as the number of rows that rxExecBy/gapply can process per second; the higher, the better. Given an M-row data set and a run time of T seconds, the metric is M / T.

Benchmark

A combination of key and UDF defines a test case. For each test case, I plot a chart to compare the performance and how it scales with data set size. Missing points in the charts indicate a run failure, either a timeout (6000s) or OOM.

rxExecBy vs gapply

Key: DayOfWeek (7 keys)

UDF: rxSummary

[Chart 2-1]

Key: DayOfWeek (7 keys)

UDF: rxLogit

[Chart 2-2]

Key: DayOfWeek (7 keys)

UDF: mean

[Chart 2-3]

Key: Dest (About 300 keys)

UDF: rxSummary

[Chart 2-4]

Key: Dest (About 300 keys)

UDF: rxLogit

[Chart 2-5]

Key: Dest (About 300 keys)

UDF: mean

[Chart 2-6]

Key: Origin & Dest (About 100K keys)

UDF: mean

[Chart 2-7]

Generally, rxExecBy does better than gapply, especially on big data sets. However, I do see some areas where rxExecBy is not doing well and could be improved further:

  • On small data sets, it sometimes does not fully use the cluster cores.
    E.g., using DayOfWeek as the key, a run on the 1M-row data set triggers the coalesce logic, which puts all 7 tasks into one execution slot to run in sequence, leaving all the other execution slots idle.
  • It combines all the local return values of the UDF and uses the R API "save" to store the combined result. However, "save" performs very poorly when the R object is very big.

 

rxExecBy Performance

UDF: mean

[Chart 2-8]

The legend in the chart is the data set size in million rows. It shows:

  • Throughput increases as the data set size increases; 100M rows is not the limit.
  • With a fixed data set size, performance first increases and then decreases as the number of keys increases.

 

Script

Hive data preparation script: rxExecByPerfPrepare.R

Performance benchmark script: rxExecByPerfRun.R

 

Links

rxExecBy Insights on RxSpark Compute Context

Microsoft R Blog

Microsoft R Server Memory Tuning Guideline for RxSpark

Background

The RevoScaleR package of Microsoft R Server provides functionality for both High Performance Analytics (HPA) and High Performance Computing (HPC). HPA provides 'big data' capabilities by running distributed analytics across cores and nodes. HPC capabilities allow distributed execution of any R function, delivering the results back. Most of the functionality of the RevoScaleR package is HPA, except rxExec, which is HPC.

 

Some Basic Spark Knowledge

In Spark, the processing unit is the executor, and an executor consumes two resources, cores and memory, to execute jobs. Cores control task parallelization, while memory is used for job execution and caching. In this blog I will focus on YARN, but the ideas apply to Mesos accordingly.

Executor

Spark acquires executors from the resource manager (YARN); they are the processes that run computations and store data.

Core

Say one executor is configured to use C cores (spark.executor.cores) and each task is configured to use T cores (spark.task.cpus); then the number of tasks running in parallel in that executor is C / T. In most cases we do not change T, whose default is 1.

Memory

An executor is basically a JVM process, and its on-heap memory is controlled by spark.executor.memory. An executor also uses off-heap memory, whose size is determined by spark.yarn.executor.memoryOverhead. The sum of the on-heap and off-heap memory is the total memory one executor requests from the resource manager (YARN).

Configure Spark in RxSpark

There are 4 parameters of the RxSpark function that control the Spark executors, cores and memory:

  1. numExecutors
    The maximum number of executors Spark will request. The real executor count is determined by how many resources YARN has available.
  2. executorCores
    The number of cores to use per executor process.
  3. executorMem
    The amount of on-heap memory to allocate per executor process.
  4. executorOverheadMem
    The maximum amount of additional (off-heap) memory that can be used per executor.

 

Remember, YARN itself also has CPU and memory configuration:

  1. yarn.nodemanager.resource.memory-mb
    Amount of physical memory, in MB, that can be allocated for containers.
  2. yarn.scheduler.maximum-allocation-mb
    The maximum allocation for every container request at the RM, in MBs. Memory requests higher than this won’t take effect, and will get capped to this value.
  3. yarn.nodemanager.resource.cpu-vcores
    Number of vcores that can be allocated for containers. This is used by the RM scheduler when allocating resources for containers. This is not used to limit the number of physical cores used by YARN containers.
  4. yarn.scheduler.maximum-allocation-vcores
    The maximum allocation for every container request at the RM, in terms of virtual CPU cores. Requests higher than this won’t take effect, and will get capped to this value.

 

To properly create an RxSpark compute context, we need to make sure:

  1. Memory constraints
    [yarn.nodemanager.resource.memory-mb] ≥ [yarn.scheduler.maximum-allocation-mb] ≥ [executorMem + executorOverheadMem]
  2. CPU constraints
    [yarn.nodemanager.resource.cpu-vcores] ≥ [yarn.scheduler.maximum-allocation-vcores] ≥ [executorCores]
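
A small sketch of checking these constraints before creating the compute context (the YARN numbers are hypothetical; read the real values from your cluster configuration):

yarnMaxAllocMb    <- 25600     # yarn.scheduler.maximum-allocation-mb
yarnMaxAllocCores <- 8         # yarn.scheduler.maximum-allocation-vcores

executorMemMb         <- 16 * 1024   # executorMem = "16g"
executorOverheadMemMb <-  4 * 1024   # executorOverheadMem = "4g"
executorCores         <- 5

stopifnot(executorMemMb + executorOverheadMemMb <= yarnMaxAllocMb,
          executorCores <= yarnMaxAllocCores)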

 

General Spark Configuration Guideline

There are many factors to consider when configuring memory settings for the RxSpark compute context, e.g. the data set, the RevoScaleR algorithms, and the available YARN resources.

Usually, using more cores achieves better task parallelization and performance. Assuming a Spark app has E executors, C cores per executor and T cores used per task, the number of parallel tasks is E × C / T.
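
For instance, with the benchmark configuration used earlier in this post, the arithmetic looks like this sketch:

numExec      <- 4    # E: executors
coresPerExec <- 5    # C: executorCores
coresPerTask <- 1    # T: spark.task.cpus (default)
numExec * coresPerExec / coresPerTask    # 20 tasks can run in parallel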

Executors are created within a node, not across multiple nodes, so understanding the available resources on one node is very important. It is not always true that the more cores Spark uses, the better the performance, because tasks also consume memory during execution; not enough memory will slow down performance and can even result in OOM failure.

Executor on-heap memory is used for many purposes, e.g. shuffling data, unrolling data, task execution and storage caching. So increasing on-heap memory helps with both performance and OOM issues.

The off-heap memory is used by the R process and the RevoScaleR analytics engine process. Increasing off-heap memory may not help performance, because R and the RevoScaleR analytics engine consume memory on demand. However, the executor and its process tree will be killed if the memory consumed by R and the RevoScaleR analytics engine exceeds the assigned off-heap memory. So increasing off-heap memory helps mitigate off-heap OOM issues.

More cores with too little memory will fail the job, while few cores with lots of memory will waste resources; a balance is very important. The default memory settings of RxSpark are:

spark.executorCores = 2
spark.executorMem = "4g"
spark.executorOverheadMem = "4g"

We can see that, generally, for each task running in an executor, 2 GB of on-heap memory and 2 GB of off-heap memory are recommended, but this is not absolute.

 

HPA Guideline for RxSpark

The HPA functions (rxLinMod, rxLogit, rxDTree, …) of RevoScaleR provide "big data" analytic capabilities. They run distributed jobs as tasks across executor cores on cluster nodes, use on-heap memory for data shuffling and caching, and use off-heap memory for execution, since the real data analytics run in the RevoScaleR analytics engine process.

To tune Spark memory for HPA with a fixed total core and memory budget, we can consider:

  1. Use small executors and increase the number of executors.
    This reduces the cores and memory usage per executor but increases the executor count.
    Pros:
    a) With fewer cores, fewer tasks run in parallel within an executor, and this better task isolation reduces the memory pressure from other tasks in the same executor.
    b) The data cached within one executor is small, so the rebuilding cost is low if the executor is lost.
    c) Smaller executors help maximize YARN resource usage if the available resources differ across nodes.
    Cons:
    a) High cost for data shuffling, because tasks sit in different executors where data cannot be shared.
    b) A small on-heap memory setting may result in OOM errors when an input data partition is big.
  2. Use big executors and decrease the number of executors.
    This increases the cores and memory usage per executor but reduces the executor count.
    Pros:
    a) Low cost for data shuffling, as the more tasks in one executor, the more data can be shared.
    b) With more on-heap memory, the OOM error rate decreases.
    Cons:
    a) The data rebuilding cost is high if an executor is lost.
    b) It may not fully use the YARN resources on each node.

 

HPC Guideline for RxSpark

The HPC function (rxExec) in the RevoScaleR package is quite different from HPA. It uses Spark only for task scheduling; there is no data unrolling or caching, so it does not need much on-heap memory.

The real computation of the user-defined function runs in the R process and uses off-heap memory, and the number of parallel tasks is controlled by the total cores and the cores per task (E × C / T). So more cores and more off-heap memory should achieve better performance.

 

Links

Microsoft R Blog