rxExecBy Insights on RxSpark Compute Context

rxExecBy is designed to solve the following problem: a user has a very large data set, wants to partition it into many small partitions, and then train a model on each partition. This is what we call "small data, many models". rxExecBy has many features and can run in many different compute contexts, e.g. RxSpark, RxInSqlServer and local. In this blog, I'm going to cover the RxSpark compute context and help you understand the details.

Data Preparation

In RxSpark, rxExecBy supports RxTextData, RxXdfData (composite set), RxHiveData, RxParquetData and RxOrcData. I'm going to use AirlineDemoSmall.csv from the RevoScaleR package and convert it into each of these supported data sources.

# Create myDir in hdfs and copy csv file
source <- system.file("SampleData/AirlineDemoSmall.csv", package="RevoScaleR")
myDir <- "/share/AirlineDemoSmall"
rxHadoopMakeDir(myDir)
rxHadoopCopyFromLocal(source, myDir)

cc <- rxSparkConnect(reset = TRUE)

# Summary of data
hdfsFileSystem <- RxHdfsFileSystem()
csvFile <- file.path(myDir, "AirlineDemoSmall.csv")
textData <- RxTextData(file = csvFile, missingValueString = "M", stringsAsFactors = TRUE, fileSystem = hdfsFileSystem)

rxSummary(~., textData)
#Call:
#rxSummary(formula = ~., data = textData)
#Summary Statistics Results for: ~.
#Data: textData (RxTextData Data Source)
#File name: /share/AirlineDemoSmall/AirlineDemoSmall.csv
#Number of valid observations: 6e+05
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   11.31794 40.688536 -86.000000 1490.00000 582628   17372
# CRSDepTime 13.48227  4.697566   0.016667   23.98333 600000       0
#
#Category Counts for DayOfWeek
#Number of categories: 7
#Number of valid observations: 6e+05
#Number of missing observations: 0
#
# DayOfWeek Counts
# Monday    97975
# Tuesday   77725
# Wednesday 78875
# Thursday  81304
# Friday    82987
# Saturday  86159
# Sunday    94975

# Create XDF data
xdfFile <- file.path(myDir, "AirlineDemoSmallXDF")
xdfData <- RxXdfData(file = xdfFile, createCompositeSet = TRUE, fileSystem = hdfsFileSystem)
rxDataStep(textData, xdfData, overwrite = TRUE)

# Remove factor conversion from textData
textData <- RxTextData(file = csvFile, missingValueString = "M", fileSystem = hdfsFileSystem)

# Create Parquet data
parquetFile <- file.path(myDir, "AirlineDemoSmallParquet")
parquetData <- RxParquetData(file = parquetFile, fileSystem = hdfsFileSystem)
rxDataStep(textData, parquetData, overwrite = TRUE)

# Create ORC data
orcFile <- file.path(myDir, "AirlineDemoSmallOrc")
orcData <- RxOrcData(file = orcFile, fileSystem = hdfsFileSystem)
rxDataStep(textData, orcData, overwrite = TRUE)

# Create Hive data
hiveData <- RxHiveData(table = "AirlineDemoSmall")
rxDataStep(textData, hiveData, overwrite = TRUE)

rxSparkDisconnect(cc)

Group By Keys

The first and most important feature of rxExecBy is partitioning. Given a data source, rxExecBy can partition the data by a single key as well as by multiple keys. To do so, simply put the variable names into a vector and pass it to the rxExecBy argument "keys".

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
textData <- RxTextData(file = "/share/AirlineDemoSmall/AirlineDemoSmall.csv", missingValueString = "M", stringsAsFactors = TRUE, fileSystem = hdfsFileSystem)
.Summary <- function(keys, data)
{
    df <- rxImport(data)
    nrow(df)
}

# single key
result <- rxExecBy(inData = textData, keys = c("DayOfWeek"), func = .Summary)

str(result)
#List of 7
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 4
#  ..$ result: int 78875
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 6
#  ..$ result: int 82987
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 5
#  ..$ result: int 81304
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 7
#  ..$ result: int 86159
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 1
#  ..$ result: int 97975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 3
#  ..$ result: int 77725
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Sunday",..: 2
#  ..$ result: int 94975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL

# multi keys
result <- rxExecBy(inData = textData, keys = c("DayOfWeek", "ArrDelay"), func = .Summary)

length(result)
#[1] 3233

str(result[[1]])
#List of 3
# $ keys  :List of 2
#  ..$ : Factor w/ 7 levels "Monday","Sunday",..: 4
#  ..$ : int 388
# $ result: int 2
# $ status:List of 3
#  ..$ : chr "OK"
#  ..$ : NULL
#  ..$ : NULL

rxSparkDisconnect(cc)

Return Value

rxExecBy returns a list of results from the partitions; the length of the list equals the number of partitions. Each partition's result is itself a list of "keys", "result" and "status":

  • "keys" indicates the key values for that partition.
  • "result" is the return value of the UDF, if it ran successfully.
  • "status" shows the run status of the UDF, "OK" or "Error", as well as any runtime warning/error messages.

Take the single key c("DayOfWeek") as an example. The key has 7 values, so the return value is a list of 7 objects, each of which is a list of 3:

  • "keys" is a list of 1, whose object is a factor with values from 1 to 7, mapping to "Monday", "Sunday", ….
  • "result" is the number of rows in the partition, returned by nrow in the UDF.

In the multi-key c("DayOfWeek", "ArrDelay") example, the length of the return value shows a total of 3233 partitions. From the result of the 1st partition, we can see that "keys" is a list of 2:

  • 1st object is factor 4, which is the value of “DayOfWeek”
  • 2nd object is int 388, which is the value of “ArrDelay”
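Because the return value is built from plain R lists, per-partition results are easy to post-process with base R. A minimal sketch, assuming `result` holds the single-key c("DayOfWeek") output from earlier (not the multi-key result):

```r
# Flatten the rxExecBy result list into one data.frame of key/count pairs.
# Assumes 'result' is the single-key rxExecBy return value shown earlier.
counts <- do.call(rbind, lapply(result, function(p) {
    data.frame(DayOfWeek = as.character(p$keys[[1]]),
               rows = p$result,
               stringsAsFactors = FALSE)
}))
counts[order(-counts$rows), ]
```

The same pattern extends to multiple keys by pulling each element of p$keys into its own column.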

Another very handy feature of the partition result is "status", which is a list of 3:

  • 1st object is a character value, indicating whether the UDF ran successfully.
  • 2nd object is a character value holding the error message, if the UDF failed.
  • 3rd object is a character vector containing all the warnings raised while running the UDF.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
xdfData <- RxXdfData(file = "/share/AirlineDemoSmall/AirlineDemoSmallXDF", fileSystem = hdfsFileSystem)
.Message <- function(keys, data)
{
    df <- rxImport(data)
    rows <- nrow(df)
    if (rows > 80000)
    {
        warning("rows > 80000")
    }
    if (rows > 85000)
    {
        warning("rows > 85000")
    }
    if (rows > 90000)
    {
        warning("rows > 90000")
    }
    if (rows > 95000)
    {
        stop("rows > 95000")
    }
    rows
}

result <- rxExecBy(inData = xdfData, keys = c("DayOfWeek"), func = .Message)

str(result)
#List of 7
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 3
#  ..$ result: int 78875
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 5
#  ..$ result: int 82987
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 80000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 4
#  ..$ result: int 81304
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 80000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 6
#  ..$ result: int 86159
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr [1:2] "rows > 80000" "rows > 85000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 1
#  ..$ result: NULL
#  ..$ status:List of 3
#  .. ..$ : chr "Error"
#  .. ..$ : chr "rows > 95000"
#  .. ..$ : chr [1:3] "rows > 80000" "rows > 85000" "rows > 90000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 2
#  ..$ result: int 77725
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 7
#  ..$ result: int 94975
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr [1:3] "rows > 80000" "rows > 85000" "rows > 90000"

rxSparkDisconnect(cc)

This example shows how "status" collects warning and error messages.

  • 1st partition result shows 78875 rows, no errors or warnings.
  • 2nd partition result shows 82987 rows, no error, 1 warning message.
  • 4th partition result shows 86159 rows, no error, 2 warning messages.
  • 5th partition result shows NULL, as the UDF ran into stop().
    • 1st object of “status” shows “Error”
    • 2nd object shows the error message
    • 3rd object shows all the 3 warning messages
  • 7th partition result shows 94975 rows, no error, 3 warning messages.
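Since each partition carries its own "status", failed partitions can be separated from successful ones before any aggregation. A small sketch, assuming `result` is the output of the .Message example above:

```r
# Split partitions by UDF run status and inspect failures.
ok     <- Filter(function(p) identical(p$status[[1]], "OK"), result)
failed <- Filter(function(p) !identical(p$status[[1]], "OK"), result)

length(ok)                                 # number of successful partitions
lapply(failed, function(p) p$status[[2]])  # error messages of failed ones
```

This makes rxExecBy robust to bad partitions: one failing group doesn't abort the whole job, and you decide afterwards how to handle the failures.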

RxXdfData as Input Data Source of UDF

The UDF is the R function provided by the user to be applied to each partition. It takes "keys" and "data" as its two required input parameters, where "keys" holds the partitioning values and "data" is a data source object for the corresponding partition.

In the RxSpark compute context, "data" is an RxXdfData data source:

  • This makes it easy to run any rx function directly, with good performance and without being memory bound, thanks to the nature of RxXdfData: a binary format stored in chunks.
  • The user also has the flexibility to convert it to an R data.frame with an additional rxImport/rxDataStep call.

This example shows the ease of use of rx functions by consuming RxXdfData directly.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
colInfo <- list(
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday")))
orcData <- RxOrcData(file = "/share/AirlineDemoSmall/AirlineDemoSmallOrc", fileSystem = hdfsFileSystem, colInfo = colInfo)
.LinMod <- function(keys, data)
{
    rxLinMod(ArrDelay ~ CRSDepTime, data)
}

result <- rxExecBy(inData = orcData, keys = c("DayOfWeek"), func = .LinMod)

result[[1]]
#$keys
#$keys[[1]]
#[1] Wednesday
#Levels: Monday Tuesday Wednesday Thursday Friday Saturday Sunday
#
#
#$result
#Call:
#rxLinMod(formula = ArrDelay ~ CRSDepTime, data = data)
#
#Linear Regression Results for: ArrDelay ~ CRSDepTime
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/3108799476135290757/PXDF0
#Dependent variable(s): ArrDelay
#Total independent variables: 2
#Number of valid observations: 76786
#Number of missing observations: 2089
#
#Coefficients:
#              ArrDelay
#(Intercept) -3.1974586
#CRSDepTime   0.9900705
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

rxSparkDisconnect(cc)
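Since "result" holds the full rxLinMod object for each partition, the per-partition models can be compared directly. A sketch, assuming `result` is the output of the .LinMod example above (coef() reads the coefficients of each fitted model):

```r
# Collect regression coefficients across all DayOfWeek partitions
# into one matrix: rows are terms, columns are partitions.
coefs <- sapply(result, function(p) coef(p$result))
colnames(coefs) <- sapply(result, function(p) as.character(p$keys[[1]]))
coefs
```

The same objects can also be kept for scoring later, e.g. with rxPredict on new data for the matching partition.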

Unroll funcParams to UDF

Besides the two required parameters, the UDF can also take additional parameters, which allow the user to pass arbitrary values into the UDF. This is done via funcParams; here is an example.

cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
colInfo <- list(
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday")))
parquetData <- RxParquetData(file = "/share/AirlineDemoSmall/AirlineDemoSmallParquet", fileSystem = hdfsFileSystem, colInfo = colInfo)
.Param <- function(keys, data, arg, anotherArg)
{
    df <- rxImport(data)
    rows <- nrow(df)
    if (rows > arg)
    {
        warning(paste("rows >", arg))
    }
    paste(anotherArg[[keys[[1]]]], "has", rows, "rows")
}

params <- list(arg = 90000, anotherArg = list("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"))
result <- rxExecBy(inData = parquetData, keys = c("DayOfWeek"), func = .Param, funcParams = params)

str(result)
#List of 7
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 3
#  ..$ result: chr "Wednesday has 78875 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 5
#  ..$ result: chr "Friday has 82987 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 4
#  ..$ result: chr "Thursday has 81304 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 6
#  ..$ result: chr "Saturday has 86159 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 1
#  ..$ result: chr "Monday has 97975 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 90000"
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 2
#  ..$ result: chr "Tuesday has 77725 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : NULL
# $ :List of 3
#  ..$ keys  :List of 1
#  .. ..$ : Factor w/ 7 levels "Monday","Tuesday",..: 7
#  ..$ result: chr "Sunday has 94975 rows"
#  ..$ status:List of 3
#  .. ..$ : chr "OK"
#  .. ..$ : NULL
#  .. ..$ : chr "rows > 90000"

rxSparkDisconnect(cc)

Factor & ColInfo

Factors and colInfo are also well supported by rxExecBy; however, you need to reference the R help documents to check what is supported for each data source.

This example shows how to read a Hive data source and convert a string column into a factor. It also defines the factor levels, so any value of that column that doesn't appear in the level list will be treated as a missing value.

cc <- rxSparkConnect(reset = TRUE)
colInfo <- list(
    ArrDelay = list(
        type = "numeric"),
    DayOfWeek = list(
        type = "factor",
        levels = c(
            "Monday",
            "Sunday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Tuesday")))
hiveData <- RxHiveData(table = "AirlineDemoSmall", colInfo = colInfo)
.Factor <- function(keys, data)
{
    rxSummary(~., data)
}

result <- rxExecBy(inData = hiveData, keys = c("DayOfWeek"), func = .Factor)

# Partition result for "Saturday", which is not in the factor level list
result[[4]]
#$keys
#$keys[[1]]
#[1] <NA>
#Levels: Monday Sunday Wednesday Thursday Friday Tuesday
#
#
#$result
#Call:
#rxSummary(formula = ~., data = data)
#
#Summary Statistics Results for: ~.
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/2729766045047878893/PXDF0
#Number of valid observations: 86159
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   11.87533 45.245402 -73.000000 1370.00000 83851    2308
# CRSDepTime 13.15288  4.598508   0.083333   23.98333 86159       0
#
#Category Counts for DayOfWeek
#Number of categories: 6
#Number of valid observations: 0
#Number of missing observations: 86159
#
# DayOfWeek Counts
# Monday    0
# Sunday    0
# Wednesday 0
# Thursday  0
# Friday    0
# Tuesday   0
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

# Partition result of "Friday"
result[[2]]
#$keys
#$keys[[1]]
#[1] Friday
#Levels: Monday Sunday Wednesday Thursday Friday Tuesday
#
#
#$result
#Call:
#rxSummary(formula = ~., data = data)
#
#Summary Statistics Results for: ~.
#Data: data (RxXdfData Data Source)
#File name: /dev/shm/MRS-sshuser/2397107652729461182/PXDF0
#Number of valid observations: 82987
#
# Name       Mean     StdDev    Min        Max        ValidObs MissingObs
# ArrDelay   14.80433 41.792601 -78.000000 1490.00000 80142    2845
# CRSDepTime 13.50271  4.739651   0.083333   23.98333 82987       0
#
#Category Counts for DayOfWeek
#Number of categories: 6
#Number of valid observations: 82987
#Number of missing observations: 0
#
# DayOfWeek Counts
# Monday        0
# Sunday        0
# Wednesday     0
# Thursday      0
# Friday    82987
# Tuesday       0
#
#$status
#$status[[1]]
#[1] "OK"
#
#$status[[2]]
#NULL
#
#$status[[3]]
#NULL

rxSparkDisconnect(cc)

Links

Performance: rxExecBy vs gapply on Spark

Microsoft R Blog
