Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site
Parallel computing can happen on a single multi-core machine (an i7, for instance, can have multiple cores, hence the term) or on machines in a cluster network. You notice your computations are too slow, and wonder "why is that?" Should you store your data differently? Should you use different software? Should you buy more RAM? Should you "go cloud"?
There is no one-size-fits-all solution to speed problems.
Solving a RAM bottleneck may consume more CPU. Solving a CPU bottleneck may consume more RAM. Parallelisation means using multiple CPUs simultaneously. It will thus aid with CPU bottlenecks, but may consume more RAM. Parallelising is thus ill-advised when dealing with a RAM bottleneck.
When deciding if, and how, to parallelise, it is crucial that you diagnose your bottleneck. The good news is that diagnosing it is not too hard.
You never drive without looking at your dashboard; you should never program without looking at your system monitors. Windows users have their Task Manager; Linux users have top, or preferably, htop; Mac users have the Activity Monitor. The system monitor will inform you how your RAM and CPUs are being used.
If you forcefully terminate your computation, and R takes a long time to respond, you are probably dealing with a RAM bottleneck.
Profile your code to detect how much RAM and CPU are consumed by each line of code.
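To make this concrete, here is a minimal sketch of line-level profiling in R, assuming the profvis package is installed (Rprof() from base utils is a plainer alternative):
library(profvis)
profvis({
  x <- matrix(rnorm(1e6), ncol = 100)  # allocate some data
  d <- dist(x)                         # CPU- and RAM-hungry step
  h <- hclust(d)                       # each line gets its own time and memory figures
})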
multicore, snow, foreach, etc.
parallel
The parallel package was introduced in 2011 to unify two popular parallelisation packages: snow and multicore. The multicore package was designed to parallelise using the fork mechanism on Linux machines. The snow package was designed to parallelise via other mechanisms, such as sockets. R processes started with snow are not forked, so they will not see the parent's data; data have to be copied to the child processes. The good news: snow can start R processes on Windows machines, or on remote machines in a cluster.
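As a quick illustration of the difference, a minimal sketch (FORK clusters assume a Unix-style OS):
library(parallel)
cl_fork <- makeCluster(2, type = "FORK")   # forked workers see the parent's data
cl_sock <- makeCluster(2, type = "PSOCK")  # socket workers start fresh; data must be exported
stopCluster(cl_fork)
stopCluster(cl_sock)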
The parallel library can be used to send tasks (encoded as function calls) to each of the processing cores on your machine in parallel. The mclapply() function essentially parallelises calls to lapply(): it gathers up the responses from each of these function calls, and returns a list of responses that is the same length as the list or vector of input data (one return per input item).
mclapply
The mclapply() function (and related mc* functions) works via the fork mechanism on Unix-style operating systems. When you call mclapply(), you fork a series of sub-processes that operate independently from the main process (although they share a few low-level features). The first thing you might want to check with the parallel package is whether your computer in fact has multiple cores that you can take advantage of.
library(parallel)
nCores <- detectCores()
nCores
Let's build a simple loop that uses sample with replacement to do a bootstrap analysis. We take Sepal.Length and Species from the iris dataset, subset it to 100 observations, and then iterate across 10,000 trials, each time resampling the observations with replacement.
# Sequential computing
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  ind <- sample(100, 100, replace = TRUE)
  result1 <- glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
}
system.time({
  results <- lapply(trials, boot_fx)
})
user system elapsed 21.142 0.007 21.149
# Parallel computing using mclapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  ind <- sample(100, 100, replace = TRUE)
  result1 <- glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
}
system.time({
  results <- mclapply(trials, boot_fx, mc.cores = nCores)
})
user system elapsed 37.632 5.924 1.760
When executing parallel jobs via mclapply(), it is important to pre-calculate how much memory all of the processes will require, and to make sure this is less than the total amount of memory on your computer.
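A rough back-of-the-envelope check might look like this (a sketch only: forked children share memory pages until they modify objects, so this worst-case estimate is conservative):
approx_need <- as.numeric(object.size(x)) * nCores  # worst case: every child copies x
approx_need / 1024^2                                # required memory in megabytes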
A major advantage of forked processing with the parallel package is that global variables in the main R session are inherited by the child processes. This means the developer does not have to spend effort identifying and exporting them to the parallel workers. It is, however, only applicable on Unix-style operating systems.
parLapply
Building a socket cluster is simple to do in R with the makeCluster() function. Here I'm initializing a cluster with nCores workers.
# Parallel computing using parLapply
cl <- makeCluster(nCores, type = 'SOCK')
system.time(results <- parLapply(cl, trials, boot_fx))
stopCluster(cl)
Error in checkForRemoteErrors(val): one node produced an error: object 'x' not found
Traceback:
1. system.time(results <- parLapply(cl, trials, boot_fx))
2. parLapply(cl, trials, boot_fx)
3. do.call(c, clusterApply(cl = cl, x = splitList(X, nchunks), fun = lapply, FUN = fun, ...), quote = TRUE)
4. clusterApply(cl = cl, x = splitList(X, nchunks), fun = lapply, FUN = fun, ...)
5. staticClusterApply(cl, fun, length(x), argfun)
6. checkForRemoteErrors(val)
7. stop("one node produced an error: ", firstmsg, domain = NA)
Timing stopped at: 0.003 0 0.006
parLapply
Unlike forked workers, socket-cluster workers do not see the parent session's data, which is why the call above fails with "object 'x' not found". The fix is to export x to the workers with clusterExport() before calling parLapply().
# Parallel computing using parLapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
cl <- makeCluster(nCores, type = 'SOCK')
clusterExport(cl, "x")
system.time(results <- parLapply(cl, trials, boot_fx))
stopCluster(cl)
user system elapsed 0.030 0.004 1.423
foreach
The foreach package can be combined with different parallel backends:
multicore + doMC + foreach.
snow + doParallel + foreach.
# Almost any OS
library(doParallel)
cl <- makeCluster(nCores, type = 'SOCK')
registerDoParallel(cl)
result <- foreach(i = 1:10000) %dopar% sqrt(i)
stopCluster(cl)
# Unix-style OS
library(doMC)
registerDoMC(nCores)
result <- foreach(i = 1:10000, .combine = c) %dopar% sqrt(i)
# Parallel computing using foreach
doMC::registerDoMC(cores = nCores)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
system.time({
  r <- foreach(1:trials, .combine = rbind) %dopar% {
    ind <- sample(100, 100, replace = TRUE)
    result1 <- glm(x[ind, 2] ~ x[ind, 1], family = binomial(logit))
    coefficients(result1)
  }
})
user system elapsed 28.814 5.485 2.569
Many R packages come with parallel features (or arguments). You may refer to this link for more details. Examples are:
data.table is a venerable and powerful package written primarily by Matt Dowle. It is a high-performance implementation of R's data frame construct, with an enhanced syntax. There have been innumerable benchmarks showcasing the power of the data.table package. It provides a highly optimized tabular data structure for most common analytical operations.
The caret package (Classification And REgression Training) is a set of functions that streamline the process of creating predictive models. The package contains tools for data splitting, preprocessing, feature selection, model tuning using resampling, variable importance estimation, and other functionality.
multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition(), and then the data stays on each node until you explicitly retrieve it with collect(). This minimizes time spent moving data around, and maximizes parallel performance.
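For instance, here is a minimal sketch of data.table's built-in multi-threading (assuming the data.table package is installed):
library(data.table)
getDTthreads()        # how many threads data.table is currently allowed to use
setDTthreads(nCores)  # let it use all detected cores
dt <- data.table(g = sample(letters, 1e6, replace = TRUE), v = rnorm(1e6))
dt[, .(mean_v = mean(v)), by = g]  # many internal operations (grouping, sorting, fread) use multiple threads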
In this section, we will take a look at Python's multiprocessing module and how we can use it to submit multiple processes that can run independently from each other in order to make the best use of our CPU cores. The multiprocessing module in Python's Standard Library has a lot of powerful features. If you want to read about all the nitty-gritty tips, tricks, and details, I would recommend using the official documentation as an entry point.
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())
Number of processors: 32
Process class
The most basic approach is probably to use the Process class from the multiprocessing module.
Here, we will use a simple queue function to generate four random strings in parallel.
import multiprocessing as mp
import random
import string
random.seed(123)
# Define an output queue
output = mp.Queue()
# define a example function
def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                           string.ascii_lowercase
                           + string.ascii_uppercase
                           + string.digits)
                       for i in range(length))
    output.put((pos, rand_str))
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]
# Run processes
for p in processes:
    p.start()
# Exit the completed processes
for p in processes:
    p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
[(0, 'tL1Lf'), (1, '3uOHu'), (2, 'OuaPe'), (3, 'kXUXK')]
Pool class
Another, more convenient, approach for simple parallel processing tasks is provided by the Pool class.
def cube(x):
    return x**3
pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
[1, 8, 27, 64, 125, 216]
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
[1, 8, 27, 64, 125, 216]
Pool.map and Pool.apply will lock the main program until all processes are finished, which is quite useful if we want to obtain results in a particular order for certain applications.
In contrast, the async variants (Pool.map_async and Pool.apply_async) will submit all processes at once and retrieve the results as soon as they are finished. One more difference is that we need to use the get() method after the apply_async() call in order to obtain the return values of the finished processes.
pool = mp.Pool(processes=4)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,10)]
output = [p.get() for p in results]
print(output)
[1, 8, 27, 64, 125, 216, 343, 512, 729]
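A small sketch of the map_async variant, reusing the cube function and pool setup from above (get() blocks until all results are ready):
pool = mp.Pool(processes=4)
async_result = pool.map_async(cube, range(1, 7))
print(async_result.get())  # [1, 8, 27, 64, 125, 216]
pool.close()
pool.join()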
# utils.py
import time
import logging
import requests
class WebsiteDownException(Exception):
    pass

def ping_website(address, timeout=5):
    """
    Check if a website is down. A website is considered down
    if either the status_code >= 400 or if the timeout expires
    Throw a WebsiteDownException if any of the website down conditions are met
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()

def notify_owner(address):
    """
    Send the owner of the address a notification that their website is down
    For now, we're just going to sleep for 0.5 seconds but this is where
    you would send an email, push notification or text-message
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)

def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)
# websites.py
WEBSITE_LIST = [
'http://envato.com',
'http://baidu.com',
'http://amazon.com',
'http://stackoverflow.com',
'http://github.com',
'http://heroku.com',
'http://trello.com',
'http://yiiframework.com',
'http://shopify.com',
'http://airbnb.com',
'http://instagram.com',
'http://snapchat.com',
'http://youtube.com',
'http://live.com',
'http://linkedin.com',
'http://yandex.ru',
'http://netflix.com',
'http://wordpress.com',
'http://bing.com',
]
# serial_squirrel.py
import time
from utils import check_website      # assuming the code above is saved as utils.py
from websites import WEBSITE_LIST    # assuming the list above is saved as websites.py

start_time = time.time()
for address in WEBSITE_LIST:
    check_website(address)
end_time = time.time()
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Website http://netflix.com returned status_code=403
WARNING:root:Timeout expired for website http://wordpress.com
Time for SerialSquirrel: 43.01436996459961secs
# multiprocessing_squirrel.py
import time
import socket
import multiprocessing
from utils import check_website      # assuming the code above is saved as utils.py
from websites import WEBSITE_LIST    # assuming the list above is saved as websites.py

NUM_WORKERS = 4
start_time = time.time()
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
end_time = time.time()
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Website http://netflix.com returned status_code=403
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Timeout expired for website http://wordpress.com
Time for MultiProcessingSquirrel: 12.823455572128296secs
Many Python packages come with parallel features (or arguments). Examples are:
Joblib is a set of tools for lightweight pipelining in Python.
Scikit-learn is a free software machine learning library for the Python programming language. It features various classification, regression, and clustering algorithms, including support vector machines, random forests, gradient boosting, k-means and DBSCAN, and is designed to interoperate with the Python numerical and scientific libraries NumPy and SciPy.
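As an illustration, here is a minimal sketch of joblib's parallel helpers (assuming joblib is installed); scikit-learn exposes the same machinery through its n_jobs arguments:
from joblib import Parallel, delayed

def cube(x):
    return x ** 3

# run the six calls across 4 worker processes
results = Parallel(n_jobs=4)(delayed(cube)(i) for i in range(1, 7))
print(results)  # [1, 8, 27, 64, 125, 216]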