The R parallel package


The R parallel package is now part of the core distribution of R. It includes a number of different mechanisms to enable you to exploit parallelism utilizing the multiple cores in your processor(s), as well as compute resources distributed across a network as a cluster of machines. However, as our theme in this chapter is one of simplicity, we will stick to making the most of the resources available on the machine on which you are running R.

The first thing you need to do is load the parallel package. You can either use R's library() function to do so, or if you are using RStudio, you can simply tick the corresponding entry in the User Library list in the Packages tab. The second thing we need to do is determine just how much parallelism we can utilize by calling the parallel package function detectCores(), as follows:

> library("parallel")
> detectCores()
[1] 4

As we can immediately note, on my MacBook, I have four cores available across which I can run R programs in parallel. It's easy to verify this using the Mac's Activity Monitor app by selecting the CPU History option from the Window menu, which displays one timeline graph per core.

The green elements of the plotted bars indicate the proportion of CPU time spent in user code, and the red elements indicate the proportion of time spent in system code. You can vary the frequency of the graph update to a maximum of once per second. A similar multicore CPU history is available in Microsoft Windows through the Task Manager. It is useful to have this type of view open when running code in parallel as you can immediately see when your code is utilizing multiple cores. You can also see what other activity is taking place on your machine that might impact your R code running in parallel.

Using mclapply()

The simplest mechanism to achieve parallelism in R is to use parallel's multicore variant of lapply() called (logically) mclapply().

Note

The mclapply() function is Unix-only

The mclapply() function is only available when you are running R on Mac OS X, Linux, or other variants of Unix. It is implemented with the Unix fork() system call and therefore cannot be used on Microsoft Windows; rest assured, we will come to a Microsoft Windows-compatible solution shortly. The Unix fork() system call operates by replicating the currently running process (including its entire memory state, open file descriptors, and other process resources and, importantly from an R perspective, any currently loaded libraries) as a set of independent child processes that will each continue separate execution until they make the exit() system call, at which point the parent process collects their exit state. Once all the children terminate, the fork is complete. All of this behavior is wrapped up inside the call to mclapply(). If you view your running processes in Activity Monitor on Mac OS X, you will see mc.cores spawned rsession processes with high CPU utilization when mclapply() is called.
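
You can observe this forking behavior directly through the parallel package's lower-level mcparallel() and mccollect() functions, which share the same fork() underpinnings (a quick sketch, Unix-only; the actual PID values will vary from run to run):

> parent <- Sys.getpid()            ## the parent R session's process ID
> job <- mcparallel(Sys.getpid())   ## fork a child to evaluate the expression
> child <- mccollect(job)[[1]]      ## collect the child's result: its own PID
> c(parent = parent, child = child) ## two different PIDs: a real fork occurred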

Similar to lapply(), the first argument is the list of function inputs corresponding to independent tasks, and the second argument is the function to be executed for each task. An optional argument, mc.cores, allows us to specify how many cores we want to make use of, that is, the degree of parallelism we want to use. If detectCores() returned 1, then mclapply() will resort to just calling lapply() internally, that is, the computation will simply run serially.

Let's initially run mclapply() on a small subset of the tile-triple board starting positions, using the same set we tried previously with lapply() for comparison, as follows:

> tri <- generateTriples()
> tasks <- list(tri[[1]],tri[[21]],tri[[41]],tri[[61]])
> teval(mclapply(tasks,solver,mc.cores=detectCores()))
$Duration               ## Overall
   user  system elapsed 
146.412   0.433  87.621
$Result[[1]]$Duration   ## Triple "011819"
   user  system elapsed 
  2.182   0.010   2.274 
$Result[[2]]$Duration   ## Triple "061517"
   user  system elapsed 
 58.686   0.108  59.391 
$Result[[3]]$Duration   ## Triple "091019"
   user  system elapsed 
 85.353   0.147  86.198 
$Result[[4]]$Duration   ## Triple "111215"
   user  system elapsed 
 86.604   0.152  87.498 

The preceding output is again trimmed and commented for brevity and clarity. What you should immediately notice is that the overall elapsed time for executing all of the tasks is no greater than the time it took to compute the longest running of the four tasks. Voila! We have managed to significantly reduce our running time from 178 seconds in serial to just 87 seconds by making simultaneous use of all four available cores. However, 87 seconds is only a two-times speedup over 178 seconds, whereas you may have expected something closer to a four-times speedup over running in serial. You may also notice that the runtime for each individual task increased compared to running in serial; for example, tile-triple 111215 went from 65 seconds to 87 seconds. Part of this difference is due to the overhead of the forking mechanism: the time it takes to spin up a new child process, apportion it tasks, collect its results, and tear it down. The good news is that this overhead can be amortized by having each parallel process compute a large number of tasks.

Another consideration is that my particular MacBook laptop uses an Intel Core i5 processor, which, in practice, is more the equivalent of 2 x 1.5 cores: it exploits hyperthreading across two full processor cores to increase performance, with certain limitations, but is still treated by the operating system as four fully independent cores. If I run the preceding example on just two of my laptop's cores, then the overall runtime is 107 seconds. Two-way hyperthreading therefore gains me an extra 20% in performance, which, although good, is still much less than the desired 50% performance improvement.
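
Incidentally, detectCores() counts logical cores by default; you can ask for physical cores instead via its logical argument (a quick check, with the output you would expect on the dual-core, hyperthreaded i5 just described, on platforms where the operating system reports physical counts):

> detectCores()                 ## logical cores, including hyperthreads
[1] 4
> detectCores(logical = FALSE)  ## physical cores only
[1] 2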

I'm sure that by this point, if you haven't already done so, you will have the urge to run the solver in parallel across all 90 of the starting tile-triples and find the solution to Aristotle's Number Puzzle, though you might want to take a long coffee break or have lunch while it runs….
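
For reference, that full run is a one-liner over every generated triple, reusing the same pattern as before (a sketch; expect it to keep all of your cores busy for a long while):

> tri <- generateTriples()
> teval(mclapply(tri, solver, mc.cores = detectCores()))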

Options for mclapply()

The mclapply() function has more capability than we have so far touched upon. The following table summarizes these extended capabilities and briefly discusses when they are most appropriately applied:

mclapply(X, FUN, ..., mc.preschedule=TRUE, mc.set.seed=TRUE,   
         mc.silent=FALSE, mc.cores=getOption("mc.cores",2L), 
         mc.cleanup=TRUE, mc.allow.recursive=TRUE)
returns: list of FUN results, where length(returns)=length(X)

Option [default=value]: Description

X

This is the list (or vector) of items that represent tasks to be computed by the user-defined FUN function.

FUN

This is the user-defined function to execute on each task. FUN will be called multiple times as FUN(x, ...), where x is one of the task items in X to be computed on and ... matches the extra arguments passed into mclapply().

...

Any extra (non-mclapply) arguments are passed directly into FUN on each task execution.

mc.preschedule [default=TRUE]

If this is TRUE, then one child process is forked for each core requested, the tasks are split as evenly as possible between the cores in round-robin order, and each child executes its allotted set of tasks. For most parallel workloads, this is normally the best choice.

If this is FALSE, then a new child process is forked afresh for each task executed. This option is useful where tasks are relatively long running but have significant variance in compute time, as it enables a level of adaptive load balancing at the cost of the increased overhead of a fork per task, as opposed to a fork per core (see the sketch following this table).

In either case, there will be a maximum of mc.cores child processes running at any given time while mclapply() executes.

mc.set.seed [default=TRUE]

The behavior of this option is governed by the type of random number generator (RNG) in use for the current R session.

If this is TRUE and an appropriate RNG is selected, then the child process will be launched with a specific RNG sequence selected, such that a subsequent invocation of mclapply() with the same arguments set will produce the same result (assuming the computation makes use of the specific RNG). Otherwise, the behavior is as for FALSE.

If this is FALSE, then the child process inherits the random number state at the start of its execution from the parent R session, and it is likely that it will be difficult to generate reproducible results.

Having consistent random number generation for parallel code is a topic we will cover in the online chapter.

mc.silent [default=FALSE]

If this is TRUE, then any output generated to the standard output stream will be suppressed (such as output from print()).

If this is FALSE, then standard output is unaffected. However, also refer to the tip following this table.

In either case, the output to the standard error stream is unaffected.

mc.cores [default=getOption("mc.cores", 2L), that is, 2 unless the mc.cores option is set]

This option sets the degree of parallelism to use. It is arguably misnamed as it actually controls the number of simultaneous task-executing processes, which can well exceed the number of physical processor cores should you so desire. For some types of parallel workload, such as a small number of long-running but variable compute tasks where intermediate results can be generated (for example, written to the filesystem or sent by messaging), this oversubscription may even be helpful, as it enables the operating system's time slicing of processes to ensure fair progress across the set of tasks. Of course, the downside is the increased overhead of constantly switching between running processes.

Constraints on the upper bound are dependent on the operating system and machine resources, but in general, it will be in the hundreds as opposed to the thousands.

mc.cleanup [default=TRUE]

If this is TRUE, then the child processes will be forcibly terminated by the parent. If this is FALSE, then child processes may be left running after they complete the mclapply() operation; the latter is potentially useful for post-compute debugging by attaching to a still-running process.

In either case, mclapply() waits until all the children complete their tasks and then returns the combined set of computed results.

mc.allow.recursive [default=TRUE]

If this is TRUE, then FUN can itself make calls to mclapply() or call code that also invokes mclapply(). On the whole, such recursion is only used in exotic forms of parallel programming.

If this is FALSE, then a recursive attempt to call mclapply() will simply result in an internal call to lapply(), enforcing serial execution within the child process.
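
To make the mc.preschedule trade-off concrete, our earlier four-task example can be rerun in the fork-per-task mode as follows (a sketch based on the preceding code; whether it pays off depends on the balance between task variance and forking overhead):

> teval(mclapply(tasks, solver, mc.cores = detectCores(),
                 mc.preschedule = FALSE))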

Let's have a look at a tip:

Tip

The print() function in parallel

In RStudio, output is not directed to the console when running in parallel with mclapply(). If you wish to generate print messages or other console output, you should run your program directly from the command shell rather than from within RStudio. In general, the authors of mclapply() do not recommend running parallel R code from a GUI console because it can cause a number of complications, with multiple processes attempting to interact with the GUI; it is not suitable, for example, to attempt to plot to the GUI's graphics display when running in parallel. With our solver code, though, you should not experience any specific issue. It's also worth noting that having multiple processes write messages to the same shared output stream can become very confusing, as messages can potentially be interleaved and unreadable, depending on how the output stream buffers I/O. We will come back to the topic of parallel I/O in a later chapter.
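
In the meantime, one common workaround, sketched here with a hypothetical logSolver() wrapper (not from the book's code), is to have each child process write to its own log file keyed by its process ID, which avoids any interleaving:

> logSolver <- function(x) {
    logfile <- sprintf("worker-%d.log", Sys.getpid())  ## one file per child
    cat("starting task:", toString(x), "\n", file = logfile, append = TRUE)
    solver(x)                                          ## then solve as before
  }
> results <- mclapply(tasks, logSolver, mc.cores = detectCores())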

Using parLapply()

The mclapply() function is closely related to the more generic parallel package function parLapply(). The key difference is that we separately create a cluster of parallel R processes using makeCluster(), and parLapply() then utilizes this cluster when executing a function in parallel. There are two key advantages to this approach. Firstly, with makeCluster(), we can create different underlying implementations of a parallel processing pool, whichever is best suited to our circumstances: a forked process cluster (FORK) similar to that used internally within mclapply(), a socket-based cluster (PSOCK) that will operate on Microsoft Windows as well as OS X and Linux, or a message-passing-based cluster (MPI). Secondly, the overhead of creating and configuring the cluster (we will visit the R configuration of the cluster in a later chapter) is amortized as it can be continually reused within our session.

The PSOCK and MPI types of cluster also enable R to utilize multiple machines within a network and perform true distributed computing (the machines may even be running different operating systems). However, for now, we will focus on the PSOCK cluster type and how it can be utilized within a single-machine context. We will explore MPI in detail in Chapter 2, Introduction to Message Passing, Chapter 3, Advanced Message Passing, and Chapter 4, Developing SPRINT, an MPI-based Package for Supercomputers.
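
Switching between these implementations is just a matter of the type argument to makeCluster(), as this sketch shows (FORK is Unix-only, and the MPI type additionally requires the Rmpi package to be installed):

> fork.cluster <- makeCluster(detectCores(), type = "FORK")   ## fork-based, Unix-only
> sock.cluster <- makeCluster(detectCores(), type = "PSOCK")  ## socket-based, portable
> stopCluster(fork.cluster)
> stopCluster(sock.cluster)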

Let's jump right in; run the following:

> cluster <- makeCluster(detectCores(),"PSOCK")

> tri <- generateTriples()
> tasks <- list(tri[[1]],tri[[21]],tri[[41]],tri[[61]])
> teval(parLapply(cluster,tasks,solver))
$Duration               ## Overall
   user  system elapsed 
  0.119   0.148  83.820
$Result[[1]]$Duration   ## Triple "011819"
   user  system elapsed 
  2.055   0.008   2.118
$Result[[2]]$Duration   ## Triple "061517"
   user  system elapsed 
 55.603   0.156  56.749 
$Result[[3]]$Duration   ## Triple "091019"
   user  system elapsed 
 81.949   0.208  83.195 
$Result[[4]]$Duration   ## Triple "111215"
   user  system elapsed 
 82.591   0.196  83.788

> stopCluster(cluster)  ## Shutdown the cluster (reap processes) 

What you may immediately notice from the timing results generated previously is that the overall user time recorded is negligible. This is because your main R session (referred to as the master), which launches the computation, does not perform any of it; all of the computation is carried out by the cluster. The master merely has to send out the tasks to the cluster and wait for the results to return.

What's also particularly apparent when running a cluster in this mode is the imbalance in computation across the processes in the cluster (referred to as workers). Watching the per-process CPU utilization makes this very clear: each R worker process in the cluster computed a single task in a variable amount of time, and in my run, the PID 41527 process sat idle after just two seconds, while the PID 41551 process was still busy computing its task for a further 1m 20s.

While increasing the number of tasks for the cluster to perform, assuming a random assignment of tasks to workers, should increase efficiency, we could still end up with a less-than-optimal overall utilization of resources. What we need is something more adaptive that hands out tasks dynamically to worker processes whenever they are next free to do more work. Luckily for us, there is a variation on parLapply() that does just this….

Note

Other parApply functions

There is a whole family of cluster functions to suit different types of workload, such as processing R matrices in parallel. These are summarized briefly here:

  • parSapply(): This is the parallel variant of sapply() that simplifies the return type (if possible) to a choice of vector, matrix, or array.

  • parCapply(), parRapply(): These are the parallel operations that respectively apply to the columns and rows of a matrix.

  • parLapplyLB(), parSapplyLB(): These are the load-balancing versions of their similarly named cousins. Load balancing is discussed in the next section.

  • clusterApply(), clusterApplyLB(): These are the generic apply and load-balanced apply functions utilized by all the parApply functions; they are discussed in the next section.

  • clusterMap(): This is a parallel variant of mapply()/Map(), enabling a function to be invoked with separate parameter values for each task, with an optional simplification of the return type (as with sapply()).
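
As a brief illustration of a couple of these (a sketch, not from the book's running example):

> cluster <- makeCluster(2, "PSOCK")
> parSapply(cluster, 1:8, function(x) x^2)  ## result simplified to a vector
[1]  1  4  9 16 25 36 49 64
> m <- matrix(1:6, nrow = 2)
> parRapply(cluster, m, sum)                ## sum applied to each row of m
[1]  9 12
> added <- clusterMap(cluster, function(a, b) a + b, 1:3, 4:6)
> stopCluster(cluster)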

More information is available by typing help(clusterApply) in R. Our focus in this chapter will remain on processing a list of tasks.

Parallel load balancing

The parLapplyLB() function is a load-balancing variant of parLapply(). Both these functions are essentially lightweight wrappers that internally make use of the directly callable parallel package functions clusterApplyLB() and clusterApply(), respectively. However, it is important to understand that the parLapply functions split the list of tasks into a number of equal-sized subsets matching the number of workers in the cluster before invoking the associated clusterApply function.

If you call clusterApply() directly, it will simply process the list of tasks in blocks of cluster size, that is, the number of workers in the cluster. It does this in sequential order, so assuming there are four workers, task 1 will go to worker 1, task 2 to worker 2, task 3 to worker 3, and task 4 to worker 4; then task 5 will go to worker 1, task 6 to worker 2, and so on. Crucially, clusterApply() waits for all the tasks in a block to complete before moving on to the next block.
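
You can observe this block-synchronous behavior in isolation with tasks that simply sleep (a minimal sketch, not from the book, using base R's system.time() in place of the teval() helper so that it is self-contained):

> cluster <- makeCluster(4, "PSOCK")
> waits <- list(8, 7, 6, 5, 4, 3, 2, 1)   ## seconds of "work" per task
> system.time(clusterApply(cluster, waits, Sys.sleep))
## elapsed is roughly 12s: the first block waits on its slowest task (8s)
## and the second block on its slowest (4s), despite idle workers
> stopCluster(cluster)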

This has important performance implications, as we can note in the following code snippet. In this example, we will use a particular subset (16 of the 90 tile-triples) to demonstrate the point:

> cluster <- makeCluster(4,"PSOCK")
> tri <- generateTriples()
> triples <- list(tri[[1]],tri[[20]],tri[[70]],tri[[85]],
                  tri[[2]],tri[[21]],tri[[71]],tri[[86]],
                  tri[[3]],tri[[22]],tri[[72]],tri[[87]],
                  tri[[4]],tri[[23]],tri[[73]],tri[[88]])
> teval(clusterApply(cluster,triples,solver))
$Duration
   user  system elapsed 
  0.613   0.778 449.873
> stopCluster(cluster)

What the preceding results illustrate is that because of the variation in compute time per task, workers are left waiting for the longest task in a block to complete before they are all assigned their next task. If you watch the process utilization during execution, you will see this behavior: the most lightly loaded process, in particular, briefly bursts into life at the start of each of the four blocks. This scenario is particularly inefficient and can lead to significantly extended runtimes; in the worst case, there may be no advantage at all to running in parallel compared to running in serial. Notably, parLapply() avoids this behavior because it first splits the available tasks into exactly cluster-sized lapply() metatasks, so clusterApply() then only operates on a single block of tasks. However, a poor balance of work across this initial split will still affect the parLapply function's overall performance.

By comparison, clusterApplyLB() distributes one task at a time per worker, and whenever a worker completes its task, the next task is immediately handed out to the first available worker. There is some extra overhead to manage this procedure due to increased communication, and workers may still queue waiting for their next task if they collectively finish their previous tasks at the same point in time. It is therefore only appropriate where there is considerable variation in computation time for each task, and where most of the tasks take some nontrivial period of time to compute.
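
Rerunning the sleep-based sketch from earlier with clusterApplyLB() shows the adaptive hand-out at work (again an illustration, not from the book):

> cluster <- makeCluster(4, "PSOCK")
> waits <- list(8, 7, 6, 5, 4, 3, 2, 1)
> system.time(clusterApplyLB(cluster, waits, Sys.sleep))
## elapsed is roughly 9s: each worker picks up a short task the
## moment it becomes free, rather than waiting for its block to drain
> stopCluster(cluster)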

Using clusterApplyLB() in our running example leads to an improvement in overall runtime of around 6% (450 seconds down to 422 seconds), with significantly improved utilization across all worker processes, as follows:

> cluster <- makeCluster(4,"PSOCK")
> teval(clusterApplyLB(cluster,triples,solver))
$Duration
   user  system elapsed 
  0.586   0.841 421.859
> stopCluster(cluster)

The final point to highlight here is that the a priori balancing of a distributed workload is the most efficient option when it is possible to do so. For our running example, executing the selected 16 triples in the order they are listed with parLapply() results in the shortest overall runtime, beating clusterApplyLB() by 10 seconds and indicating that the dynamic load balancing equates to around a 3% overhead. The order of the selected triples happens to align perfectly with the parLapply function's packaging of tasks across the four-worker cluster. However, this is an artificially constructed scenario, and for the full variable-task workload of all 90 tile-triples, employing dynamic load balancing is the best option.
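
If you can estimate each task's cost in advance, you can approximate this a priori balance yourself before calling parLapply(). The following sketch assumes a hypothetical costEstimate() function that you would supply; it deals the cost-sorted tasks out round-robin so that each of the parLapply function's consecutive chunks receives a mix of cheap and expensive work:

> cluster <- makeCluster(4, "PSOCK")
> est    <- sapply(triples, costEstimate)  ## costEstimate() is hypothetical
> sorted <- triples[order(est, decreasing = TRUE)]
> nw     <- 4                              ## workers in the cluster
> dealt  <- unlist(lapply(seq_len(nw),
                   function(w) sorted[seq(w, length(sorted), by = nw)]),
                   recursive = FALSE)      ## re-flatten into one task list
> teval(parLapply(cluster, dealt, solver))
> stopCluster(cluster)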