Parallel Computing

  • Data is becoming cheaper to collect and store, because the machines that generate and store it are becoming cheaper and faster.
  • Computers are becoming cheaper and faster too. However, the performance of a single processor core has not improved much recently; instead we are getting multi-core processors, and clusters of multi-core machines.
  • To handle the large amounts of data collected by modern machines with their many computing units, we need parallel or distributed computing: in other words, we need to get many computing units working on a task simultaneously.
  • Many problems are “embarrassingly parallel”: they can be split into many smaller, independent tasks and handed out to many computers so the computation can be carried out simultaneously.
  • If we have a single computer at our disposal and have to run n models, each taking s seconds, the total running time will be $n*s$. If, however, we have $k < n$ computers we can run our models on, the total running time will be approximately $n*s/k$. In the old days this was how parallel code was run, and it still is on larger servers. However, modern computers have “multicore” processors, which are equivalent to running multiple computers at a time. The equation is not quite as clean (other things are running on each processor, there is overhead in transferring data between processes, etc.), but in general we see the same kind of gain; see the short numeric sketch after this list.
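As a concrete illustration of the $n*s/k$ arithmetic (the numbers below are purely hypothetical, chosen only to make the calculation explicit):

n <- 1000   # number of models to fit
s <- 2      # seconds per model
k <- 8      # number of cores/machines available
n * s       # idealized sequential running time: 2000 seconds
n * s / k   # idealized parallel running time: 250 seconds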

Terminology

  • A core is a general term for either a single processor on your own computer (technically you may have only one physical processor, but a modern processor like the Intel i7 has multiple cores, hence the term) or a single machine in a cluster network.
  • A cluster is a collection of objects capable of hosting cores, either a network of machines or just the collection of cores on your personal computer.
  • A process is a single running version of R (or more generally any program). Each core runs a single process.

When to parallelize

  • It’s not as simple as it may seem.
  • While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency.
  • For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth.
  • Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces the efficiency enough that realistic performance gains are much less than theoretical, and usually do not scale linearly as a function of processing power.
  • In particular, if the time that a computation takes is short, then the overhead of setting up these additional resources may overwhelm any advantage of the additional processing power, and the computation could actually take longer! The sketch after this list illustrates the effect.
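A rough sketch of this effect (timings vary by machine; only the parallel package, which ships with R, is assumed, and mclapply is introduced in more detail below): for a trivially cheap per-task computation, forking workers and collecting their results can cost more than the work itself, so the parallel version may be no faster than the sequential one, and can even be slower.

library(parallel)
# A trivially cheap task: the parallel overhead can dominate the actual work.
# Note: mclapply is fork-based, so mc.cores > 1 is not supported on Windows.
system.time(lapply(1:10000, sqrt))
system.time(mclapply(1:10000, sqrt, mc.cores = 4))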

Parallel computing in R

  • R comes with a number of packages for parallel computing. You may have heard of multicore, snow, foreach, etc.
  • When you have a list of repetitive tasks, you may be able to speed it up by adding more computing power. If each task is completely independent of the others, then it is a prime candidate for executing those tasks in parallel, each on its own core.
  • For example, let’s build a simple loop that uses sample with replacement to do a bootstrap analysis. In this case, we select Sepal.Length and Species from the iris dataset, subset it to the 100 non-setosa observations, and then iterate across 10,000 trials, each time resampling the observations with replacement. We then fit a logistic regression modelling species as a function of sepal length, and record the coefficients from each trial.
In [1]:
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
res <- data.frame()
system.time({
  trial <- 1
  while(trial <= trials) {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    r <- coefficients(result1)
    res <- rbind(res, r)
    trial <- trial + 1
  }
})
   user  system elapsed 
 21.833   0.595  22.484 

Parallelize using mclapply

  • The parallel library can be used to send tasks (encoded as function calls) to each of the processing cores on your machine in parallel.
  • This is done by using the parallel::mclapply function, which is analogous to lapply, but distributes the tasks to multiple processors.
  • mclapply gathers up the responses from each of these function calls, and returns a list of responses that is the same length as the list or vector of input data (one return per input item).
In [2]:
library(parallel)
library(MASS)

starts <- rep(100, 40)
fx <- function(nstart) kmeans(Boston, 4, nstart=nstart)
numCores <- detectCores()
numCores
8
In [3]:
system.time(
  results <- lapply(starts, fx)
)
   user  system elapsed 
  1.285   0.031   1.319 
In [4]:
system.time(
  results <- mclapply(starts, fx, mc.cores = numCores)
)
   user  system elapsed 
  1.826   0.568   0.509 

Now let’s demonstrate with our bootstrap example:

In [5]:
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
}
system.time({
  results <- mclapply(trials, boot_fx, mc.cores = numCores)
})
   user  system elapsed 
 32.718   1.090   5.118 

Parallelize using foreach and doParallel

Now compare the following two pieces of code.

In [12]:
for (i in 1:3) {
  print(sqrt(i))
}
[1] 1
[1] 1.414214
[1] 1.732051
In [7]:
library(foreach)
foreach (i=1:3) %do% {
  sqrt(i)
}
  1. 1
  2. 1.4142135623731
  3. 1.73205080756888

To control the shape of the output, foreach has a .combine argument that specifies how the results of the individual iterations are combined (for example, c to build a vector, or rbind to stack rows). Note that %dopar% requires a registered parallel backend (we register one with doMC below); without one it falls back to running sequentially and issues a warning.

In [13]:
# Return a vector
foreach (i=1:3, .combine=c) %dopar% {
  sqrt(i)
}

# Return a data frame
foreach (i=1:3, .combine=rbind) %dopar% {
  sqrt(i)
}
  1. 1
  2. 1.4142135623731
  3. 1.73205080756888

result.1  1.000000
result.2  1.414214
result.3  1.732051

Now let's go back to the bootstrapping example. We first register a parallel backend with doMC, then rewrite the loop as a foreach ... %dopar% construct.

In [14]:
doMC::registerDoMC(cores = 8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
system.time({
  r <- foreach(1:trials, .combine=rbind) %dopar% {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }
})
   user  system elapsed 
 34.431   1.701   7.107 
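The doMC backend registered above relies on forking, which is only available on Unix-like systems (Linux, macOS). A cross-platform alternative is the doParallel backend; a minimal sketch, assuming the doParallel package is installed:

library(doParallel)
cl <- makeCluster(4)       # start 4 worker processes (also works on Windows)
registerDoParallel(cl)     # register them as the %dopar% backend
foreach (i=1:3, .combine=c) %dopar% {
  sqrt(i)
}
stopCluster(cl)            # always shut the workers down when finished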

Other packages for parallel computing in R

Many R packages come with parallel features (or arguments). You may refer to this link for more details. Examples are:

  • data.table is a venerable and powerful package written primarily by Matt Dowle. It is a high-performance implementation of R’s data frame construct, with an enhanced syntax, and it provides a highly optimized tabular data structure for most common analytical operations; there have been innumerable benchmarks showcasing its power. A short example follows this list.
  • The caret package (Classification And REgression Training) is a set of functions that streamline the process for creating predictive models. The package contains tools for data splitting, preprocessing, feature selection, model tuning using resampling, variable importance estimation, and other functionality.
  • multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition(), and then the data stays on each node until you explicitly retrieve it with collect(). This minimizes time spent moving data around, and maximizes parallel performance. A short sketch also follows this list.
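To illustrate, a minimal sketch of data.table's enhanced syntax (recent versions are internally multi-threaded, and setDTthreads() controls the number of threads used), assuming the data.table package is installed:

library(data.table)

setDTthreads(4)                       # let data.table use up to 4 threads internally
dt <- as.data.table(iris)
dt[, .(mean_sepal = mean(Sepal.Length)), by = Species]   # grouped aggregation

And a minimal sketch of the multidplyr partition()/collect() workflow, assuming the current multidplyr API (new_cluster()) and using the built-in mtcars data (far too small to benefit from parallelism, but enough to show the pattern):

library(dplyr)
library(multidplyr)

cluster <- new_cluster(4)               # start 4 R worker processes

mtcars %>%
  group_by(cyl) %>%
  partition(cluster) %>%                # split the groups across the workers
  summarise(mean_mpg = mean(mpg)) %>%   # computed in parallel on the workers
  collect()                             # bring the results back to this session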

Parallel computing in Python

  • We will look at Python’s multiprocessing module and how we can use it to submit multiple processes that can run independently from each other in order to make best use of our CPU cores.
  • The multiprocessing module in Python’s Standard Library has a lot of powerful features. If you want to read about all the nitty-gritty tips, tricks, and details, I would recommend using the official documentation as an entry point.
  • In the following sections, I want to provide a brief overview of different approaches to show how the multiprocessing module can be used for parallel programming.

The process class

The most basic approach is probably to use the Process class from the multiprocessing module. Here, we will use a simple Queue to collect the results of four processes, each of which generates a random string in parallel.

In [1]:
import multiprocessing as mp
import random
import string

random.seed(123)
# Define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put((pos, rand_str))

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)
[(0, 'KMko6'), (1, 's3XGx'), (2, 'BCq9y'), (3, 'Bgs0s')]

The Pool class

In [10]:
def cube(x):
    return x**3

pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
[1, 8, 27, 64, 125, 216]
In [11]:
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
[1, 8, 27, 64, 125, 216]

The Pool.map and Pool.apply methods will block the main program until all processes are finished, which is quite useful if we want to obtain results in a particular order for certain applications. In contrast, the async variants submit all processes at once and retrieve the results as soon as they are finished. One more difference is that we need to use the get method after the apply_async() call in order to obtain the return values of the finished processes.

In [12]:
pool = mp.Pool(processes=4)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,10)]
output = [p.get() for p in results]
print(output)
[1, 8, 27, 64, 125, 216, 343, 512, 729]
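Similarly, map_async is the asynchronous counterpart of Pool.map; the short sketch below reuses the cube function defined above.

pool = mp.Pool(processes=4)
async_result = pool.map_async(cube, range(1, 7))  # returns immediately
# ... the main program could do other work here ...
print(async_result.get())   # get() blocks until all results are ready
pool.close()
pool.join()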

Application

In [13]:
# utils.py
 
import time
import logging
import requests
 
 
class WebsiteDownException(Exception):
    pass
 
 
def ping_website(address, timeout=5):
    """
    Check if a website is down. A website is considered down 
    if either the status_code >= 400 or if the timeout expires
     
    Throw a WebsiteDownException if any of the website down conditions are met
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()
         
 
def notify_owner(address):
    """ 
    Send the owner of the address a notification that their website is down 
     
    For now, we're just going to sleep for 0.5 seconds but this is where 
    you would send an email, push notification or text-message
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)
     
 
def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)
In [18]:
# websites.py
 
WEBSITE_LIST = [
    'http://envato.com',
    'http://baidu.com',
    'http://amazon.com',
    'http://stackoverflow.com',
    'http://github.com',
    'http://heroku.com',
    'http://trello.com',
    'http://yiiframework.com',
    'http://shopify.com',
    'http://airbnb.com',
    'http://instagram.com',
    'http://snapchat.com',
    'http://youtube.com',
    'http://live.com',
    'http://linkedin.com',
    'http://yandex.ru',
    'http://netflix.com',
    'http://wordpress.com',
    'http://bing.com',
]

Serial approach

In [20]:
# serial_squirrel.py
 
import time
 
 
start_time = time.time()
 
for address in WEBSITE_LIST:
    check_website(address)
         
end_time = time.time()        
 
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Timeout expired for website http://amazon.com
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Website http://netflix.com returned status_code=405
WARNING:root:Timeout expired for website http://wordpress.com
Time for SerialSquirrel: 44.172788858413696secs

Parallel approach

In [19]:
# multiprocessing_squirrel.py
 
import time
import socket
import multiprocessing
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
 
end_time = time.time()        
 
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Timeout expired for website http://netflix.com
WARNING:root:Timeout expired for website http://wordpress.com
WARNING:root:Timeout expired for website http://amazon.com
Time for MultiProcessingSquirrel: 17.372195720672607secs

Other modules for parallel computing in Python

Many Python packages and modules come with parallel features (or arguments). Examples are:

  • Joblib is a set of tools for lightweight pipelining in Python; a short sketch follows this list.
  • Scikit-learn is a free software machine learning library for the Python programming language. It features various classification, regression, and clustering algorithms, including support vector machines, random forests, gradient boosting, k-means and DBSCAN. It is designed to interoperate with the Python numerical and scientific libraries NumPy and SciPy.
  • If you want to know more details about parallel computing in Python, you can refer to this link (https://wiki.python.org/moin/ParallelProcessing) or this book.
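For instance, a minimal joblib sketch (assuming joblib is installed) that runs a simple function across four worker processes with Parallel and delayed:

from math import sqrt
from joblib import Parallel, delayed

# Evaluate sqrt on 10 inputs using 4 worker processes.
results = Parallel(n_jobs=4)(delayed(sqrt)(i) for i in range(10))
print(results)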