--- title: "Stat 3701 Lecture Notes: Parallel Computing in R" author: "Charles J. Geyer" date: "`r format(Sys.time(), '%B %d, %Y')`" output: html_document: number_sections: true mathjax: "https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.0/MathJax.js?config=TeX-AMS-MML_HTMLorMML" pdf_document: number_sections: true --- # License This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License (http://creativecommons.org/licenses/by-sa/4.0/). # Note These notes have not been kept up to date. The best version of my class notes for parallel computing are [those for Stat 8054](http://www.stat.umn.edu/geyer/8054/notes/parallel.html) (PhD level statistical computing). Those notes say more or less the same as these but have many corrections (the code for LATIS actually works). They are, however, terser. Also the [section about the compiler](#r-package-compiler) below should probably be ignored. It was written before the just-in-time compiler was turned on by default in R version 3.4.0. Now all code whether precompiled or not should get most of the speed-up possible with no special action taken by users. There is nothing wrong with what the section below says; it just isn't necessary anymore. # R * The version of R used to make this document is `r getRversion()`. * The version of the `rmarkdown` package used to make this document is `r packageVersion("rmarkdown")`. # Computer History The first computer I ever worked on was an IBM 1130 ([Wikipedia page](https://en.wikipedia.org/wiki/IBM_1130)). This was in 1971. It was the only computer the small liberal arts college I went to had. It had 16 bit words and 32,768 of them (64 kilobytes) of memory. The clock speed was 278 kHz (kilohertz). For comparison, the laptop I am working on has 64 bit words and 8054820 kB (8 GB, which is 125,000 times as much as the IBM 1130). Its clock speed is 2.50GHz (8,993 times as fast). That is `log2(8993)` = `r round(log2(8993), 2)` doublings of speed in 46 years, which is a doubling of computer speed every `r round(46 / log2(8993), 1)` years. For a very long time (going back to even before 1970), computers have been getting faster at more or less this rate, but something happened in 2003 as discussed in the article [The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software](http://www.gotw.ca/publications/concurrency-ddj.htm). This exponential growth in computer speed was often confused with Moore's Law ([Wikipedia article](https://en.wikipedia.org/wiki/Moore%27s_law)), which actually predicts a doubling of the *number of transistors* on a computer chip every 2 years. For a long time this went hand in hand with a doubling in computer speed, but about 2003 the speed increases stopped while the number of transistors kept doubling. What to do with all of those transistors? Make more processors. So even the dumbest chips today, say the one in your phone, has multiple "cores", each a real computer. High end graphics cards, so-called GPU's, can run thousands of processes simultaneously, but have a very limited instruction set. They can only run specialized graphics code very fast. They could not run 1000 instances of R very fast. Your laptop, in contrast, can run multiple instances of R very fast, just not so many. The laptop I use for class has an Intel i5 chip with 4 cores that can execute 4 separate processes at full machine speed. The desktop in my office has an Intel i7 chip that has 4 cores each of which can handle 2 "hyperthreads" so it can execute 8 separate processes at (nearly) full machine speed. Big servers can handle even more parallel processes. Compute clusters like at the U of M supercomputer center or at CLA research computing can handle even more parallel processes. In summary, you get faster today by running more processes in parallel not by running faster on one processor. # Task View The [task view on high performance computing](https://cran.r-project.org/web/views/HighPerformanceComputing.html) includes discussion of parallel processing (since that is what high performance computing is all about these days). But, somewhat crazily, the task view does not discuss the most important R package of all for parallel computing. That is R package `parallel` in the R base (the part of R that must be installed in each R installation). This is the only R package for high performance computing that we are going to use in this course. # An Example ## Introduction The example that we will use throughout this document is simulating the sampling distribution of the MLE for $\text{Normal}(\theta, \theta^2)$ data. This is the same as the example of [Section 5.3 of the course notes on simulation](http://www.stat.umn.edu/geyer/3701/notes/simulation.html#doing-the-simulation), except we are going to simplify the R function `estimator` using some ideas from later on in the notes ([Section 6.2.4 of the course notes on the bootstrap](http://www.stat.umn.edu/geyer/3701/notes/bootstrap.html#bootstrap-gamma-mle)). ## Set-Up ```{r} n <- 10 nsim <- 1e4 theta <- 1 doit <- function(estimator, seed = 42) { set.seed(seed) result <- double(nsim) for (i in 1:nsim) { x <- rnorm(n, theta, abs(theta)) result[i] <- estimator(x) } return(result) } mlogl <- function(theta, x) sum(- dnorm(x, theta, abs(theta), log = TRUE)) mle <- function(x) { if (all(x == 0)) return(0) nout <- nlm(mlogl, sign(mean(x)) * sd(x), x = x) while (nout$code > 3) nout <- nlm(mlogl, nout$estimate, x = x) return(nout$estimate) } ``` ## Try It ```{r} theta.hat <- doit(mle) ``` ## Check It ```{r fig.align='center'} hist(theta.hat, probability = TRUE, breaks = 30) curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE) ``` The curve is the PDF of the asymptotic normal distribution of the MLE, which uses the formula $$ I_n(\theta) = \frac{3 n}{\theta^2} $$ which isn't in these course notes (although we did calculate Fisher information for any given numerical value of $\theta$ in the practice problems solutions cited above). Looks pretty good. The large negative estimates are probably not a mistake. The parameter is allowed to be negative, so sometimes the estimates come out negative even though the truth is positive. And not just a little negative because $\lvert \theta \rvert$ is also the standard deviation, so it cannot be small and the model fit the data. ## Time It Now for something new. We will time it. ```{r cache=TRUE} time1 <- system.time(theta.hat.mle <- doit(mle)) time1 ``` ## Time It More Accurately That's too short a time for accurate timing. Also we should probably average over several IID iterations to get a good average. Try again. ```{r cache=TRUE} nsim <- 1e5 nrep <- 7 time1 <- NULL for (irep in 1:nrep) time1 <- rbind(time1, system.time(theta.hat.mle <- doit(mle))) time1 apply(time1, 2, mean) apply(time1, 2, sd) / sqrt(nrep) ``` # R Package `compiler` ## The Compiler Also very important for speed is the R package `compiler` which is also in the R base (the part of R that must be installed in each R installation). Compiling R is not like compiling C, C++, or Java or other so-called "compiled" languages. Compiled R works just the same as non-compiled R. You don't even notice it. The only difference is that it runs approximately twice as fast. Since version 2.14.0 of R (31 Oct 2011) all base and recommended packages are compiled during the installation process. Any code you write yourself is not compiled unless you do it explicitly, as in [Section 6.4 below](#example-with-compilation). Since version 3.4.0 of R (20 Apr 2017) the just-in-time compiler (JIT) is turned on by default at its highest level. This means * larger closures (functions) are compiled before their first use, * some small closures are also compiled before their second use, and * all top level loops are compiled before they are executed. And all of this happens automatically with nothing being done by the user except using R as always. So using versions 3.4.0 or higher may make explicit use of the compiler unnecessary. ## Compiling Our Functions Now compile all of our functions and see if this helps. ```{r} library(compiler) mlogl mlogl <- cmpfun(mlogl) mlogl mlogl(1.4, rnorm(6)) ``` After compilation functions look the same except for extra blurfle about byte code. And they work the same. So compile the rest. ```{r} doit <- cmpfun(doit) mle <- cmpfun(mle) ``` ## Example With Compilation The following code chunk is identical to the code chunk in [Section 6.2 above](#example-without-compilation) except for changing `time1` to `time2` everywhere. ```{r cache=TRUE} time2 <- NULL for (irep in 1:nrep) time2 <- rbind(time2, system.time(theta.hat.mle <- doit(mle))) time2 apply(time2, 2, mean) apply(time2, 2, sd) / sqrt(nrep) ``` It is actually slower when compiled! But not statistically significantly so. ```{r} t.test(time1[ , 1], time2[ , 1]) ``` What happened? Most of the time is spent in the R function `nlm` which is mostly C code. So it has always run at C speed (as fast as the computer can go) even before R had a compiler. So that means there is not much for the compiler to do. Or so I hypothesize. ## Profiling the Example We should have done this first (see also [Section 6.5](#the-three-rules-of-optimiztion) below). ```{r cache=TRUE} time3 <- NULL Rprof() for (irep in 1:nrep) time3 <- rbind(time3, system.time(theta.hat.mle <- doit(mle))) Rprof(NULL) time3 apply(time3, 2, mean) apply(time3, 2, sd) / sqrt(nrep) summaryRprof() ``` ```{r include=FALSE} sout <- summaryRprof() ``` This is a little hard to read. Only the first table (the `by.self` component of the output) is important. The row labels are function names, some of which we recognize like ``` dnorm nlm rnorm mean estimator doit ``` and some we do not recognize like ``` f .External2 mean.default ``` So our first task is to understand the ones we do not recognize. To do that we need to be very fussy about our language, distinguishing between R objects and *names* of R objects. We think of `mlogl` as an R function, but it is not. It is the *name* of an R function that can have various names. ```{r} fred <- sally <- herman <- alice <- mlogl ``` Now it has five different names, but there is only one R object that those five names refer to. When a function is an argument to another function (which makes the latter a "higher-order function") it gets a new name inside the latter. That's how function arguments work for all kinds of objects. ```{r} args(nlm) ``` The first argument of `nlm` is called `f`. So when we call `nlm` with its first argument being `mlogl`, the function that is *named* `mlogl` outside of `nlm` is *named* `f` inside `nlm`. So now we understand what `f` is. It is the same R object as `mlogl`. When this function is called over and over again inside `mlogl` (because it needs to evaluate the objective function at many points to minimize it), it is named `f` because that is what the first argument of `nlm` is *named*. To explain some of the rest we need to look at all of the code of `nlm` ```{r} nlm ``` There we see `.External2` which the documentation `help(".External2")` is clear as mud. It says that `.External2` is just like `.External` but fancier. So we look at `help(".External")` the "Description" of which says > Functions to pass R objects to compiled C/C++ code that has been > loaded into R. So we see this call to `.External2` is the end of R and the start of C or C++ (and when that C or C++ function returns, it returns to R). So there is nothing the R compiler can do to speed up `.External2`. Only the C or C++ compiler can speed that that up (by using higher optimization level or a newer version of the compiler with more tricks). The first argument of `.External2` is an R symbol (also called *name*) that refers to the C or C++ function being called. The second argument is an anonymous function ``` function(x) f(x, ...) ``` the point of which is to capture the `...` arguments so the C or C++ code does not need to deal with them. When the function *named* `f` (also *named* `mlogl`) is called, it is called with the `...` arguments passed to it. When we call `nlm` we call it as follows ``` nlm(mlogl, mean(x), x = x) ``` with two *positional arguments, which match the first two arguments of `nlm` (called `f` and `p` inside `nlm`) and one named argument *named* `x`. Since no argument *name* of `nlm` that comes before `...` starts with `x` and no argument that comes after `...` is exactly `x` (arguments after `...` must be matched exactly), our `x` becomes a `...` argument *named* `x`. And then when the *anonymous* function ``` function(x) f(x, ...) ``` is called, it is has only one argument (also called `x`) that is passed as a *positional* argument to `f` and so matches the first argument to `f` which is ```{r} args(mlogl) ``` `theta` (so what is called `x` in the anonymous function is called `theta` in the function it calls) and that leaves the second argument which is a *named* argument *named* `x`. So everything works. All of that was, no doubt, *way more that you wanted to know* but necessary to disentangle what is going on. Here is what happens. * We call `nlm`. * It calls `.External2`. * Now we are in C or C++ rather than in R. Every time this C or C++ code needs to evaluate the objective function, it evaluates the R function which was the second argument to `.External2`, which is the *anonymous function* ``` function(x) f(x, ...) ``` which calls the function that it names `f` but we think of as `mlogl`. All of the above was touched on in [Section 7 of the course notes on "basics"](http://www.stat.umn.edu/geyer/3701/notes/basic.pdf#page=46), but we see it can get very messy when one tries to figure out *exactly* what is going on even in fairly simple code. Finally, `mean` is a generic function and `mean.default` is one of its methods, the one that is used for objects that no other method of `mean` handles. Generic functions, we haven't really covered yet. (Now there are [course notes on that subject](http://www.stat.umn.edu/geyer/3701/notes/generic.html).) Suffice it to say that `mean.default` is the *name* of a function that is called when we call `mean`. All of that just to explain the names! Then we want to look at the column of the table labeled `self.pct` which is the percentage of the computer time taken by the function itself as opposed to other functions it calls. So IMHO the only part of the output worth looking at is summarized in the following table function name self.pct --------------------------------- ------------------------------------------------ `dnorm` `r sout$by.self["\"dnorm\"", "self.pct"]` `f` (a. k. a. `mlogl`) `r sout$by.self["\"f\"", "self.pct"]` `.External2` `r sout$by.self["\".External2\"", "self.pct"]` anonymous function that calls `f` `r sout$by.self["\"\"", "self.pct"]` `nlm` `r sout$by.self["\"nlm\"", "self.pct"]` `rnorm` `r sout$by.self["\"rnorm\"", "self.pct"]` `mean` `r sout$by.self["\"mean\"", "self.pct"]` `estimator` `r sout$by.self["\"estimator\"", "self.pct"]` --------------------------------- ------------------------------------------ So `r sout$by.self["\"dnorm\"", "self.pct"]`% of the computing time is taken up by, `dnorm`, which the compiler cannot speed up because `.Call` is (if we look at its help) another way to go from R to C or C++. So `dnorm` is almost entirely C code, which the R compiler cannot speed up. Then another `r sout$by.self["\".External2\"", "self.pct"]`% of the computing time is is taken by `.External2` which the R compiler also cannot speed up. Since the anonymous function that calls `f` a. k. a. `mlogl` does almost nothing, it cannot be speeded up much. I don't understand and cannot explain how it takes up `r sout$by.self["\"\"", "self.pct"]`% of the computer time when there is almost nothing there. Our compiling could conceivably speed up `f` (a. k. a. `mlogl`) `estimator` (a. k. a. `mle`) and `doit` but they take hardly any of the computing time. The only part we could actually (possibly) speed up by writing better R would be the time spent in `f`, which takes up only `r sout$by.self["\"f\"", "self.pct"]`% of the computer time. ## The Three Rules of Optimization Actually, we should have read this first: the [Three Rules of Optimization](http://wiki.c2.com/?RulesOfOptimization). # Parallel Computing ## With Unix Fork and Exec This method is by far the simplest but * it only works on one computer (using however many simultaneous processes the computer can do), and * it does not work on Windows. First a toy problem that does nothing except show that we are actually using different processes. ```{r} library(parallel) ncores <- detectCores() mclapply(1:ncores, function(x) Sys.getpid(), mc.cores = ncores) ``` ### Parallel Streams of Random Numbers #### Try 1 If we generate random numbers reproducibly, it does not work using the default RNG. ```{r} set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores) set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores) ``` We don't have reproducibility. #### Try 2 ```{r} set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores, mc.set.seed = FALSE) set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores, mc.set.seed = FALSE) ``` We have reproducibility, but we don't have different random number streams for the different processes. #### Try 3 ```{r} RNGkind("L'Ecuyer-CMRG") set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores) set.seed(42) mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores) ``` Just right! We have different random numbers in all our jobs. And it is reproducible. #### Try 4 But this does not work like you may think it does. ```{r} save.seed <- .Random.seed mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores) identical(save.seed, .Random.seed) ``` Running `mclapply` does not change `.Random.seed` in the parent process (the R process you are typing into). It only changes it in the child processes (that do the work). But there is no communication from child to parent *except* the list of results returned by `mclapply`. This is a fundamental problem with `mclapply` and the fork-exec method of parallelization. And it has no real solution. The different child processes are using different random number streams (we see that, and it is what we wanted to happen). So they should all have a different `.Random.seed` at the end. Let's check. ```{r} fred <- function(x) { sally <- rnorm(5) list(normals = sally, seeds = .Random.seed) } mclapply(1:ncores, fred, mc.cores = ncores) ``` Right! Conceptually, there is no Right Thing to do! We want to advance the RNG seed in the parent process, but to what? We have four different possibilities (with four child processes), but we only want one answer, not four! So the only solution to this problem is not really a solution. You just have to be aware of the issue. If you want to do exactly the same random thing with `mclapply` and get different random results, then you must change `.Random.seed` in the parent process, either with `set.seed` or by otherwise using random numbers *in the parent process*. ### The Example {#fork-example} We need to rewrite our `doit` function * to only do `1 / ncores` of the work in each child process, * to not set the random number generator seed, and * to take an argument in some list we provide. ```{r} doit <- function(nsim, estimator) { result <- double(nsim) for (i in 1:nsim) { x <- rnorm(n, theta, abs(theta)) result[i] <- estimator(x) } return(result) } ``` ### Try It {#fork-try} ```{r cache=TRUE} mout <- mclapply(rep(nsim / ncores, ncores), doit, estimator = mle, mc.cores = ncores) lapply(mout, head) ``` ### Check It {#fork-check} Seems to have worked. ```{r} length(mout) sapply(mout, length) lapply(mout, head) lapply(mout, range) ``` Plot it. ```{r fig.align='center'} theta.hat <- unlist(mout) hist(theta.hat, probability = TRUE, breaks = 30) curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE) ``` ### Time It {#fork-time} ```{r cache=TRUE} time4 <- NULL for (irep in 1:nrep) time4 <- rbind(time4, system.time(theta.hat.mle <- unlist(mclapply(rep(nsim / ncores, ncores), doit, estimator = mle, mc.cores = ncores)))) time4 apply(time4, 2, mean) apply(time4, 2, sd) / sqrt(nrep) ``` We got the desired speedup. The elapsed time averages ```{r} apply(time4, 2, mean)["elapsed"] ``` with parallelization and ```{r} apply(time2, 2, mean)["elapsed"] ``` without parallelization. But we did not get a 4-fold speedup with 4 cores. There is a cost to starting and stopping the child processes. And some time needs to be taken from this number crunching to run the rest of the computer. However, we did get an almost 3-fold speedup. If we had more cores, we could do even better. ## The Example With a Cluster This method is more complicated but * it works on clusters like the ones at the [Minnesota Supercomputing Institute](https://www.msi.umn.edu/) or at [LATIS (College of Liberal Arts Technologies and Innovation Services](http://z.umn.edu/claresearchcomputing), and * according to the documentation, it does work on Windows. First a toy problem that does nothing except show that we are actually using different processes. ```{r} library(parallel) ncores <- detectCores() cl <- makePSOCKcluster(ncores) parLapply(cl, 1:ncores, function(x) Sys.getpid()) stopCluster(cl) ``` This is more complicated in that * first you you set up a cluster, here with `makePSOCKcluster` but not everywhere --- there are a variety of different commands to make clusters and the command would be different at MSI or LATIS --- and * at the end you tear down the cluster with `stopCluster`. Of course, you do not need to tear down the cluster before you are done with it. You can execute multiple `parLapply` commands on the same cluster. There are also a lot of other commands other than `parLapply` that can be used on the cluster. We will see some of them below. ### Parallel Streams of Random Numbers {#rng-cluster} ```{r} cl <- makePSOCKcluster(ncores) clusterSetRNGStream(cl, 42) parLapply(cl, 1:ncores, function(x) rnorm(5)) parLapply(cl, 1:ncores, function(x) rnorm(5)) ``` We see that clusters do not have the same problem with continuing random number streams that the fork-exec mechanism has. * Using fork-exec there is a *parent* process and *child* processes (all running on the same computer) and the *child* processes end when their work is done (when `mclapply` finishes). * Using clusters there is a *controller* process and *worker* processes (possibly running on many different computers) and the *worker* processes end when the cluster is torn down (with `stopCluster`). So the worker processes continue and remember where they are in the random number stream. ### The Example on a Cluster #### Set Up {#cluster-setup} Another complication of using clusters is that the worker processes are completely independent of the controller process. Any information they have must be explicitly passed to them. This is very unlike the fork-exec model in which all of the child processes are copies of the parent process inheriting all of its memory (and thus knowing about any and all R objects it created). So in order for our example to work we must explicitly distribute stuff to the cluster. ```{r} clusterExport(cl, c("doit", "mle", "mlogl", "n", "nsim", "theta")) ``` Now all of the workers have those R objects, as copied from the controller process right now. If we change them in the controller (pedantically if we change the R objects those *names* refer to) the workers won't know about it. They only would get access to those changes if code were executed on them to do so. #### Try It {#cluster-try} So now we are set up to try our example. ```{r cache=TRUE} pout <- parLapply(cl, rep(nsim / ncores, ncores), doit, estimator = mle) ``` #### Check It {#cluster-check} Seems to have worked. ```{r} length(pout) sapply(pout, length) lapply(pout, head) lapply(pout, range) ``` Plot it. ```{r fig.align='center'} theta.hat <- unlist(mout) hist(theta.hat, probability = TRUE, breaks = 30) curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE) ``` #### Time It {#cluster-time} ```{r cache=TRUE} time5 <- NULL for (irep in 1:nrep) time5 <- rbind(time5, system.time(theta.hat.mle <- unlist(parLapply(cl, rep(nsim / ncores, ncores), doit, estimator = mle)))) time5 apply(time5, 2, mean) apply(time5, 2, sd) / sqrt(nrep) ``` We got the desired speedup. The elapsed time averages ```{r} apply(time5, 2, mean)["elapsed"] ``` with parallelization and ```{r} apply(time2, 2, mean)["elapsed"] ``` without parallelization. But we did not get a 4-fold speedup with 4 cores. There is a cost to sending information to and from the worker processes. And some time needs to be taken from this number crunching to run the rest of the computer. However, we did get a 2-fold speedup. If we had more workers, we could do even better. ### Tear Down Don't forget to tear down the cluster when you are done. ```{r} stopCluster(cl) ```