Running code in parallel
Like the majority of programming languages, R runs on one CPU core by default. However, all modern CPUs have multiple cores, which means there is typically spare computing power that goes unused when we run R. Using those extra cores is often a very cheap way of making our code run faster. However, it is not the be-all and end-all of getting better performance.
In most cases, the number of CPUs on a system is rather limited, and the maximum theoretical speedup from running in parallel is equal to the number of cores you have available. Furthermore, not all code can be parallelized, and scaling is not always linear. If only 20 percent of your code can be run in parallel, then even with an infinite number of cores you can only shave that 20 percent off the total runtime - a maximum speedup of 1.25x. This bound is known as Amdahl’s law. The performance gains in the last section were considerably more than that. Because of this, you should only pursue parallelization once you’ve already done all you can to optimize your code. Better code beats more hardware every time (and it’s also significantly less expensive).
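We can sketch this bound with a quick back-of-the-envelope calculation. Note that amdahl() below is just an illustrative helper defined here, not a function from any package:

# Amdahl's law: p is the fraction of the work that can be parallelized,
# n_cores is how many cores we throw at that fraction
amdahl <- function(p, n_cores) {
  1 / ((1 - p) + p / n_cores)
}
amdahl(p = 0.2, n_cores = Inf)  # 1.25 - the absolute best case
amdahl(p = 0.2, n_cores = 8)    # ~1.21 - 8 cores barely improve on serial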
One final consideration is that we can only parallelize code where each subset of a problem is completely independent of the other subsets. If each result depends on the last, only one core can do any work at a time - the others will just sit around waiting for the next result.
With all of that said, let’s make our R code run in parallel.
Parallelization using plyr and doParallel
So far, I’ve carefully avoided any mention of the plyr package. plyr is the predecessor to both dplyr and purrr (and is also written by Hadley Wickham), and it is no longer actively developed. In many cases, plyr functions are slower than their tidyverse equivalents. However, one key advantage of plyr is that it’s dead easy to parallelize, and it uses a much faster default parallel backend for small problem sets. It also provides a nice example of shared-memory parallelization vs. the distributed-memory parallelization that we will encounter next.
Let’s explore how to write parallel code using plyr. The first step is to load the plyr and doParallel packages and determine the number of cores we will use.
Threads vs. cores
There is often a lot of confusion between CPU threads and cores. A CPU core is the actual computation unit. Threads are a way of multi-tasking that allows multiple simultaneous tasks to share the same CPU core. Multiple threads are not a substitute for multiple cores. Because of this, compute-intensive workloads (like R) typically care only about the number of physical CPU cores available, not the number of threads.
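If you’re curious what your own machine reports, detectCores() (which we’ll meet properly in a moment) can count both. A quick sketch:

# detectCores() counts logical CPUs (threads) by default;
# logical = FALSE asks for physical cores (may return NA on some platforms)
parallel::detectCores(logical = TRUE)
parallel::detectCores(logical = FALSE)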
The parallel package (loaded automatically alongside doParallel) provides detectCores(), a handy way of looking up the number of cores if we don’t have prior knowledge of the value.
library(plyr)
library(doParallel)
## Loading required package: foreach
## Loading required package: iterators
## Loading required package: parallel
cores <- detectCores()
cores
## [1] 8
Once we have the number of cores, we can register doParallel as our parallel backend.
registerDoParallel(cores=cores)
This creates what’s known as a “fork cluster”. A fork cluster is a special type of cluster where a UNIX OS “forks”, or splits, the parent process to run on multiple cores. The forked processes share the same memory and are more or less identical to the parent. Because the processes share the same memory, there is no need to “set them up” by loading packages or transferring variables.
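To see this “no setup needed” property in action, here’s a minimal sketch using mclapply() from the base parallel package, which also works by forking (on Windows it falls back to running in serial):

library(parallel)
# big_vector lives in the parent process; the forked workers can read it
# without us ever explicitly exporting it to them
big_vector <- rnorm(1e6)
mclapply(1:4, function(i) sum(big_vector) * i, mc.cores = 4)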
Anyhow, all we need to do to parallelize our code now is call the corresponding plyr function. In this case, we are using llply(), which is more or less a direct copy of purrr’s map() - the syntax is identical. To run in parallel, the only change we need to make is to add .parallel=TRUE as an argument to llply(). We’ll use a fake function that does nothing but return its argument after sleeping for a bit.
fake_func <- function(x) {
Sys.sleep(0.1)
return(x)
}
library(microbenchmark)
microbenchmark(
serial = llply(1:24, fake_func),
parallel = llply(1:24, fake_func, .parallel = TRUE),
times = 1
)
## Unit: milliseconds
## expr min lq mean median uq max
## serial 2442.9770 2442.9770 2442.9770 2442.9770 2442.9770 2442.9770
## parallel 473.8413 473.8413 473.8413 473.8413 473.8413 473.8413
## neval
## 1
## 1
That’s it. The recipe for parallel code using plyr is short and sweet (just 4 lines!). It can’t get any easier than this.
library(plyr)
library(doParallel)
registerDoParallel(cores=detectCores())
result <- llply(object_to_iterate_over, some_func, .parallel=TRUE)
This method of parallelization is perfect for when you just want to do something in parallel “quick and dirty”. It requires zero effort, but keep in mind several things:
There is a small amount of overhead involved in shuffling data off to different cores. Though this will be negligible if each iteration you are parallelizing is relatively large/slow, parallelizing large numbers of extremely fast operations will be very inefficient (see the chunking sketch after this list).
Savvy readers might have noticed the keyword “UNIX” earlier - only Mac, Linux, and other UNIX variants have the ability to fork processes. This method of parallelization simply cannot be done on Windows.
You cannot spread this type of workload over multiple computers.
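As for the first caveat, a common workaround is to batch tiny tasks into larger chunks, so the per-task overhead is paid once per core rather than once per element. A hedged sketch, reusing the cores variable from earlier:

inputs <- 1:100000
# one chunk per core, so overhead is paid `cores` times, not 100000 times
chunks <- split(inputs, cut(seq_along(inputs), cores, labels = FALSE))
results <- llply(chunks, function(chunk) sqrt(chunk), .parallel = TRUE)
results <- unlist(results, use.names = FALSE)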
Parallelization using multidplyr
multidplyr is the tidyverse parallel backend. Unlike the plyr/doParallel method we just covered, multidplyr creates a PSOCK cluster by default (“PSOCK” stands for parallel socket cluster). Essentially, this workflow has 5 steps:
Launch our cluster of R worker processes (each uses 1 core).
Load packages and send data to the workers.
Our workers execute our workflow in parallel.
Collect results from the workers.
Shut down the cluster (otherwise the workers hang around and continue to eat up resources).
multidplyr abstracts away several of these steps for us, simplifying our workflow. Let’s explore this using an example calculation on our favorite nycflights13 dataset.
Note that multidplyr is not available through CRAN; we’ll have to fetch it from GitHub with the devtools package. Windows users may need to install Rtools beforehand to allow installation from source code.
install.packages("devtools")
devtools::install_github("hadley/multidplyr")
library(tidyverse)
## Loading tidyverse: ggplot2
## Loading tidyverse: tibble
## Loading tidyverse: tidyr
## Loading tidyverse: readr
## Loading tidyverse: purrr
## Loading tidyverse: dplyr
## Conflicts with tidy packages ----------------------------------------------
## accumulate(): purrr, foreach
## arrange(): dplyr, plyr
## compact(): purrr, plyr
## count(): dplyr, plyr
## failwith(): dplyr, plyr
## filter(): dplyr, stats
## id(): dplyr, plyr
## lag(): dplyr, stats
## mutate(): dplyr, plyr
## rename(): dplyr, plyr
## summarise(): dplyr, plyr
## summarize(): dplyr, plyr
## when(): purrr, foreach
library(multidplyr)
library(nycflights13)
results <- flights %>%
partition(dest) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>%
collect() %>%
arrange(est_travel_time)
## Initialising 7 core cluster.
## Warning: group_indices_.grouped_df ignores extra arguments
Examining the workflow, we first partition our data across our workers (in this case, multidplyr decided to use only 7 of our 8 cores, presumably leaving one free for the main R process). The partition() function creates a party_df, a dataframe that has been split into 7 shards distributed across our 7 worker processes. partition() serves more or less the same purpose as group_by(), and ensures that all observations for a particular group are assigned to the same worker. collect() then gathers the data back from the parallel workers, after which they shut down. Finally, arrange() is something that cannot be done in parallel: there’s no point in sorting separate shards of data, since they’ll be out of order again when they are recombined.
flights %>% partition(dest)
## Warning: group_indices_.grouped_df ignores extra arguments
## Source: party_df [336,776 x 19]
## Groups: dest
## Shards: 7 [37,722--63,072 rows]
##
## # S3: party_df
## year month day dep_time sched_dep_time dep_delay arr_time
## <int> <int> <int> <int> <int> <dbl> <int>
## 1 2013 1 1 559 600 -1 854
## 2 2013 1 1 602 610 -8 812
## 3 2013 1 1 624 630 -6 909
## 4 2013 1 1 624 630 -6 840
## 5 2013 1 1 629 630 -1 824
## 6 2013 1 1 643 645 -2 837
## 7 2013 1 1 646 645 1 1023
## 8 2013 1 1 651 655 -4 936
## 9 2013 1 1 743 749 -6 1043
## 10 2013 1 1 752 759 -7 955
## # ... with 336,766 more rows, and 12 more variables: sched_arr_time <int>,
## # arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## # minute <dbl>, time_hour <dttm>
Let’s compare that example with non-parallel execution speed.
microbenchmark(
parallel = {
results <- flights %>%
partition(dest) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>%
collect() %>%
arrange(est_travel_time)
},
serial = {
results <- flights %>%
group_by(dest) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>%
arrange(est_travel_time)
},
times = 5
)
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Unit: milliseconds
## expr min lq mean median uq max
## parallel 1082.4636 1121.14473 1157.19110 1170.60528 1197.55161 1214.19028
## serial 38.6268 39.69088 41.31497 42.55553 42.59156 43.11009
## neval
## 5
## 5
What happened? Our code was actually slower. Short answer: there’s a lot of overhead associated with setting up our parallel workers, moving the data around, and then shutting them down again. When we parallelized plyr earlier, we cheated a bit using Sys.sleep(). Let’s do so again here (just for the purposes of demonstration).
microbenchmark(
parallel = {
results <- flights %>%
partition(dest) %>%
summarize(est_travel_time=(function(x) {
Sys.sleep(0.1)
return(mean(x, na.rm=TRUE))
})(air_time)) %>%
collect() %>%
arrange(est_travel_time)
},
serial = {
results <- flights %>%
group_by(dest) %>%
summarize(est_travel_time=(function(x) {
Sys.sleep(0.1)
return(mean(x, na.rm=TRUE))
})(air_time)) %>%
arrange(est_travel_time)
},
times = 5
)
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Warning: group_indices_.grouped_df ignores extra arguments
## Unit: seconds
## expr min lq mean median uq max
## parallel 2.951026 3.087707 3.232776 3.273678 3.387101 3.464368
## serial 10.591109 10.593191 10.614384 10.596090 10.601251 10.690277
## neval
## 5
## 5
Again, there is a limit to the speedup (even while cheating), because the arrange(), collect(), and partition() steps always take the same amount of time to execute.
Also, why didn’t we just define an external function (let’s call it cheating()) instead of the (function(col) {...}) monstrosity?
# what we should have done
cheating <- function(col) {
Sys.sleep(0.1)
return(mean(col, na.rm=TRUE))
}
results <- flights %>%
partition(dest) %>%
summarize(est_travel_time=cheating(air_time)) %>%
collect() %>%
arrange(est_travel_time)
Initialising 7 core cluster.
group_indices_.grouped_df ignores extra arguments
Error in checkForRemoteErrors(lapply(cl, recvResult)) : 7 nodes produced errors;
first error: Evaluation error: could not find function "cheating".
What happened? Why doesn’t our parallel cluster know about the cheating() function? As it turns out, our parallel workers are pretty much brand-new copies of R with nothing loaded except for the data sent to them via partition(). We’ll need to more or less manage our cluster manually and send the workers everything they need.
There are several functions we’ll use here:
makePSOCKcluster() - Creates our cluster. This is from the parallel package.
set_default_cluster() - Makes the created cluster the one used by partition() by default.
cluster_library() - Loads an R package on every worker.
cluster_assign_value() - Exports a variable to our cluster workers.
stopCluster() - Explicitly shuts down our cluster.
A sample workflow using all of this might look like the following.
# create a cluster and make it the default
library(parallel)
cluster <- makePSOCKcluster(detectCores())
set_default_cluster(cluster)
# define a function
cheating <- function(col) {
Sys.sleep(0.1)
return(mean(col, na.rm=TRUE))
}
# pass it to workers
cluster_assign_value(cluster, "cheating", cheating)
# if we used a library in our parallel workers, we'd load it on each worker here:
# cluster_library(cluster, "packageName")
# run our workflow
flights %>%
partition(dest) %>%
summarize(est_travel_time=cheating(air_time)) %>%
collect() %>%
arrange(est_travel_time)
## Warning: group_indices_.grouped_df ignores extra arguments
## # A tibble: 105 x 2
## dest est_travel_time
## <chr> <dbl>
## 1 BDL 25.46602
## 2 ALB 31.78708
## 3 PVD 32.66760
## 4 PHL 33.17132
## 5 MVY 36.31905
## 6 BWI 38.49970
## 7 MHT 38.50858
## 8 BOS 38.95300
## 9 ACK 42.06818
## 10 SYR 43.03984
## # ... with 95 more rows
# shut down our cluster at the end
stopCluster(cluster)
That’s a lot of extra work just to run in parallel, isn’t it? One thing I noticed while reading through the multidplyr vignette is that it says it can support running on “clusters created by the parallel package”. This is good news, as one of those types of clusters is the “fork” cluster we used earlier with plyr (when parallelization was super easy…).
Let’s try this out:
fork <- makeForkCluster(detectCores())
set_default_cluster(fork)
flights %>%
partition(dest) %>%
summarize(est_travel_time=cheating(air_time)) %>%
collect() %>%
arrange(est_travel_time)
stopCluster(fork)
# A tibble: 105 x 2
dest est_travel_time
<chr> <dbl>
1 BDL 25.46602
2 ALB 31.78708
3 PVD 32.66760
4 PHL 33.17132
5 MVY 36.31905
6 BWI 38.49970
7 MHT 38.50858
8 BOS 38.95300
9 ACK 42.06818
10 SYR 43.03984
# ... with 95 more rows
Great news: it worked. Now let’s examine how the speed of each type of cluster compares (no cheating this time!). We’ll leave out the arrange() step, as it doesn’t add anything to our example aside from showing that certain things can’t be parallelized. Since there are multiple clusters at work, we must explicitly specify which cluster to use when we run partition().
cluster_fork <- makeForkCluster(detectCores())
cluster_psock <- makePSOCKcluster(detectCores())
microbenchmark(
dplyr_serial = {
flights %>%
group_by(dest) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE))
},
dplyr_psock = {
flights %>%
partition(dest, cluster=cluster_psock) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>%
collect()
},
dplyr_fork = {
flights %>%
partition(dest, cluster=cluster_fork) %>%
summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>%
collect()
},
plyr_serial = {
flights %>%
# didn't cover ddply, but it's the plyr equivalent of dplyr
# (.variables = group_by())
ddply(.variables = "dest",
.fun=function(x) mean(x$air_time, na.rm=TRUE))
},
plyr_fork = {
flights %>%
ddply(.variables = "dest",
.fun=function(x) mean(x$air_time, na.rm=TRUE),
.parallel=TRUE)
},
times = 5
)
stopCluster(cluster_psock)
stopCluster(cluster_fork)
Unit: milliseconds
expr min lq mean median uq max neval
dplyr_serial 39.14037 39.6308 60.51139 39.91827 41.94427 141.9232 5
dplyr_psock 1007.31082 1301.6537 1393.16955 1366.20333 1409.68778 1880.9922 5
dplyr_fork 962.37095 985.1657 1005.70492 1013.73967 1022.19742 1045.0509 5
plyr_serial 370.86347 389.5137 397.98565 395.11678 416.63985 417.7944 5
plyr_fork 237.73061 248.6463 320.68526 336.84943 386.48719 393.7127 5
Takeaway message: the dplyr fork cluster is easier to use and slightly faster than its PSOCK counterpart. Parallelization using plyr didn’t suffer the same parallelization overhead as dplyr, but plyr was still almost 8 times slower than dplyr running in serial.
A good question is why dplyr’s parallel version takes so much longer than plyr’s fork cluster. If we look at the source code for partition(), it always transmits all of the data to the workers - even to forked workers, which start with a copy of the data already!
Here’s a similar duel between vectorized code, purrr in serial, plyr in serial, and plyr in parallel.
library(stringr)
microbenchmark(
vectorized = str_detect(planes$model, "737"),
purrr_serial = {
planes$model %>%
map(~str_detect(.x, "737"))
},
plyr_serial = {
planes$model %>%
llply(.fun = function(x) str_detect(x, "737"))
},
plyr_fork = {
planes$model %>%
llply(.fun = function(x) str_detect(x, "737"), .parallel=TRUE)
},
times = 10
)
## Unit: microseconds
## expr min lq mean median uq
## vectorized 842.563 942.03 975.9827 999.2185 1037.247
## purrr_serial 70792.919 73406.89 75517.8496 75072.9480 77712.982
## plyr_serial 69881.959 70814.89 72361.8620 72102.9315 74025.266
## plyr_fork 993463.434 1018008.97 1055831.6332 1035352.6420 1062911.312
## max neval
## 1053.202 10
## 82178.797 10
## 75607.017 10
## 1191334.835 10
Again, vectorized code is very, very fast. The takeaway here is that we should avoid complexity wherever possible: stick to vectorized code, and only parallelize if you absolutely have to. One other interesting finding is that plyr::llply() is just as fast as purrr::map(). Though purrr cannot be parallelized at the moment, this means we can still use plyr::llply() in parallel (and llply() does not suffer from any related performance hit when run in serial).
Other parallelization methods
Microsoft R Open
There are a number of alternative R implementations. One of them, Microsoft R Open (formerly Revolution R), is a relatively vanilla alternative compiled against Intel’s MKL math libraries (unlike implementations such as Renjin, it does not completely rewrite the language). Intel’s MKL is generally much faster than the reference BLAS/LAPACK libraries that GNU R ships with, and Microsoft R will perform many types of operations (notably linear algebra) in parallel by default. Microsoft R is free, and I’ve never really noticed any issues with it relative to the GNU version. Installing it is a performance “freebie” in many cases: just install it and you’re done.
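If you want to check which math libraries your current R build is linked against (and how much they matter), here’s a quick sketch; recent versions of R report the BLAS/LAPACK paths in sessionInfo():

sessionInfo()  # look for the "BLAS" and "LAPACK" lines near the top
# dense matrix multiplication is where MKL shines - compare this timing
# between a stock R build and an MKL-linked one
m <- matrix(rnorm(2000 * 2000), nrow = 2000)
system.time(m %*% m)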
Apache Spark
If you want to parallelize your R jobs across a cluster, you likely will want to use Spark. (The alternative would be to write code using something like Rmpi, but at that point you’re better off just switching languages to C++ or Fortran.) Spark is a distributed compute engine that runs analyses in parallel across multiple nodes. It can be a bit complex to get started with, and a full treatment is outside the scope of this tutorial. However, if you are looking to get started with Spark, I recommend checking out RStudio’s sparklyr package.
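Just to give a flavor of it, here’s a minimal sparklyr sketch that reproduces our earlier flights calculation. This is a sketch only - it assumes Spark is installed locally, which sparklyr::spark_install() can do for you:

library(sparklyr)
library(dplyr)
library(nycflights13)
sc <- spark_connect(master = "local")  # a single local Spark instance
flights_tbl <- copy_to(sc, flights, "flights")
flights_tbl %>%
  group_by(dest) %>%
  summarize(est_travel_time = mean(air_time, na.rm = TRUE)) %>%
  collect()                            # bring the results back into R
spark_disconnect(sc)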
Serial farming across a batch computing cluster
The traditional HPC cluster manages workloads using a batch scheduler like SLURM. Essentially, users submit non-interactive batch job scripts to the scheduler, which decides where each job gets run. The cluster filesystem is shared across all nodes. Our workflow here would normally take one of two forms: “manually”, where we write chunks of our dataset to disk and then submit a separate R job to analyze each chunk, or automatically, using a tool like Snakemake or the BatchJobs package.
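A hedged sketch of the “manual” approach (all file names here are made up for illustration, and commandArgs() is covered in the exercise below): one script splits the data onto the shared filesystem, and a second script, submitted as a separate batch job per chunk, analyzes its assigned piece.

# split_data.R - run once to write the chunks to disk
library(nycflights13)
chunks <- split(flights, flights$dest)
for (i in seq_along(chunks)) {
  saveRDS(chunks[[i]], paste0("chunk_", i, ".rds"))
}

# analyze_chunk.R - submitted once per chunk, e.g. Rscript analyze_chunk.R 12
args <- commandArgs(TRUE)
chunk <- readRDS(paste0("chunk_", args[1], ".rds"))
saveRDS(mean(chunk$air_time, na.rm = TRUE),
        paste0("result_", args[1], ".rds"))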
Exercise - Writing a command line R program
Typically, when running on the command line, we want our R scripts to accept command-line arguments (like which data chunk to analyze). To do this, we use the commandArgs() function.
args <- commandArgs(TRUE)
Write an R program that takes two numbers off of the command line and adds them together. You can run it with Rscript yourScript.R arg1 arg2.
For more complex scenarios, I recommend looking into the argparse package.
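For reference, a hedged sketch of what the same adding program might look like with argparse (note that the argparse package drives Python’s argparse under the hood, so it requires a Python installation):

library(argparse)
parser <- ArgumentParser(description = "Add two numbers")
parser$add_argument("x", type = "double")  # positional arguments
parser$add_argument("y", type = "double")
args <- parser$parse_args()
cat(args$x + args$y, "\n")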