What is parallel computing
- Data is becoming cheaper, because machines are becoming cheaper and faster.
- Computers are becoming cheaper and faster too. However, the performance of a single processor core has not changed much recently. Instead, we are getting multi-core processors and clusters of multi-core machines.
- To handle the large amounts of data collected from modern machines with large numbers of computing units, we need parallel or distributed computing: in other words, getting many computers to work on the same task simultaneously.
Embarrassingly parallel
- Many problems are “embarrassingly parallel”. In other words, they can be split into many smaller tasks and passed on to many computers for the computation to be carried out simultaneously.
- If we have a single computer at our disposal and have to run $n$ models, each taking $s$ seconds, the total running time will be $n \times s$. If, however, we have $k < n$ computers to run our models on, the total running time will be $n \times s / k$. In the old days this was how parallel code was run, and it is still how it is run on larger servers.
- However, modern computers have “multicore” processors, which can be equivalent to running multiple computers at a time. The equation is not quite as clean (there are other things running on each process; overhead in transferring between processors exists; etc.) but in general we see the same gain.
- A core: a general term for either a single processor on your own computer (technically you only have one processor, but a modern processor can have multiple cores, hence the term) or a single machine in a cluster network.
- A cluster: a collection of objects capable of hosting cores, either a network or just the collection of cores on your personal computer.
- A Node/Machine: a single physical machine in the cluster.
- A process: a single running version of R (or more generally any program). Each core runs a single process.
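As a rough illustration of the running-time estimate $n \times s / k$ from this section, the sketch below compares the idealised formula with a slightly more careful count in which tasks cannot be split. The function names `ideal_time` and `batched_time` are made up for this example, and both ignore the overhead mentioned above.

```r
# Ideal running time (in seconds) for n tasks of s seconds each on k workers.
ideal_time <- function(n, s, k = 1) n * s / k

# Tasks cannot be split, so the busiest worker runs ceiling(n / k) of them.
batched_time <- function(n, s, k = 1) ceiling(n / k) * s

n <- 100  # number of models
s <- 2    # seconds per model

ideal_time(n, s)           # one computer: 200 seconds
ideal_time(n, s, k = 8)    # eight workers, idealised: 25 seconds
batched_time(n, s, k = 8)  # eight workers, whole tasks only: 26 seconds
```

The two estimates agree whenever $k$ divides $n$ exactly; otherwise some workers sit idle during the last round.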
When to parallelize
- It’s not as simple as it may seem.
- While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency.
- For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth.
- Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces efficiency enough that realistic performance gains are much smaller than the theoretical ones, and usually do not scale linearly with processing power.
- For example, if the time that a computation takes is short, then the overhead of setting up these additional resources may actually overwhelm any advantages of the additional processing power, and the computation could potentially take longer!
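The overhead argument above can be checked with a small experiment. The sketch below is only an illustration under assumed settings: the task (`tiny_task`), the input size, and the choice of 2 workers are arbitrary, and the timings will differ from machine to machine.

```r
library(parallel)

# A task that is far too cheap to be worth sending to another process.
tiny_task <- function(i) sqrt(i)
inputs <- 1:1000

# Sequential baseline.
t_seq <- system.time(res_seq <- lapply(inputs, tiny_task))

# Socket cluster with 2 workers (an arbitrary choice for illustration).
# Starting the workers and shipping data back and forth is pure overhead here.
t_par <- system.time({
  cl <- makeCluster(2)
  res_par <- parLapply(cl, inputs, tiny_task)
  stopCluster(cl)
})

# Both approaches return the same answers; compare the "elapsed" column.
identical(res_seq, res_par)
rbind(sequential = t_seq, parallel = t_par)
```

For a task this small, the time to start the workers would be expected to dominate, which is the situation described in the last bullet.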
You notice your computations are too slow, and wonder “why is that?” Should you store your data differently? Should you use different software? Should you buy more RAM? Should you “go cloud”?
There is no one-size-fits-all solution to speed problems.
Solving a RAM bottleneck may consume more CPU. Solving a CPU bottleneck may consume more RAM. Parallelisation means using multiple CPUs simultaneously. It will thus aid with CPU bottlenecks, but may consume more RAM. Parallelising is thus ill advised when dealing with a RAM bottleneck.
How to diagnose?
When deciding if, and how, to parallelise, it is crucial that you diagnose your bottleneck. The good news is that diagnosis is not too hard.
You never drive without looking at your dashboard; you should never program without looking at your system monitors. Windows users have their Task Manager; Linux users have top, or preferably, htop; Mac users have the Activity Monitor. The system monitor will inform you how your RAM and CPUs are being used.
If you forcefully terminate your computation, and R takes a long time to respond, you are probably dealing with a RAM bottleneck.
Profile your code to detect how much RAM and CPU are consumed by each line of code.
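One way to profile in base R is the sampling profiler `Rprof()` from the `utils` package. The sketch below is a minimal example under assumptions: `slow_fx` is a made-up, deliberately wasteful function, and the report is per function rather than per line.

```r
# A deliberately wasteful function, so the profiler has something to record.
slow_fx <- function(reps = 20, size = 1e6) {
  total <- 0
  for (i in seq_len(reps)) {
    v <- runif(size)               # allocates memory
    total <- total + sum(sort(v))  # burns CPU
  }
  total
}

prof_file <- tempfile(fileext = ".out")
Rprof(prof_file, memory.profiling = TRUE)  # start sampling CPU and memory use
invisible(slow_fx())
Rprof(NULL)                                # stop profiling

prof <- summaryRprof(prof_file, memory = "both")
head(prof$by.self)  # time and memory attributed to each function
```

`Rprof()` also has a `line.profiling` argument for per-line results, which requires source references to be kept when the code is loaded.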
Parallel computing in R
- R comes with a group of packages for parallel computing. You may have heard of some of them.
- When you have a list of repetitive tasks, you may be able to speed it up by adding more computing power. If each task is completely independent of the others, then it is a prime candidate for executing those tasks in parallel, each on its own core.
Parallelize using parallel
The parallel package was introduced in 2011 to unify two popular parallelisation packages: snow and multicore. The multicore package was designed to parallelise using the fork mechanism, on Linux machines. The snow package was designed to parallelise using other mechanisms. R processes started with snow are not forked, so they will not see the parent’s data. Data will have to be copied to child processes. The good news: snow can start R processes on Windows machines, or on remote machines in the cluster.
- The parallel library can be used to send tasks (encoded as function calls) to each of the processing cores on your machine in parallel.
- The most popular function, mclapply(), essentially parallelizes calls to lapply(). mclapply() gathers up the responses from each of these function calls, and returns a list of responses that is the same length as the list or vector of input data (one return per input item).
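The relationship between `lapply()` and `mclapply()` described above can be seen on a toy input. This is a minimal sketch: the function `square` and the choice of 2 workers are assumptions made for illustration only.

```r
library(parallel)

square <- function(i) i^2
inputs <- 1:8

# Forking is unavailable on Windows, where mclapply() only accepts mc.cores = 1.
n_workers <- if (.Platform$OS.type == "windows") 1L else 2L

res_seq <- lapply(inputs, square)
res_par <- mclapply(inputs, square, mc.cores = n_workers)

length(res_par) == length(inputs)  # one result per input item
identical(res_seq, res_par)        # same values, in the same order
```

Because the call signatures match, switching between the two is usually a one-word change plus the `mc.cores` argument.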
Remark on mclapply
- The mclapply() function (and related mc* functions) works via the fork mechanism on Unix-style operating systems.
- Briefly, your R session is the main process and when you call a function like mclapply(), you fork a series of sub-processes that operate independently from the main process (although they share a few low-level features).
- These sub-processes then execute your function on their subsets of the data, presumably on separate cores of your CPU. Once the computation is complete, each sub-process returns its results and then the sub-process is killed. The parallel package manages the logistics of forking the sub-processes and handling them once they’ve finished.
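To make the forking visible, each task can report the ID of the process that ran it. The sketch below assumes 2 workers and 4 tasks, both arbitrary; on Windows it falls back to a single process, so no separate sub-processes appear there.

```r
library(parallel)

# Forking is unavailable on Windows, where mclapply() only accepts mc.cores = 1.
n_workers <- if (.Platform$OS.type == "windows") 1L else 2L

parent_pid <- Sys.getpid()

# Each task returns the process ID of whichever process executed it.
child_pids <- unlist(mclapply(1:4, function(i) Sys.getpid(),
                              mc.cores = n_workers))

parent_pid
unique(child_pids)  # on Unix-style systems these differ from parent_pid
```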
How many cores can I use?
The first thing you might want to check with the parallel package is if your computer in fact has multiple cores that you can take advantage of.
library(parallel)
nCores <- detectCores()
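`detectCores()` counts logical cores by default, which can be larger than the number of physical cores. The sketch below shows both counts; the "leave one core free" rule and the name `usable_cores` are assumptions for illustration, not a recommendation from the parallel package.

```r
library(parallel)

logical_cores  <- detectCores()                 # counts logical cores
physical_cores <- detectCores(logical = FALSE)  # may be NA on some platforms

# Hypothetical rule of thumb: leave one core free, but never go below 1.
# na.rm = TRUE guards against detectCores() returning NA.
usable_cores <- max(1L, logical_cores - 1L, na.rm = TRUE)
usable_cores
```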
Let’s build a simple loop that uses sampling with replacement to do a bootstrap analysis.
- We select Sepal.Length and Species from the iris dataset, subset it to the 100 observations that are not setosa, and then iterate across 10,000 trials, each time resampling the observations with replacement.
- In each trial, we run a logistic regression fitting species as a function of length, and record the coefficients to be returned.
# Sequential computing
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  # Resample the 100 rows with replacement and refit the model
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
  res
}
system.time(results <- lapply(trials, boot_fx))
   user  system elapsed
 16.764   0.009  16.896
# Parallel computing using mclapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  # Resample the 100 rows with replacement and refit the model
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
  res
}
system.time(results <- mclapply(trials, boot_fx, mc.cores = detectCores()))
   user  system elapsed
  3.417   0.170   1.827
When executing parallel jobs via mclapply(), it’s important to pre-calculate how much memory all of the processes will require and make sure this is less than the total amount of memory on your computer.
A major advantage of multicore processing in parallel is that, because the workers are forked, global variables in the main R session are inherited by the child processes. This means the developer does not have to spend effort identifying and exporting them to the parallel workers.
Forking is only applicable on Unix-style OS.
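Both points above, the memory estimate and the inherited globals, can be sketched in a few lines. This is a crude illustration under assumptions: the object `big`, the 2 workers, and the "one full copy per worker" worst case are all made up for the example, and real memory use depends on what the workers actually modify.

```r
library(parallel)

# Forking is unavailable on Windows, where mclapply() only accepts mc.cores = 1.
n_workers <- if (.Platform$OS.type == "windows") 1L else 2L

big <- rnorm(1e6)  # a global variable in the main session

# Crude worst case: assume every worker ends up holding its own copy.
big_mb <- as.numeric(object.size(big)) / 1024^2
worst_case_mb <- big_mb * n_workers
worst_case_mb

# Forked children read 'big' directly; no export step is needed.
means <- mclapply(1:4, function(i) mean(big) + i, mc.cores = n_workers)
unlist(means)
```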
Parallelize with parLapply
- Using the forking mechanism on your computer is one way to execute parallel computation but it’s not the only way that the parallel package offers.
- Another way to build a “cluster” using the multiple cores on your computer is via sockets.
- A socket is simply a mechanism with which multiple processes or applications running on your computer (or different computers, for that matter) can communicate with each other.
- With parallel computation, data and results need to be passed back and forth between the parent and child processes and sockets can be used for that purpose.
Building a socket cluster is simple to do in R with the makeCluster() function.
# Parallel computing using parLapply
cl <- makeCluster(nCores, type = 'SOCK')
system.time(results <- parLapply(cl, trials, boot_fx))
Attaching package: ‘snow’

The following objects are masked from ‘package:parallel’:

    closeNode, clusterApply, clusterApplyLB, clusterCall, clusterEvalQ,
    clusterExport, clusterMap, clusterSplit, makeCluster, parApply,
    parCapply, parLapply, parRapply, parSapply, recvData, recvOneData,
    sendData, splitIndices, stopCluster

Error in checkForRemoteErrors(val): 16 nodes produced errors; first error: object 'x' not found
Traceback:
1. parLapply(cl, trials, boot_fx)
2. docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...))
3. do.call("fun", lapply(args, enquote))
4. lapply(args, enquote)
5. clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...)
6. staticClusterApply(cl, fun, length(x), argfun)
7. checkForRemoteErrors(val)
8. stop(count, " nodes produced errors; first error: ", firstmsg)
Timing stopped at: 0.01 0 0.014
- The advantage of this model is that it is supported on all operating systems.
- The disadvantages are increased communication overhead, and that global variables have to be identified and explicitly exported to each worker in the cluster before processing.
- As discussed below, another advantage of cluster processing is that it also supports workers on external machines, possibly running in remote locations.
# Parallel computing using parLapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
cl <- makeCluster(nCores, type = 'SOCK')
clusterExport(cl, "x")
system.time(results <- parLapply(cl, trials, boot_fx))
   user  system elapsed
  0.028   0.004   2.149
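The worker processes of a socket cluster keep running until they are stopped with `stopCluster()`. One possible pattern, sketched below, wraps cluster creation, export, and shutdown in a single function. The helper `with_cluster` and its arguments are hypothetical names for this example, and it uses the default cluster type of `makeCluster()` from the parallel package.

```r
library(parallel)

# Hypothetical helper: run FUN over X on a temporary socket cluster and
# always shut the workers down, even if FUN fails.
with_cluster <- function(X, FUN, n_workers = 2, export = character(),
                         envir = parent.frame()) {
  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl), add = TRUE)
  # Copy the named variables from the caller to every worker.
  if (length(export) > 0) clusterExport(cl, export, envir = envir)
  parLapply(cl, X, FUN)
}

offset <- 10
res <- with_cluster(1:4, function(i) i + offset, export = "offset")
unlist(res)  # 11 12 13 14
```

Using `on.exit()` means the cluster is released on every exit path, so a failed run like the "object 'x' not found" error above would not leave idle workers behind.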
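Parallelize using foreach
- On Unix-style OS, a fork-based backend such as doMC can be registered.
- On Windows (and most OS), a socket cluster is created and registered as the backend.
- Very easy to go back to sequential computing.
# Almost any OS
# Assumption: the backend-loading lines are missing from the source;
# doParallel is used here as one possible backend.
library(doParallel)
cl <- makeCluster(nCores, type = 'SOCK')
registerDoParallel(cl)
result <- foreach(i = 1:10000, .combine = c) %dopar% sqrt(i)
Loading required package: foreach
Loading required package: iterators
# Unix-style OS
library(doMC)
registerDoMC(cores = nCores)
result <- foreach(i = 1:10000, .combine = c) %dopar% sqrt(i)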
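Going back to sequential computing with foreach can be done in at least two ways, sketched below on a tiny input. The variable names are made up for this example; `%do%` and `registerDoSEQ()` are both part of the foreach package.

```r
library(foreach)

# Option 1: %do% always runs sequentially, whatever backend is registered.
res_do <- foreach(i = 1:5, .combine = c) %do% sqrt(i)

# Option 2: keep %dopar% in the code and register the sequential backend.
registerDoSEQ()
res_dopar <- foreach(i = 1:5, .combine = c) %dopar% sqrt(i)

all.equal(res_do, res_dopar)
```

The second option leaves the loop body untouched, so the same script can switch between sequential and parallel runs by changing only the registration line.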
Back to the bootstrapping example
# Parallel computing using foreach
doMC::registerDoMC(cores = nCores)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
system.time(
  r <- foreach(1:trials, .combine = rbind) %dopar% {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }
)
   user  system elapsed
 24.012   1.155   2.625
# Coefficients from a single resample
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
      (Intercept)         x[ind, 1]
-14.8198034036431  2.35089627230312
Another example in time series forecasting
- Forecast the 1001 time series in the M1 data in R.
- Use auto.arima() in the forecast package.
- Do it in parallel.
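# For one time series
library(Mcomp)     # provides the M1 data
library(forecast)  # provides auto.arima() and forecast()
series <- M1[[999]]
series  # print the series description and data
model <- auto.arima(series$x)
forecast_result <- forecast(model, h = length(series$xx))
plot(forecast_result, main = "ARIMA Forecast for M1 Time Series")
lines(series$xx, col = "red", lwd = 2)
# The legend position is missing from the source; "topleft" is an assumption.
legend("topleft", legend = c("Forecast", "Actual"),
       col = c("blue", "red"), lwd = 2)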
Series: M615
Type of series: DEMOGR
Period of series: MONTHLY
Series description: OTHER PRIVATE ROAD ACCIDENTS ENGLAND

HISTORICAL data
      Jan  Feb  Mar  Apr  May  Jun  Jul  Aug  Sep  Oct  Nov  Dec
1971                                                        26.0
1972 24.5 27.9 29.1 34.7 33.1 36.0 37.5 34.8 35.5 33.4 32.9 29.0
1973 24.7 31.3 32.4 33.9 35.0 36.4 36.5 34.4 33.9 33.9 36.4 27.0
1974 26.3 29.8 32.6 35.1 34.4 35.7 33.6 31.9 35.1 33.4 37.6 27.5
1975 27.2 30.2 28.6 34.1 30.9 34.7 33.7 33.6 31.0 28.9 29.7 23.9
1976 24.7 27.5 26.7 28.7 30.3 31.3 32.1 31.2 31.4 30.8 30.6 27.6
1977 23.4 25.0 26.0 31.0 29.3 31.7 32.0 30.0 31.8 33.6 31.6 26.9
1978 26.6 27.6 27.1 29.8 29.1

FUTURE data
      Jan  Feb  Mar  Apr  May  Jun  Jul  Aug  Sep  Oct  Nov  Dec
1978                          32.6 31.6 31.1 33.2 33.6 34.0 28.4
1979 25.5 26.6 26.2 29.3 28.8 31.2 31.9 28.5 32.2 31.7 31.8
# For all time series
# Define the function to fit ARIMA and forecast
fit_and_forecast <- function(series, h = 18) {
  model <- auto.arima(series$x)
  forecast_result <- forecast(model, h = h)
  forecast_result
}
# Set up parallel computing
forecast_results <- mclapply(M1, fit_and_forecast,
                             mc.cores = detectCores())
# Calculate MAE for each time series
calculate_mae <- function(forecast, actual) {
  # forecast$mean holds the point forecasts of a forecast object
  return(mean(abs(forecast$mean - actual)))
}
mae_results <- mapply(calculate_mae, forecast_results,
                      lapply(M1, function(series) series$xx))
- Think of a slow piece of code you have written for a past assignment or project.
- Parallelize it in R/Python.