This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License (http://creativecommons.org/licenses/by-sa/4.0/).
These notes are the course material for the undergraduate statistical computing course Stat 3701. The best version of my class notes for parallel computing are those for Stat 8054 (PhD level statistical computing). Those notes are, however, terser.
The version of R used to make this document is 4.2.1.
The version of the rmarkdown package used to make this document is 2.17.
The first computer I ever worked on was an IBM 1130 (Wikipedia page). This was in 1971. It was the only computer the small liberal arts college I went to had.
It had 16 bit words and 32,768 of them (64 kilobytes) of memory. The clock speed was 278 kHz (kilohertz).
For comparison, the laptop I am working on has 64 bit words and 16093280 kB (16 GB, which is 250,000 times as much as the IBM 1130). Its clock speed is 2.40 GHz (8,633 times as fast).
That is log2(8633) = 13.08 doublings of speed in 51 years, which is a doubling of computer speed every 3.9 years.
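Those numbers are easy to check in R:

```r
speedup <- 2.40e9 / 278e3    # laptop clock over IBM 1130 clock
doublings <- log2(speedup)
round(doublings, 2)          # 13.08
round(51 / doublings, 1)     # 3.9 years per doubling
```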
For a very long time (going back to even before 1970), computers have been getting faster at more or less this rate, but something happened in 2003 as discussed in the article The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software.
This exponential growth in computer speed was often confused with Moore’s Law (Wikipedia article), which actually predicts a doubling of the number of transistors on a computer chip every 2 years. For a long time this went hand in hand with a doubling in computer speed, but about 2003 the speed increases stopped while the number of transistors kept doubling.
What to do with all of those transistors? Make more processors. So even the dumbest chips today, say the one in your phone, have multiple “cores”, each a real computer. High-end graphics cards, so-called GPUs, can run thousands of processes simultaneously, but have a very limited instruction set. They can only run specialized graphics code very fast. They could not run 1000 instances of R very fast. Your laptop, in contrast, can run multiple instances of R very fast, just not so many. The laptop I use for class has an Intel i5 chip with 4 cores that can execute 8 separate processes (two hyperthreads per core) at (nearly) full machine speed. The desktop in my office has an Intel i7 which is similar (8 hyperthreads) but faster. Compute clusters like those at the U of M supercomputer center or at CLA research computing can handle even more parallel processes.
In summary, you get faster today by running more processes in parallel, not by running faster on one processor.
The task view on high performance computing includes discussion of parallel processing (since that is what high performance computing is all about these days).
But, somewhat crazily, the task view does not discuss the most important R package of all for parallel computing. That is R package parallel in the R base (the part of R that must be installed in each R installation).
This is the only R package for high performance computing that we are going to use in this course.
The example that we will use throughout this document is simulating the sampling distribution of the MLE for \(\text{Normal}(\theta, \theta^2)\) data.
This is the same as the example of Section 5.3 of the course notes on simulation, except we are going to simplify the R function estimator using some ideas from later on in the notes (Section 6.2.4 of the course notes on the bootstrap).
n <- 10
nsim <- 1e4
theta <- 1
doit <- function(estimator, seed = 42) {
    set.seed(seed)
    result <- double(nsim)
    for (i in 1:nsim) {
        x <- rnorm(n, theta, abs(theta))
        result[i] <- estimator(x)
    }
    return(result)
}
mlogl <- function(theta, x) sum(- dnorm(x, theta, abs(theta), log = TRUE))
mle <- function(x) {
    if (all(x == 0))
        return(0)
    nout <- nlm(mlogl, sign(mean(x)) * sd(x), x = x)
    while (nout$code > 3)
        nout <- nlm(mlogl, nout$estimate, x = x)
    return(nout$estimate)
}
theta.hat <- doit(mle)
hist(theta.hat, probability = TRUE, breaks = 30)
curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE)
The curve is the PDF of the asymptotic normal distribution of the MLE, which uses the formula \[ I_n(\theta) = \frac{3 n}{\theta^2} \] which isn’t in these course notes (although we did calculate Fisher information for any given numerical value of \(\theta\) in the practice problems solutions cited above).
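That formula can be verified by a short calculation, sketched here (this derivation is mine, not from the cited notes; take \(\theta > 0\), the case \(\theta < 0\) being symmetric). The log likelihood for one observation is
\[
l_1(\theta) = - \log \lvert \theta \rvert - \frac{(x - \theta)^2}{2 \theta^2} + \text{constant}
\]
and, writing \(z = (x - \theta) / \theta\), which is standard normal, the score is
\[
l_1'(\theta) = - \frac{1}{\theta} + \frac{x - \theta}{\theta^2} + \frac{(x - \theta)^2}{\theta^3} = \frac{z^2 + z - 1}{\theta}.
\]
Since \(E(Z) = E(Z^3) = 0\), \(E(Z^2) = 1\), and \(E(Z^4) = 3\) for standard normal \(Z\),
\[
I_1(\theta) = E\left\{ \left( \frac{Z^2 + Z - 1}{\theta} \right)^2 \right\} = \frac{E(Z^4 + 2 Z^3 - Z^2 - 2 Z + 1)}{\theta^2} = \frac{3 + 0 - 1 - 0 + 1}{\theta^2} = \frac{3}{\theta^2},
\]
and the Fisher information for a sample of size \(n\) is \(I_n(\theta) = n I_1(\theta) = 3 n / \theta^2\).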
Looks pretty good. The large negative estimates are probably not a mistake. The parameter is allowed to be negative, so sometimes the estimates come out negative even though the truth is positive. And not just a little negative, because \(\lvert \theta \rvert\) is also the standard deviation, so it cannot be small if the model is to fit the data.
Now for something new. We will time it.
time1 <- system.time(theta.hat.mle <- doit(mle))
time1
## user system elapsed
## 0.776 0.004 0.780
That’s too short a time for accurate timing. Also we should average over several IID iterations to get a good estimate of the average time. Try again.
nsim <- 1e5
nrep <- 7
time1 <- NULL
for (irep in 1:nrep)
time1 <- rbind(time1, system.time(theta.hat.mle <- doit(mle)))
time1
## user.self sys.self elapsed user.child sys.child
## [1,] 8.022 0.000 8.023 0 0
## [2,] 8.137 0.004 8.142 0 0
## [3,] 7.995 0.000 7.995 0 0
## [4,] 8.101 0.000 8.101 0 0
## [5,] 7.955 0.004 7.960 0 0
## [6,] 8.147 0.008 8.155 0 0
## [7,] 7.961 0.008 7.969 0 0
apply(time1, 2, mean)
## user.self sys.self elapsed user.child sys.child
## 8.045428571 0.003428571 8.049285714 0.000000000 0.000000000
apply(time1, 2, sd) / sqrt(nrep)
## user.self sys.self elapsed user.child sys.child
## 0.030932998 0.001360272 0.031053498 0.000000000 0.000000000
This method is by far the simplest, but

- it only works on one computer (using however many simultaneous processes the computer can do), and
- it does not work on Windows unless you use it with no parallelization (optional argument mc.cores = 1) or unless you are running R under Windows Subsystem for Linux (WSL), which is a complete implementation of Linux running inside Microsoft Windows.

If you want to do this example on Windows without WSL, use mc.cores = 1.
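One way to write code that degrades gracefully on Windows is to choose the number of cores at run time. A sketch (the variable name cores is mine, not from the notes):

```r
library(parallel)
# fall back to no parallelization on Windows, where mclapply cannot fork
cores <- if (.Platform$OS.type == "windows") 1 else detectCores()
unlist(mclapply(1:4, function(x) x^2, mc.cores = cores))    # 1 4 9 16
```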
First a toy problem that does nothing except show that we are actually using different processes.
library(parallel)
ncores <- detectCores()
mclapply(1:ncores, function(x) Sys.getpid(), mc.cores = ncores)
## [[1]]
## [1] 317127
##
## [[2]]
## [1] 317128
##
## [[3]]
## [1] 317129
##
## [[4]]
## [1] 317130
##
## [[5]]
## [1] 317131
##
## [[6]]
## [1] 317132
##
## [[7]]
## [1] 317133
##
## [[8]]
## [1] 317134
If we try to generate random numbers reproducibly, it does not work using the default RNG.
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores)
## [[1]]
## [1] -2.5072855 1.0768257 -0.9640853 -0.4889282 -0.4784051
##
## [[2]]
## [1] 1.0141533 -0.4824945 0.9510594 -1.8611083 -0.1773298
##
## [[3]]
## [1] 0.52918698 -0.08431874 -0.27203749 1.62583703 0.84699828
##
## [[4]]
## [1] 0.9071551 1.1054087 -0.4088936 -1.3364417 0.6831682
##
## [[5]]
## [1] 1.5371944 1.7292377 -0.4259214 0.3544867 -0.3337712
##
## [[6]]
## [1] 1.58467325 -0.01072021 -0.02868569 0.72384608 -0.85994008
##
## [[7]]
## [1] 0.2338911 0.5328038 0.2674830 0.9844909 -2.4303119
##
## [[8]]
## [1] 1.115838270 0.042179357 0.909245496 1.533407927 0.009492357
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores)
## [[1]]
## [1] 0.05731454 0.53728041 -0.88177250 -0.01071948 -0.74696258
##
## [[2]]
## [1] -0.43298008 -0.95881597 -0.09941596 0.58326730 -0.55243605
##
## [[3]]
## [1] -1.3320991 0.5909732 0.8353615 1.1044205 -0.1386556
##
## [[4]]
## [1] -0.6040820 -1.7552367 -0.3686439 0.9844289 0.8072681
##
## [[5]]
## [1] 0.01011204 -0.02109631 -0.68504941 -0.70841407 0.99787371
##
## [[6]]
## [1] 1.0768176 2.0767737 1.1300614 -0.4061605 -0.3263099
##
## [[7]]
## [1] 0.54584912 1.46049341 0.06536747 -1.81814060 -0.32782680
##
## [[8]]
## [1] -0.4535918 0.5457451 -0.4279881 1.0149696 1.1381390
We don’t have reproducibility.
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores, mc.set.seed = FALSE)
## [[1]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[2]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[3]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[4]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[5]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[6]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[7]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[8]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores, mc.set.seed = FALSE)
## [[1]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[2]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[3]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[4]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[5]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[6]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[7]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
##
## [[8]]
## [1] 1.3709584 -0.5646982 0.3631284 0.6328626 0.4042683
We have reproducibility, but we don’t have different random number streams for the different processes.
RNGkind("L'Ecuyer-CMRG")
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores)
## [[1]]
## [1] 1.11932846 -0.07617141 -0.35021912 -0.33491161 -1.73311280
##
## [[2]]
## [1] -0.2084809 -1.0341493 -0.2629060 0.3880115 0.8331067
##
## [[3]]
## [1] 0.001100034 1.763058291 -0.166377859 -0.311947389 0.694879494
##
## [[4]]
## [1] 0.2262605 -0.4827515 1.7637105 -0.1887217 -0.7998982
##
## [[5]]
## [1] 0.8584220 -0.3851236 1.0817530 0.2851169 0.1799325
##
## [[6]]
## [1] -1.1378621 -1.5197576 -0.9198612 1.0303683 -0.9458347
##
## [[7]]
## [1] -0.04649149 3.38053730 -0.35705061 0.17722940 -0.39716405
##
## [[8]]
## [1] 1.3502819 -1.0055894 -0.4591798 -0.0628527 -0.2706805
set.seed(42)
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores)
## [[1]]
## [1] 1.11932846 -0.07617141 -0.35021912 -0.33491161 -1.73311280
##
## [[2]]
## [1] -0.2084809 -1.0341493 -0.2629060 0.3880115 0.8331067
##
## [[3]]
## [1] 0.001100034 1.763058291 -0.166377859 -0.311947389 0.694879494
##
## [[4]]
## [1] 0.2262605 -0.4827515 1.7637105 -0.1887217 -0.7998982
##
## [[5]]
## [1] 0.8584220 -0.3851236 1.0817530 0.2851169 0.1799325
##
## [[6]]
## [1] -1.1378621 -1.5197576 -0.9198612 1.0303683 -0.9458347
##
## [[7]]
## [1] -0.04649149 3.38053730 -0.35705061 0.17722940 -0.39716405
##
## [[8]]
## [1] 1.3502819 -1.0055894 -0.4591798 -0.0628527 -0.2706805
Just right! We have different random numbers in all our jobs. And it is reproducible.
But this does not work like you may think it does.
save.seed <- .Random.seed
mclapply(1:ncores, function(x) rnorm(5), mc.cores = ncores)
## [[1]]
## [1] 1.11932846 -0.07617141 -0.35021912 -0.33491161 -1.73311280
##
## [[2]]
## [1] -0.2084809 -1.0341493 -0.2629060 0.3880115 0.8331067
##
## [[3]]
## [1] 0.001100034 1.763058291 -0.166377859 -0.311947389 0.694879494
##
## [[4]]
## [1] 0.2262605 -0.4827515 1.7637105 -0.1887217 -0.7998982
##
## [[5]]
## [1] 0.8584220 -0.3851236 1.0817530 0.2851169 0.1799325
##
## [[6]]
## [1] -1.1378621 -1.5197576 -0.9198612 1.0303683 -0.9458347
##
## [[7]]
## [1] -0.04649149 3.38053730 -0.35705061 0.17722940 -0.39716405
##
## [[8]]
## [1] 1.3502819 -1.0055894 -0.4591798 -0.0628527 -0.2706805
identical(save.seed, .Random.seed)
## [1] TRUE
Running mclapply does not change .Random.seed in the parent process (the R process you are typing into). It only changes it in the child processes (that do the work). But there is no communication from child to parent except the list of results returned by mclapply.
This is a fundamental problem with mclapply and the fork-exec method of parallelization. And it has no real solution. The different child processes are using different random number streams (we see that, and it is what we wanted to happen). So they should all have a different .Random.seed at the end. Let’s check.
fred <- function(x) {
    sally <- rnorm(5)
    list(normals = sally, seeds = .Random.seed)
}
mclapply(1:ncores, fred, mc.cores = ncores)
## [[1]]
## [[1]]$normals
## [1] 1.11932846 -0.07617141 -0.35021912 -0.33491161 -1.73311280
##
## [[1]]$seeds
## [1] 10407 843426105 -1189635545 -1908310047 -500321376 -1368039072
## [7] -327247439
##
##
## [[2]]
## [[2]]$normals
## [1] -0.2084809 -1.0341493 -0.2629060 0.3880115 0.8331067
##
## [[2]]$seeds
## [1] 10407 -846419547 937862459 1326107580 760134144 1807130611 1335532618
##
##
## [[3]]
## [[3]]$normals
## [1] 0.001100034 1.763058291 -0.166377859 -0.311947389 0.694879494
##
## [[3]]$seeds
## [1] 10407 -133207412 1779373486 976163781 -458962600 -1469488387
## [7] -2147451017
##
##
## [[4]]
## [[4]]$normals
## [1] 0.2262605 -0.4827515 1.7637105 -0.1887217 -0.7998982
##
## [[4]]$seeds
## [1] 10407 -1395279340 -1740117305 -2068775159 -1223675294 1644811352
## [7] 970811783
##
##
## [[5]]
## [[5]]$normals
## [1] 0.8584220 -0.3851236 1.0817530 0.2851169 0.1799325
##
## [[5]]$seeds
## [1] 10407 -1988850249 851836302 -412123035 -1393827477 -1602296088
## [7] -1726579968
##
##
## [[6]]
## [[6]]$normals
## [1] -1.1378621 -1.5197576 -0.9198612 1.0303683 -0.9458347
##
## [[6]]$seeds
## [1] 10407 -289872396 -1983423440 -1372057278 1254597746 1572309401
## [7] -139829874
##
##
## [[7]]
## [[7]]$normals
## [1] -0.04649149 3.38053730 -0.35705061 0.17722940 -0.39716405
##
## [[7]]$seeds
## [1] 10407 -552026993 1357891868 -1020113790 1248945061 -126548789
## [7] -322082143
##
##
## [[8]]
## [[8]]$normals
## [1] 1.3502819 -1.0055894 -0.4591798 -0.0628527 -0.2706805
##
## [[8]]$seeds
## [1] 10407 -1867726870 1386420444 -153979515 -1662972776 -302869263
## [7] -198526739
Right! Conceptually, there is no Right Thing to do! We want to advance the RNG seed in the parent process, but to what? We have eight different possibilities (with eight child processes), but we only want one answer, not eight!
So the only solution to this problem is not really a solution. You just have to be aware of the issue. If you want to do exactly the same random thing with mclapply and get different random results, then you must change .Random.seed in the parent process, either with set.seed or by otherwise using random numbers in the parent process.
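Here is a small sketch of that behavior (my own illustration, using two cores and one normal per job to keep the output short):

```r
library(parallel)
RNGkind("L'Ecuyer-CMRG")
set.seed(42)
r1 <- unlist(mclapply(1:2, function(x) rnorm(1), mc.cores = 2))
# the parent's .Random.seed did not advance, so we get identical results
r2 <- unlist(mclapply(1:2, function(x) rnorm(1), mc.cores = 2))
identical(r1, r2)    # TRUE
# using random numbers in the parent changes .Random.seed, hence the streams
invisible(rnorm(1))
r3 <- unlist(mclapply(1:2, function(x) rnorm(1), mc.cores = 2))
identical(r1, r3)    # FALSE
```

Any use of the parent's RNG (here a throwaway rnorm(1)) is enough to change the child streams on the next call.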
We need to rewrite our doit function

- to only do 1 / ncores of the work in each child process,
- to not set the random number generator seed, and
- to take an argument in some list we provide.
doit <- function(nsim, estimator) {
    result <- double(nsim)
    for (i in 1:nsim) {
        x <- rnorm(n, theta, abs(theta))
        result[i] <- estimator(x)
    }
    return(result)
}
mout <- mclapply(rep(nsim %/% ncores, ncores), doit,
    estimator = mle, mc.cores = ncores)
lapply(mout, head)
## [[1]]
## [1] 0.9051972 0.9589889 0.9799828 1.1347548 0.9090886 0.9821320
##
## [[2]]
## [1] 0.8317815 1.3432331 0.7821308 1.2010078 0.9792244 1.1148521
##
## [[3]]
## [1] 0.8627829 0.9790400 1.1787975 0.7852431 1.2942963 1.0768396
##
## [[4]]
## [1] 1.0422013 0.9166641 0.8326720 1.1864809 0.9609456 1.3137716
##
## [[5]]
## [1] 0.8057316 0.9488173 1.0792078 0.9774531 0.8106612 0.8403027
##
## [[6]]
## [1] 1.0156983 1.0077599 0.9867766 1.1643493 0.9478923 1.1770221
##
## [[7]]
## [1] 1.2287013 1.0046353 0.9560784 1.0354414 0.9045423 0.9455714
##
## [[8]]
## [1] 0.7768910 1.0376265 0.8830854 0.8911714 1.0288567 1.1609360
Seems to have worked.
length(mout)
## [1] 8
sapply(mout, length)
## [1] 12500 12500 12500 12500 12500 12500 12500 12500
lapply(mout, range)
## [[1]]
## [1] -1.452060 1.742441
##
## [[2]]
## [1] -1.104573 1.878888
##
## [[3]]
## [1] -1.164338 1.800782
##
## [[4]]
## [1] -1.195419 1.707977
##
## [[5]]
## [1] -1.275307 1.785461
##
## [[6]]
## [1] -1.115946 1.773345
##
## [[7]]
## [1] -1.045440 1.774671
##
## [[8]]
## [1] -1.405485 1.662768
Plot it.
theta.hat <- unlist(mout)
hist(theta.hat, probability = TRUE, breaks = 30)
curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE)
time4 <- NULL
for (irep in 1:nrep)
    time4 <- rbind(time4, system.time(theta.hat.mle <-
        unlist(mclapply(rep(nsim / ncores, ncores), doit,
            estimator = mle, mc.cores = ncores))))
time4
## user.self sys.self elapsed user.child sys.child
## [1,] 0.002 0.024 1.860 10.638 0.252
## [2,] 0.006 0.020 1.872 12.521 0.284
## [3,] 0.002 0.033 1.880 12.311 0.280
## [4,] 0.004 0.025 1.948 12.613 0.257
## [5,] 0.000 0.030 2.068 12.977 0.300
## [6,] 0.008 0.020 2.149 13.121 0.318
## [7,] 0.005 0.025 2.033 12.873 0.310
apply(time4, 2, mean)
## user.self sys.self elapsed user.child sys.child
## 0.003857143 0.025285714 1.972857143 12.436285714 0.285857143
apply(time4, 2, sd) / sqrt(nrep)
## user.self sys.self elapsed user.child sys.child
## 0.001033454 0.001822012 0.042495058 0.317724702 0.009552932
We got the desired speedup. The elapsed time averages
apply(time4, 2, mean)["elapsed"]
## elapsed
## 1.972857
with parallelization and
apply(time1, 2, mean)["elapsed"]
## elapsed
## 8.049286
without parallelization. But we did not get an 8-fold speedup with 8 hyperthreads. There is a cost to starting and stopping the child processes. And some time needs to be taken from this number crunching to run the rest of the computer. However, we did get slightly more than a 4-fold speedup. If we had more cores in our machine, we could do even better.
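Putting those two averages together (the numbers are the ones just shown):

```r
# average elapsed times reported above (seconds)
serial.elapsed <- 8.049286
parallel.elapsed <- 1.972857
serial.elapsed / parallel.elapsed    # about 4.08, slightly better than 4-fold
```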
This method is more complicated, but

- it works on clusters like the ones at the Minnesota Supercomputing Institute or at LATIS (College of Liberal Arts Technologies and Innovation Services), and
- according to the documentation, it does work on Windows.
First a toy problem that does nothing except show that we are actually using different processes.
library(parallel)
ncores <- detectCores()
cl <- makePSOCKcluster(ncores)
parLapply(cl, 1:ncores, function(x) Sys.getpid())
## [[1]]
## [1] 317206
##
## [[2]]
## [1] 317208
##
## [[3]]
## [1] 317211
##
## [[4]]
## [1] 317209
##
## [[5]]
## [1] 317205
##
## [[6]]
## [1] 317207
##
## [[7]]
## [1] 317212
##
## [[8]]
## [1] 317210
stopCluster(cl)
This is more complicated in that

- first you set up a cluster, here with makePSOCKcluster, but not everywhere (there are a variety of different commands to make clusters, and the command would be different at MSI or LATIS), and
- at the end you tear down the cluster with stopCluster.
Of course, you do not need to tear down the cluster until you are done with it. You can execute multiple parLapply commands on the same cluster.
There are also many commands other than parLapply that can be used on the cluster. We will see some of them below.
cl <- makePSOCKcluster(ncores)
clusterSetRNGStream(cl, 42)
parLapply(cl, 1:ncores, function(x) rnorm(5))
## [[1]]
## [1] -0.93907708 -0.04167943 0.82941349 -0.43935820 -0.31403543
##
## [[2]]
## [1] 1.11932846 -0.07617141 -0.35021912 -0.33491161 -1.73311280
##
## [[3]]
## [1] -0.2084809 -1.0341493 -0.2629060 0.3880115 0.8331067
##
## [[4]]
## [1] 0.001100034 1.763058291 -0.166377859 -0.311947389 0.694879494
##
## [[5]]
## [1] 0.2262605 -0.4827515 1.7637105 -0.1887217 -0.7998982
##
## [[6]]
## [1] 0.8584220 -0.3851236 1.0817530 0.2851169 0.1799325
##
## [[7]]
## [1] -1.1378621 -1.5197576 -0.9198612 1.0303683 -0.9458347
##
## [[8]]
## [1] -0.04649149 3.38053730 -0.35705061 0.17722940 -0.39716405
parLapply(cl, 1:ncores, function(x) rnorm(5))
## [[1]]
## [1] -2.1290236 2.5069224 -1.1273128 0.1660827 0.5767232
##
## [[2]]
## [1] 0.03628534 0.29647473 1.07128138 0.72844380 0.12458507
##
## [[3]]
## [1] -0.1652167 -0.3262253 -0.2657667 0.1878883 1.4916193
##
## [[4]]
## [1] 0.3541931 -0.6820627 -1.0762411 -0.9595483 0.0982342
##
## [[5]]
## [1] 0.5441483 1.0852866 1.6011037 -0.5018903 -0.2709106
##
## [[6]]
## [1] -0.57445721 -0.86440961 -0.77401840 0.54423137 -0.01006838
##
## [[7]]
## [1] -1.3057289 0.5911102 0.8416164 1.7477622 -0.7824792
##
## [[8]]
## [1] 0.9071634 0.2518615 -0.4905999 0.4900700 0.7970189
We see that clusters do not have the same problem with continuing random number streams that the fork-exec mechanism has.
Using fork-exec there is a parent process and child processes (all running on the same computer), and the child processes end when their work is done (when mclapply finishes).
Using clusters there is a controller process and worker processes (possibly running on many different computers), and the worker processes end when the cluster is torn down (with stopCluster).
So the worker processes continue and remember where they are in the random number stream.
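So with clusters, reproducibility comes from resetting the worker streams with clusterSetRNGStream rather than from set.seed in the controller. A sketch (the fresh two-worker cluster cl2 is mine, torn down at the end):

```r
library(parallel)
cl2 <- makePSOCKcluster(2)
clusterSetRNGStream(cl2, 42)
r1 <- parLapply(cl2, 1:2, function(x) rnorm(2))
clusterSetRNGStream(cl2, 42)    # reset every worker to the same streams
r2 <- parLapply(cl2, 1:2, function(x) rnorm(2))
identical(r1, r2)               # TRUE: same streams, same results
stopCluster(cl2)
```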
Another complication of using clusters is that the worker processes are completely independent of the controller process. Any information they have must be explicitly passed to them.
This is very unlike the fork-exec model in which all of the child processes are copies of the parent process inheriting all of its memory (and thus knowing about any and all R objects it created).
So in order for our example to work we must explicitly distribute stuff to the cluster.
clusterExport(cl, c("doit", "mle", "mlogl", "n", "nsim", "theta"))
Now all of the workers have those R objects, as copied from the controller process right now. If we change them in the controller (pedantically if we change the R objects those names refer to) the workers won’t know about it. They only would get access to those changes if code were executed on them to do so.
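A small sketch of those copy semantics (the cluster cl2 and the object name herman are mine, just for illustration):

```r
library(parallel)
cl2 <- makePSOCKcluster(2)
herman <- 1
clusterExport(cl2, "herman")
herman <- 2    # change it in the controller only
herman.on.workers <- unlist(clusterEvalQ(cl2, herman))
herman.on.workers    # still 1 1: the workers got a copy, not a reference
stopCluster(cl2)
```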
So now we are set up to try our example.
pout <- parLapply(cl, rep(nsim / ncores, ncores), doit, estimator = mle)
Seems to have worked.
length(pout)
## [1] 8
sapply(pout, length)
## [1] 12500 12500 12500 12500 12500 12500 12500 12500
lapply(pout, head)
## [[1]]
## [1] 1.0079313 0.7316543 0.4958322 0.7705943 0.7734226 0.6158992
##
## [[2]]
## [1] 0.9589889 0.9799828 1.1347548 0.9090886 0.9821320 1.0032531
##
## [[3]]
## [1] 1.3432331 0.7821308 1.2010078 0.9792244 1.1148521 0.9269000
##
## [[4]]
## [1] 0.9790400 1.1787975 0.7852431 1.2942963 1.0768396 0.7546295
##
## [[5]]
## [1] 0.9166641 0.8326720 1.1864809 0.9609456 1.3137716 0.9832663
##
## [[6]]
## [1] 0.9488173 1.0792078 0.9774531 0.8106612 0.8403027 1.1296857
##
## [[7]]
## [1] 1.0077599 0.9867766 1.1643493 0.9478923 1.1770221 1.2789464
##
## [[8]]
## [1] 1.0046353 0.9560784 1.0354414 0.9045423 0.9455714 1.0312553
lapply(pout, range)
## [[1]]
## [1] -1.339373 1.800773
##
## [[2]]
## [1] -1.452060 1.742441
##
## [[3]]
## [1] -1.104573 1.878888
##
## [[4]]
## [1] -1.164338 1.800782
##
## [[5]]
## [1] -1.195419 1.707977
##
## [[6]]
## [1] -1.275307 1.785461
##
## [[7]]
## [1] -1.115946 1.773345
##
## [[8]]
## [1] -1.045440 1.774671
Plot it.
theta.hat <- unlist(pout)
hist(theta.hat, probability = TRUE, breaks = 30)
curve(dnorm(x, mean = theta, sd = theta / sqrt(3 * n)), add = TRUE)
time5 <- NULL
for (irep in 1:nrep)
    time5 <- rbind(time5, system.time(theta.hat.mle <-
        unlist(parLapply(cl, rep(nsim / ncores, ncores),
            doit, estimator = mle))))
time5
## user.self sys.self elapsed user.child sys.child
## [1,] 0.005 0.001 1.912 0 0
## [2,] 0.005 0.000 1.972 0 0
## [3,] 0.006 0.000 1.921 0 0
## [4,] 0.006 0.000 1.857 0 0
## [5,] 0.006 0.000 1.860 0 0
## [6,] 0.006 0.000 1.847 0 0
## [7,] 0.006 0.000 1.895 0 0
apply(time5, 2, mean)
## user.self sys.self elapsed user.child sys.child
## 0.0057142857 0.0001428571 1.8948571429 0.0000000000 0.0000000000
apply(time5, 2, sd) / sqrt(nrep)
## user.self sys.self elapsed user.child sys.child
## 0.0001844278 0.0001428571 0.0168090517 0.0000000000 0.0000000000
We got the desired speedup. The elapsed time averages
apply(time5, 2, mean)["elapsed"]
## elapsed
## 1.894857
with parallelization and
apply(time1, 2, mean)["elapsed"]
## elapsed
## 8.049286
without parallelization. But we did not get an 8-fold speedup with 8 cores (actually hyperthreads). There is a cost to sending information to and from the worker processes. And some time needs to be taken from this number crunching to run the rest of the computer. However, we did get slightly more than a 4-fold speedup. If we had more workers that could run simultaneously (like on a cluster at LATIS or at the supercomputer center), we could do even better.
Don’t forget to tear down the cluster when you are done.
stopCluster(cl)