Performance: rxExecBy vs gapply on Spark

rxExecBy is a new API in R Server release 9.1: it partitions an input data source by keys and applies a user-defined function (UDF) to each partition. gapply is a SparkR API with similar functionality: it groups a SparkDataFrame by the specified columns and applies an R function to each group.



The performance comparison of rxExecBy and gapply was run on an HDI 3.5 cluster with 4 worker nodes and Spark 2.0. Each node is a standard D4_v2 VM with 8 cores and 28 GB of memory. The Spark application configuration is:

  • Driver memory: 4g
  • Executor number: 4
  • Executor cores: 5
  • Executor memory: 16g
  • Executor overhead memory: 4g

The data set used for the benchmark is US airline on-time data from 08 to 12. I cleaned the data down to 30 columns (here is a sample csv file containing 1M rows) and preloaded it into Hive to make life easier for gapply, since gapply only accepts a SparkDataFrame as its input data source.


Before benchmarking, we need to understand the two APIs and define the performance story and the metric we want to collect.


rxExecBy(inData, keys, func, funcParams = NULL, filterFunc = NULL, computeContext = rxGetOption("computeContext"), ...)

The API is straightforward, so I will focus only on “func”. This user-defined function is applied to each partition after the group-by operation. It must take “keys” and “data” as its two required arguments, where “keys” is a list of partitioning values and “data” is an RxXdfData data source (in RxSpark) for the partition. There is no restriction on the return value, but returning a large object is not recommended: all returned values are collected back to the client, and a large object will slow things down or even cause an out-of-memory (OOM) failure.
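A minimal sketch of the pattern described above (the UDF body, file path, and key choice are hypothetical; it assumes RevoScaleR is loaded and a Spark compute context has been set up, e.g. via rxSparkConnect):

```r
# UDF: must accept `keys` and `data`; in RxSpark, `data` is an RxXdfData source.
# Keep the return value small -- all results are collected back to the client.
countRows <- function(keys, data) {
  info <- rxGetInfo(data)                 # reads metadata, not the full partition
  list(day = keys[[1]], rows = info$numRows)
}

airlineData <- RxTextData("/share/airline.csv")   # hypothetical input path
results <- rxExecBy(inData = airlineData,
                    keys = c("DayOfWeek"),
                    func = countRows)
```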


gapply(x, cols, func, schema)

“func” accepts a user-defined function. The differences compared to rxExecBy are:

  • The input data source is an R data.frame
  • The return value must be an R data.frame
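The equivalent gapply call might look like this (a hedged sketch; the Hive table name and schema columns are hypothetical, and it assumes an active SparkR session):

```r
library(SparkR)

df <- sql("SELECT * FROM airline")        # hypothetical Hive table name

# The UDF receives the grouping key and an R data.frame for that group,
# and must return an R data.frame matching `schema`.
countRows <- function(key, x) {
  data.frame(DayOfWeek = key[[1]], rows = nrow(x))
}

schema <- structType(structField("DayOfWeek", "integer"),
                     structField("rows", "integer"))

result <- gapply(df, "DayOfWeek", countRows, schema)
head(collect(result))
```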

What to Test

Unlike other machine learning algorithms, rxExecBy and gapply only partition the data and apply a UDF to each partition. So the performance test mainly focuses on how the data is partitioned, what the UDF is, and the input data size.

Group By Keys with Different Values

  • DayOfWeek (7 keys)
  • Dest (About 300 keys)
  • Origin & Dest (About 100K keys)


The input data to the UDF is RxXdfData for rxExecBy and an R data.frame for gapply. We need to define some UDFs that can take both RxXdfData and an R data.frame as input, and other UDFs that can only take an R data.frame. In the second case, the rxExecBy UDF needs an additional rxDataStep call to convert the RxXdfData into an R data.frame. The predefined UDFs are:

  • rxSummary: takes RxXdfData or an R data.frame as input; returns summary information for the data set.
  • rxLogit: takes RxXdfData or an R data.frame as input; returns a logistic regression model. Unlike the single-iteration rxSummary, this is a multi-iteration algorithm.
  • mean: takes only an R data.frame as input; returns the mean of each column of the input partition.
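The third case is where the extra conversion cost appears on the rxExecBy side. A hedged sketch of what such a UDF might look like (column handling is hypothetical; the benchmark's exact UDFs are in rxExecByPerfRun.R):

```r
# `mean` only understands R data.frames, so under rxExecBy the RxXdfData
# partition must first be materialized with rxDataStep.
meanUDF <- function(keys, data) {
  df <- rxDataStep(inData = data)         # RxXdfData -> R data.frame
  numericCols <- vapply(df, is.numeric, logical(1))
  colMeans(df[, numericCols, drop = FALSE], na.rm = TRUE)
}
```

Under gapply the UDF already receives a data.frame, so no conversion step is needed.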

Data Set Size

The US airline on-time data 08-12 has over 100M rows. I cut it at 1M, 2M, 5M, and 10M rows to get data sets of different sizes.


We define the performance metric as the number of rows that rxExecBy/gapply can process per second; higher is better. Given a data set of M rows and a run time of T seconds, the metric is M/T.


A combination of key and UDF defines a test case. For each test case, I plot a chart comparing the performance and how it scales with data set size. Missing points in the charts indicate a run failure, either a timeout (6000s) or OOM.

rxExecBy vs gapply

Key: DayOfWeek (7 keys)

UDF: rxSummary



Key: DayOfWeek (7 keys)

UDF: rxLogit



Key: DayOfWeek (7 keys)

UDF: mean



Key: Dest (About 300 keys)

UDF: rxSummary



Key: Dest (About 300 keys)

UDF: rxLogit



Key: Dest (About 300 keys)

UDF: mean



Key: Origin & Dest (About 100K keys)

UDF: mean



Generally, rxExecBy does better than gapply, especially on large data sets. However, there are some areas where rxExecBy does not do well and could be further improved:

  • On small data sets, it sometimes does not fully use the cluster's cores.
    E.g., with DayOfWeek as the key on the 1M-row data set, the coalesce logic packs all 7 tasks into one execution slot to run in sequence, leaving all other execution slots idle.
  • It combines all the local return values of the UDF and uses the R “save” API to store the combined result. However, “save” performs very poorly when the R object is large.


rxExecBy Performance

UDF: mean


The legend in the chart is the data set size in millions of rows. It shows:

  • Throughput increases as the data set grows; 100M rows is not the limit.
  • For a fixed data set size, performance first increases and then decreases as the number of keys grows.



Hive data preparation script: rxExecByPerfPrepare.R

Performance benchmark script: rxExecByPerfRun.R



rxExecBy Insights on RxSpark Compute Context

Microsoft R Blog

