Running code in parallel

Like the majority of programming languages, R runs on one CPU core by default. All modern CPUs have multiple cores, which means there is typically spare computing power that goes unused when we run R. Putting those other cores to work is often a very cheap way of making our code run faster. However, it is not the be-all and end-all of getting better performance.

In most cases, the number of CPU cores on a system is rather limited. The maximum theoretical speedup from running in parallel is equal to the number of cores you have available. Furthermore, not all code can be parallelized, and scaling is not always linear. If only 20 percent of your code can run in parallel, the best you can do is eliminate that 20 percent of the runtime, for a speedup of at most 1.25x even with an infinite number of cores (this is Amdahl’s law). The performance gains in the last section were considerably more than that. Because of this, you should only pursue parallelization once you’ve already done all you can to optimize your code. Better code beats more hardware every time (it’s also significantly less expensive).
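
To make that 1.25x ceiling concrete, here is a quick back-of-the-envelope version of Amdahl’s law, where p is the fraction of the work that can be parallelized and n is the number of cores:

# Amdahl's law: maximum speedup for a parallelizable fraction p on n cores
amdahl <- function(p, n) {
  1 / ((1 - p) + p / n)
}

amdahl(p = 0.2, n = 8)    # ~1.21x on 8 cores
amdahl(p = 0.2, n = Inf)  # 1.25x is the ceiling, even with infinite cores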

One final consideration is that we can only parallelize code where each subset of the problem is completely independent of the other subsets. If each result depends on the previous one, only one core can do any work at a time - the others will just sit around waiting for the previous result.
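
For example, here is a purely illustrative loop where each element is built from the one before it; no matter how many cores we have, the iterations must happen one at a time:

x <- numeric(10)
x[1] <- 1
for (i in 2:10) {
  # each element depends on the previous one, so the iterations
  # can't be handed out to different cores
  x[i] <- 1.05 * x[i - 1] + 1
}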

With all of that said, let’s make our R code run in parallel.

Parallelization using plyr and doParallel

So far, I’ve carefully avoided any mention of the plyr package. plyr is the predecessor to both dplyr and purrr (and is also written by Hadley Wickham) and is no longer actively developed. In many cases, plyr functions are slower than their tidyverse equivalents. However, one key advantage of plyr is that it’s dead-easy to parallelize and uses a much faster default parallel backend for small problem sets. It also provides a nice example of shared memory parallelization vs. the distributed memory parallelization that we will encounter next.

Let’s explore how to write parallel code using plyr. The first step is to load the plyr and doParallel packages and determine the number of cores we will use.

Threads vs. cores

There is often a lot of confusion between CPU threads and cores. A CPU core is the actual computation unit. Threads are a way of multi-tasking that allows multiple simultaneous tasks to share the same CPU core. Multiple threads are not a substitute for multiple cores. Because of this, compute-intensive workloads (like R) are typically concerned only with the number of CPU cores available, not threads.

The doParallel package provides a handy way of looking up the number of cores if we don’t know it ahead of time.

library(plyr)
library(doParallel)
## Loading required package: foreach
## Loading required package: iterators
## Loading required package: parallel
cores <- detectCores()
cores
## [1] 8
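
Note that detectCores() counts logical CPUs (threads) by default. If you care about physical cores, it takes a logical argument, though this is only honoured on some platforms (on others it simply returns the same value):

detectCores(logical = TRUE)   # logical CPUs (threads), the default
detectCores(logical = FALSE)  # physical cores, where the OS supports reporting them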

Once we have the number of cores, we can register doParallel as our parallel backend.

registerDoParallel(cores=cores)

This creates what’s known as a “fork cluster”. A fork cluster is a special type of cluster where a UNIX OS “forks”, or splits, the parent process to run on multiple cores. The forked processes share the same memory and are more or less identical to the parent. Because the processes share the same memory, there is no need to “set them up” by loading packages or transferring variables.

Anyhow, all we need to do to parallelize our code now is call the corresponding plyr function. In this case, we are using llply(), which is more or less a direct equivalent of purrr’s map(); the syntax is identical. To run in parallel, the only special change we need to make is adding .parallel=TRUE as an argument to llply(). We’ll use a fake function that does nothing but return its argument after sleeping for a bit.

fake_func <- function(x) {
  Sys.sleep(0.1)
  return(x)
}

library(microbenchmark)
microbenchmark(
  serial = llply(1:24, fake_func),
  parallel = llply(1:24, fake_func, .parallel = TRUE),
  times = 1
)
## Unit: milliseconds
##      expr       min        lq      mean    median        uq       max
##    serial 2442.9770 2442.9770 2442.9770 2442.9770 2442.9770 2442.9770
##  parallel  473.8413  473.8413  473.8413  473.8413  473.8413  473.8413
##  neval
##      1
##      1

That’s it. The recipe for parallel code using plyr is short and sweet (just four lines!). It can’t get any easier than this.

library(plyr)
library(doParallel)
registerDoParallel(cores=detectCores())

result <- llply(object_to_iterate_over, some_func, .parallel=TRUE)

This method of parallelization is perfect for when you just want to do something in parallel “quick and dirty”. It requires zero effort, but keep in mind several things:

  • There is a small amount of overhead involved in shuffling data off to different cores. This will be negligible if each iteration you are parallelizing is relatively large/slow, but parallelizing large numbers of extremely fast operations will be very inefficient.

  • Savvy readers might have noticed the keyword “UNIX” earlier - only Mac, Linux, and other UNIX variants have the ability to fork processes. This method of parallelization simply cannot be done on Windows.

  • You cannot spread this type of workload over multiple computers.
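
As an aside, the same fork-based, shared-memory approach is available directly from base R’s parallel package via mclapply(), without plyr or doParallel at all. A minimal sketch (again, UNIX-only):

library(parallel)

# same sleepy test function as before
fake_func <- function(x) {
  Sys.sleep(0.1)
  return(x)
}

# fork-based parallel lapply() across all available cores
results <- mclapply(1:24, fake_func, mc.cores = detectCores())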

Parallelization using multidplyr

multidplyr is the tidyverse parallel backend. Unlike the plyr/doParallel method we just covered, multidplyr creates a PSOCK cluster by default (“PSOCK” stands for parallel socket cluster). Essentially, this workflow has 5 steps:

  • Launch our cluster R worker processes (each uses 1 core).

  • Load packages and send data to the workers.

  • Our workers execute our workflow in parallel.

  • Collect results from the workers.

  • Shut down the cluster (otherwise the workers hang around and continue to eat up resources).

multidplyr abstracts away several of these steps for us, simplifying our workflow. Let’s explore this using an example calculation on our favorite nycflights13 dataset.

Note that multidplyr is not available through CRAN, so we’ll have to fetch it from GitHub with the devtools package. Windows users may need to install Rtools beforehand to allow installation from source code.

install.packages("devtools")
devtools::install_github("hadley/multidplyr")
library(tidyverse)
## Loading tidyverse: ggplot2
## Loading tidyverse: tibble
## Loading tidyverse: tidyr
## Loading tidyverse: readr
## Loading tidyverse: purrr
## Loading tidyverse: dplyr
## Conflicts with tidy packages ----------------------------------------------
## accumulate(): purrr, foreach
## arrange():    dplyr, plyr
## compact():    purrr, plyr
## count():      dplyr, plyr
## failwith():   dplyr, plyr
## filter():     dplyr, stats
## id():         dplyr, plyr
## lag():        dplyr, stats
## mutate():     dplyr, plyr
## rename():     dplyr, plyr
## summarise():  dplyr, plyr
## summarize():  dplyr, plyr
## when():       purrr, foreach
library(multidplyr)
library(nycflights13)

results <- flights %>% 
  partition(dest) %>% 
  summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>% 
  collect() %>% 
  arrange(est_travel_time)
## Initialising 7 core cluster.
## Warning: group_indices_.grouped_df ignores extra arguments

Examining the workflow, we first partition our data across our workers (in this case multidplyr initialised a 7-core cluster, presumably leaving one core free for the parent R process). The partition() function creates a party_df, a data frame that has been split into 7 shards, one per worker process. partition() serves more or less the same function as group_by(), and ensures that all observations for a particular group are assigned to the same worker. collect() then collects the data from the parallel workers, after which they shut down. Finally, arrange() is something that cannot be done in parallel. There’s no point in sorting separate shards of data, since they’ll be out of order again when they are recombined.

flights %>% partition(dest)
## Warning: group_indices_.grouped_df ignores extra arguments
## Source: party_df [336,776 x 19]
## Groups: dest
## Shards: 7 [37,722--63,072 rows]
## 
## # S3: party_df
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      559            600        -1      854
##  2  2013     1     1      602            610        -8      812
##  3  2013     1     1      624            630        -6      909
##  4  2013     1     1      624            630        -6      840
##  5  2013     1     1      629            630        -1      824
##  6  2013     1     1      643            645        -2      837
##  7  2013     1     1      646            645         1     1023
##  8  2013     1     1      651            655        -4      936
##  9  2013     1     1      743            749        -6     1043
## 10  2013     1     1      752            759        -7      955
## # ... with 336,766 more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dttm>

Let’s compare that example with non-parallel execution speed.

microbenchmark(
  parallel = {
    results <- flights %>% 
      partition(dest) %>% 
      summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>% 
      collect() %>% 
      arrange(est_travel_time)
  },
  serial = {
    results <- flights %>% 
      group_by(dest) %>% 
      summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>% 
      arrange(est_travel_time)
  },
  times = 5
)
## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments
## Unit: milliseconds
##      expr       min         lq       mean     median         uq        max
##  parallel 1082.4636 1121.14473 1157.19110 1170.60528 1197.55161 1214.19028
##    serial   38.6268   39.69088   41.31497   42.55553   42.59156   43.11009
##  neval
##      5
##      5

What happened? Our code was actually slower. The short answer is that there’s a lot of overhead associated with setting up our parallel workers, moving the data around, and then shutting them down again. When we parallelized plyr earlier, we cheated a bit by using Sys.sleep(). Let’s do so again here (just for the purposes of demonstration).

microbenchmark(
  parallel = {
    results <- flights %>% 
      partition(dest) %>% 
      summarize(est_travel_time=(function(x) {
        Sys.sleep(0.1)
        return(mean(x, na.rm=TRUE))
      })(air_time)) %>% 
      collect() %>% 
      arrange(est_travel_time)
  },
  serial = {
    results <- flights %>% 
      group_by(dest) %>% 
      summarize(est_travel_time=(function(x) {
        Sys.sleep(0.1)
        return(mean(x, na.rm=TRUE))
      })(air_time)) %>% 
      arrange(est_travel_time)
  },
  times = 5
)
## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments

## Warning: group_indices_.grouped_df ignores extra arguments
## Unit: seconds
##      expr       min        lq      mean    median        uq       max
##  parallel  2.951026  3.087707  3.232776  3.273678  3.387101  3.464368
##    serial 10.591109 10.593191 10.614384 10.596090 10.601251 10.690277
##  neval
##      5
##      5

Again, there is a limit to the speedup (even while cheating), because the arrange(), collect(), and partition() steps always take the same amount of time to execute.

Also, why didn’t we just define an external function (let’s call it cheating()) instead of the (function(col) {...}) monstrosity?

# what we should have done
cheating <- function(col) {
  Sys.sleep(0.1)
  return(mean(col, na.rm=TRUE))
}

results <- flights %>% 
  partition(dest) %>% 
  summarize(est_travel_time=cheating(air_time)) %>% 
  collect() %>% 
  arrange(est_travel_time)
Initialising 7 core cluster.
group_indices_.grouped_df ignores extra arguments
Error in checkForRemoteErrors(lapply(cl, recvResult)) : 7 nodes produced errors; 
first error: Evaluation error: could not find function "cheating".

What happened? Why doesn’t our parallel cluster know about the cheating() function? As it turns out, our parallel workers are pretty much brand new copies of R with nothing loaded except for the data sent to them via partition(). We’ll need to more or less manage our cluster manually to send them the data they need.

There are several functions we’ll use here:

  • makePSOCKcluster() - Creates our cluster. This is from the parallel package.
  • set_default_cluster() - Make the created cluster get used by partition() by default.
  • cluster_library() - Load an R package on every worker.
  • cluster_assign_value() - Exports variables to our cluster workers.
  • stopCluster() - Explicitly shuts down our cluster.

A sample workflow using all of this might look like the following.

# create a cluster and make it the default
library(parallel)
cluster <- makePSOCKcluster(detectCores())
set_default_cluster(cluster)

# define a function
cheating <- function(col) {
  Sys.sleep(0.1)
  return(mean(col, na.rm=TRUE))
}
# pass it to workers
cluster_assign_value(cluster, "cheating", cheating)
# if we used a library in our parallel workers, we'd use something like the following here:
# cluster_library(cluster, "libraryName")

# run our workflow
flights %>% 
  partition(dest) %>% 
  summarize(est_travel_time=cheating(air_time)) %>% 
  collect() %>% 
  arrange(est_travel_time)
## Warning: group_indices_.grouped_df ignores extra arguments
## # A tibble: 105 x 2
##     dest est_travel_time
##    <chr>           <dbl>
##  1   BDL        25.46602
##  2   ALB        31.78708
##  3   PVD        32.66760
##  4   PHL        33.17132
##  5   MVY        36.31905
##  6   BWI        38.49970
##  7   MHT        38.50858
##  8   BOS        38.95300
##  9   ACK        42.06818
## 10   SYR        43.03984
## # ... with 95 more rows
# shut down our cluster at the end
stopCluster(cluster)

That’s a lot of extra work just to run in parallel, isn’t it? One thing I noticed while reading through the multidplyr vignette is that it says it can support running on “clusters created by the parallel package”. This is good news, as one of those types of clusters is the “fork” cluster we used earlier with plyr (when parallelization was super easy…).

Let’s try this out:

fork <- makeForkCluster(detectCores())
set_default_cluster(fork)

flights %>% 
  partition(dest) %>% 
  summarize(est_travel_time=cheating(air_time)) %>% 
  collect() %>% 
  arrange(est_travel_time)

stopCluster(fork)
# A tibble: 105 x 2
    dest est_travel_time
   <chr>           <dbl>
 1   BDL        25.46602
 2   ALB        31.78708
 3   PVD        32.66760
 4   PHL        33.17132
 5   MVY        36.31905
 6   BWI        38.49970
 7   MHT        38.50858
 8   BOS        38.95300
 9   ACK        42.06818
10   SYR        43.03984
# ... with 95 more rows

Great news, it worked. We’ll now examine how the speed of each type of cluster compares (no cheating this time!). We’ll leave out the arrange() step, as it doesn’t add anything to our example aside from showing that certain things can’t be parallelized. Since there are multiple clusters at work, we must explicitly specify which cluster to use when we run partition().

cluster_fork <- makeForkCluster(detectCores())
cluster_psock <- makePSOCKcluster(detectCores())

microbenchmark(
  dplyr_serial = {
    flights %>% 
      group_by(dest) %>% 
      summarize(est_travel_time=mean(air_time, na.rm=TRUE))
  },
  dplyr_psock = {
    flights %>% 
      partition(dest, cluster=cluster_psock) %>% 
      summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>% 
      collect()
  },
  dplyr_fork = {
    flights %>% 
      partition(dest, cluster=cluster_fork) %>% 
      summarize(est_travel_time=mean(air_time, na.rm=TRUE)) %>% 
      collect()
  },
  plyr_serial = {
    flights %>% 
      # didn't cover ddply, but it's the plyr equivalent of dplyr 
      # (.variables = group_by())
      ddply(.variables = "dest",
            .fun=function(x) mean(x$air_time, na.rm=TRUE))
  },
  plyr_fork = {
    flights %>% 
      ddply(.variables = "dest", 
            .fun=function(x) mean(x$air_time, na.rm=TRUE), 
            .parallel=TRUE)
  }, 
  times = 5
)

stopCluster(cluster_psock)
stopCluster(cluster_fork)
Unit: milliseconds
         expr        min        lq       mean     median         uq       max neval
 dplyr_serial   39.14037   39.6308   60.51139   39.91827   41.94427  141.9232     5
  dplyr_psock 1007.31082 1301.6537 1393.16955 1366.20333 1409.68778 1880.9922     5
   dplyr_fork  962.37095  985.1657 1005.70492 1013.73967 1022.19742 1045.0509     5
  plyr_serial  370.86347  389.5137  397.98565  395.11678  416.63985  417.7944     5
    plyr_fork  237.73061  248.6463  320.68526  336.84943  386.48719  393.7127     5

Takeaway message: the dplyr fork cluster is easier to use and slightly faster than its PSOCK counterpart. Parallelization using plyr didn’t see the same parallelization overhead as dplyr, but was still almost 8 times slower than dplyr in serial mode.

A good question is why dplyr takes so much longer than plyr’s fork cluster. If we look at the source code for partition(), it always transmits all of the data to the workers (even though forked workers start with all of the data already!).

Here’s a similar duel between vectorized code, purrr in serial, plyr in serial, and plyr in parallel.

library(stringr)

microbenchmark(
  vectorized = str_detect(planes$model, "737"),
  purrr_serial = {
    planes$model %>% 
      map(~str_detect(.x, "737"))
  },
  plyr_serial = {
    planes$model %>% 
      llply(.fun = function(x) str_detect(x, "737"))
  },
  plyr_fork = {
    planes$model %>% 
      llply(.fun = function(x) str_detect(x, "737"), .parallel=TRUE)
  }, 
  times = 10
)
## Unit: microseconds
##          expr        min         lq         mean       median          uq
##    vectorized    842.563     942.03     975.9827     999.2185    1037.247
##  purrr_serial  70792.919   73406.89   75517.8496   75072.9480   77712.982
##   plyr_serial  69881.959   70814.89   72361.8620   72102.9315   74025.266
##     plyr_fork 993463.434 1018008.97 1055831.6332 1035352.6420 1062911.312
##          max neval
##     1053.202    10
##    82178.797    10
##    75607.017    10
##  1191334.835    10

Again, vectorized code is very, very fast. The takeaway here is that we should avoid complexity wherever possible: stick to vectorized code, and only parallelize if you absolutely have to. One other interesting finding is that plyr::llply() is just as fast as purrr::map(). Though purrr cannot be parallelized at the moment, this means we can still use plyr::llply() in parallel (and llply() does not suffer from any related performance hit).

Other parallelization methods

Microsoft R Open

There are a number of alternative R implementations. One of them, Microsoft R Open (formerly Revolution R), is a relatively vanilla alternative R implementation compiled against Intel’s MKL libraries (unlike some implementations like Renjin, it does not completely rewrite the language). Intel’s MKL is generally faster than the reference BLAS/LAPACK libraries that ship with GNU R, and Microsoft R will perform many types of linear algebra operations in parallel by default. Microsoft R is free and I’ve never really noticed any issues with it relative to the GNU version. Installing this is a performance “freebie” in many cases: just install it and you’re done.
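
If you want a rough sense of whether your current R installation already uses a tuned, multi-threaded BLAS, a large matrix multiplication is the classic place where it shows up. This is just an informal check, not a rigorous benchmark:

# a multi-threaded BLAS (like MKL) will be dramatically faster here than
# the reference BLAS that ships with GNU R
m <- matrix(rnorm(2000 * 2000), nrow = 2000)
system.time(m %*% m)

# on R >= 3.4, sessionInfo() also reports which BLAS/LAPACK R is linked against
sessionInfo()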

Apache Spark

If you want to parallelize your R jobs across a cluster, you will likely want to use Spark. (The alternative would be to write code using something like Rmpi, but at that point you’re better off just switching languages to C++ or Fortran.) Spark is a distributed compute engine that runs analyses in parallel across multiple nodes. It can be a bit complex to get started with, and is outside the scope of this tutorial. However, if you are looking to get started with Spark, I recommend checking out RStudio’s sparklyr package.
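
That said, a basic sparklyr workflow looks surprisingly dplyr-like. The sketch below is only illustrative; it assumes a local Spark installation (sparklyr can download one for you via spark_install()):

# install.packages("sparklyr")
# sparklyr::spark_install()   # downloads a local copy of Spark
library(sparklyr)
library(dplyr)
library(nycflights13)

sc <- spark_connect(master = "local")

# copy the data to Spark and run an ordinary dplyr pipeline against it
flights_tbl <- copy_to(sc, flights, "flights")
flights_tbl %>%
  group_by(dest) %>%
  summarize(est_travel_time = mean(air_time, na.rm = TRUE)) %>%
  collect()

spark_disconnect(sc)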

Serial farming across a batch computing cluster

The traditional HPC cluster manages workloads using a batch scheduler like SLURM. Essentially, users submit non-interactive batch job scripts to the scheduler, which decides where each user’s jobs get run. The cluster filesystem is shared across all nodes. Our workflow here would normally take one of two forms: doing it “manually”, where we write chunks of our dataset to disk and then submit a separate R job to analyze each chunk, or using an automation tool like Snakemake or the BatchJobs package.
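
As a purely hypothetical illustration of the “manual” approach, the chunking step might look something like this (each chunk_*.rds file would then be analyzed by a separate batch job, told which chunk to read via a command line argument):

library(nycflights13)

# split the flights data into 10 roughly equal chunks and write each to disk;
# a separate batch job would then analyze each chunk_*.rds file
chunk_ids <- cut(seq_len(nrow(flights)), breaks = 10, labels = FALSE)
chunks <- split(flights, chunk_ids)
for (i in seq_along(chunks)) {
  saveRDS(chunks[[i]], paste0("chunk_", i, ".rds"))
}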

Exercise - Writing a command line R program

Typically, when running on the command line, we want our R scripts to accept command line arguments (like which data chunk to analyze). To do this, we use the commandArgs() function.

args <- commandArgs(TRUE)

Write an R program that takes two numbers off of the command line and adds them together. You can run it with Rscript yourScript.R arg1 arg2.
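
One possible solution (saved as yourScript.R) might look like the following; run it with Rscript yourScript.R 3 4.

# yourScript.R - add two numbers supplied on the command line
args <- commandArgs(TRUE)
result <- as.numeric(args[1]) + as.numeric(args[2])
cat(result, "\n")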

For more complex scenarios, I recommend looking into the argparse package.