Big Data Essentials

L6: Parallel Computing





Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site

What is parallel computing

  • Data is becoming cheaper to collect and store, because the machines that generate and record it are becoming cheaper and faster.
  • Computers are becoming cheaper and faster too. However, the performance of a single processor core has not improved much recently; instead we are getting multi-core processors and clusters of multi-core machines.
  • To handle the large amounts of data collected by modern machines, we need to exploit these many computing units through parallel or distributed computing; in other words, we get many processors or computers to perform the same task simultaneously.

Embarrassingly parallel

  • Many problems are “embarrassingly parallel”. In other words, they can be split into many smaller tasks and passed on to many computers to be carried out simultaneously.
  • If we have a single computer at our disposal and have to run $n$ models, each taking $s$ seconds, the total running time will be $n \times s$ seconds. If, however, we have $k < n$ computers to run our models on, the total running time will be roughly $n \times s / k$ seconds (see the worked example after this list). In the old days this was how parallel code was run, and it is still how it is run on large servers.
  • However, modern computers have “multicore” processors, which is equivalent to running multiple computers at a time. The arithmetic is not quite as clean (other things are running on each core, there is overhead in transferring data between processes, etc.), but in general we see a similar gain.
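For example, fitting $n = 1000$ models at $s = 30$ seconds each takes $1000 \times 30 = 30000$ seconds (over 8 hours) on a single computer, but only about $30000 / 10 = 3000$ seconds (50 minutes) when spread over $k = 10$ computers.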

Terminology

  • A core: a general term for either a single processor on your own computer (technically you only have one processor, but a modern processor like the i7 has multiple cores, hence the term) or a single machine in a cluster network.
  • A cluster: a collection of objects capable of hosting cores, either a network of machines or just the collection of cores on your personal computer.
  • A node/machine: a single physical machine in the cluster.
  • A process: a single running instance of R (or, more generally, of any program). Each core runs a single process.

When to parallelize

  • It’s not as simple as it may seem.
  • While in theory each added processor would linearly increase the throughput of a computation, there is overhead that reduces that efficiency.
  • For example, the code and, importantly, the data need to be copied to each additional CPU, and this takes time and bandwidth.
  • Plus, new processes and/or threads need to be created by the operating system, which also takes time. This overhead reduces the efficiency enough that realistic performance gains are much less than theoretical, and usually do not scale linearly as a function of processing power.
  • For example, if the time that a computation takes is short, then the overhead of setting up these additional resources may actually overwhelm any advantages of the additional processing power, and the computation could potentially take longer! The short sketch after this list illustrates the effect.
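As a minimal sketch of this effect (assuming a Unix-style OS so that parallel::mclapply() can fork; the choice of 4 cores is arbitrary), parallelising a trivially cheap task is often slower than running it sequentially, because forking and coordinating the workers costs more than the computation itself:

library(parallel)

# A deliberately cheap task: the work per element is tiny, so the cost of
# setting up and coordinating parallel workers dominates the computation.
x <- 1:100000
system.time(res_seq <- lapply(x, sqrt))                    # sequential
system.time(res_par <- mclapply(x, sqrt, mc.cores = 4))    # parallel (forked)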

When to parallelize

  • You notice your computations are too slow, and wonder “why is that?” Should you store your data differently? Should you use different software? Should you buy more RAM? Should you “go cloud”?

  • There is no one-size-fits-all solution to speed problems.

  • Solving a RAM bottleneck may consume more CPU, and solving a CPU bottleneck may consume more RAM. Parallelisation means using multiple CPUs simultaneously; it will thus help with CPU bottlenecks, but may consume more RAM. Parallelising is therefore ill-advised when dealing with a RAM bottleneck.

How to diagnose?

When deciding if, and how, to parallelise, it is crucial that you diagnose your bottleneck. The good news is that the diagnosis is not too hard.

  • You never drive without looking at your dashboard; you should never program without looking at your system monitors. Windows users have their Task Manager; Linux users have top, or preferably, htop; Mac users have the Activity Monitor. The system monitor will inform you how your RAM and CPUs are being used.

  • If you forcefully terminate your computation, and R takes a long time to respond, you are probably dealing with a RAM bottleneck.

  • Profile your code to detect how much RAM and CPU each line of code consumes; a minimal example using R’s built-in profiler is sketched below.
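A minimal sketch using base R’s sampling profiler Rprof() (the profvis package gives an interactive, line-by-line view of the same information; memory profiling via Rprof(memory.profiling = TRUE) is available on builds of R that support it):

Rprof("profile.out")                     # start the sampling profiler

x <- matrix(rnorm(1e6), ncol = 100)      # some work you suspect is slow
fit <- lm(x[, 1] ~ x[, -1])

Rprof(NULL)                              # stop profiling
summaryRprof("profile.out")              # time spent per function call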



Parallel computing in R

Parallel computing in R

  • R comes with a number of packages for parallel computing. You may have heard of multicore, snow, foreach, etc.
  • When you have a list of repetitive tasks, you may be able to speed them up by adding more computing power. If each task is completely independent of the others, it is a prime candidate for executing those tasks in parallel, each on its own core.

Parallelize using parallel

The parallel package was introduced in 2011 to unify two popular parallelisation packages: snow and multicore. The multicore package was designed to parallelise using the fork mechanism on Linux machines. The snow package was designed to parallelise using other mechanisms, such as sockets. R processes started with snow are not forked, so they will not see the parent’s data; data has to be copied to the child processes. The good news: snow can start R processes on Windows machines, or on remote machines in a cluster.

  • The parallel library can be used to send tasks (encoded as function calls) to each of the processing cores on your machine in parallel.
  • The most popular mclapply() function essentially parallelizes calls to lapply().
  • mclapply gathers up the responses from each of these function calls, and returns a list of responses that is the same length as the list or vector of input data (one return per input item).

Remark on mclapply

  • The mclapply() function (and related mc* functions) works via the fork mechanism on Unix-style operating systems.
  • Briefly, your R session is the main process and when you call a function like mclapply(), you fork a series of sub-processes that operate independently from the main process (although they share a few low-level features).
  • These sub-processes then execute your function on their subsets of the data, presumably on separate cores of your CPU. Once the computation is complete, each sub-process returns its results and then the sub-process is killed. The parallel package manages the logistics of forking the sub-processes and handling them once they’ve finished.

How many cores can I use?

The first thing you might want to check with the parallel package is if your computer in fact has multiple cores that you can take advantage of.

In [23]:
library(parallel)
nCores <- detectCores()
nCores
32

Let’s build a simple loop that uses sample with replacement to do a bootstrap analysis.

  • We select Sepal.Length and Species from the iris dataset, subset it to 100 observations, and then iterate across 10,000 trials, each time resampling the observations with replacement.
  • Run a logistic regression fitting species as a function of length, and record the coefficients for each trial to be returned.
In [14]:
# Sequential computing
x <- iris[which(iris[,5] != "setosa"), c(1,5)]   # Sepal.Length and Species, 100 observations
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  ind <- sample(100, 100, replace=TRUE)          # resample the 100 observations with replacement
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)                  # return the coefficients as a one-row data frame
}
system.time({
  results <- lapply(trials, boot_fx)
})
   user  system elapsed 
 21.142   0.007  21.149 
In [24]:
# Parallel computing using mclapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
boot_fx <- function(trial) {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  r <- coefficients(result1)
  res <- rbind(data.frame(), r)
}
system.time({
  results <- mclapply(trials, boot_fx, mc.cores = nCores)
})
   user  system elapsed 
 37.632   5.924   1.760 

Note

  • When executing parallel jobs via mclapply() it’s important to pre-calculate how much memory all of the processes will require and make sure this is less than the total amount of memory on your computer.

  • A major advantage of forked parallel processing is that global variables in the main R session are inherited by the child processes. This means the developer does not have to spend effort identifying and exporting them to the parallel workers; see the short sketch after this note.

  • Forking is only available on Unix-style operating systems.
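A minimal sketch of this inheritance (assuming a Unix-style OS; big_object and the choice of two cores are purely illustrative):

library(parallel)

big_object <- rnorm(1e6)   # defined in the main R session

# Each forked child process can read big_object directly;
# nothing has to be identified or exported to the workers.
res <- mclapply(1:4, function(i) mean(big_object) + i, mc.cores = 2)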

Parallelize with parLapply

  • Using the forking mechanism on your computer is one way to execute parallel computation but it’s not the only way that the parallel package offers.
  • Another way to build a “cluster” using the multiple cores on your computer is via sockets.
  • A socket is simply a mechanism with which multiple processes or applications running on your computer (or different computers, for that matter) can communicate with each other.
  • With parallel computation, data and results need to be passed back and forth between the parent and child processes and sockets can be used for that purpose.

parLapply

Building a socket cluster is simple to do in R with the makeCluster() function. Here I’m initializing a cluster using the nCores cores detected earlier.

In [20]:
# Parallel computing using parLapply
cl <- makeCluster(nCores, type = 'SOCK')
system.time(results <- parLapply(cl, trials, boot_fx))
stopCluster(cl)
Error in checkForRemoteErrors(val): one node produced an error: object 'x' not found
Traceback:

1. system.time(results <- parLapply(cl, trials, boot_fx))
2. parLapply(cl, trials, boot_fx)
3. do.call(c, clusterApply(cl = cl, x = splitList(X, nchunks), fun = lapply, 
 .     FUN = fun, ...), quote = TRUE)
4. clusterApply(cl = cl, x = splitList(X, nchunks), fun = lapply, 
 .     FUN = fun, ...)
5. staticClusterApply(cl, fun, length(x), argfun)
6. checkForRemoteErrors(val)
7. stop("one node produced an error: ", firstmsg, domain = NA)
Timing stopped at: 0.003 0 0.006

parLapply

  • The advantage of this model is that it is supported on all operating systems.
  • The disadvantages are increased communication overhead, and that global variables have to be identified and explicitly exported to each worker in the cluster before processing. A further advantage of cluster processing is that it also supports workers on external machines, possibly running in remote locations (see the sketch after the next example).
In [25]:
# Parallel computing using parLapply
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- seq(1, 10000)
cl <- makeCluster(nCores, type = 'SOCK')
clusterExport(cl, "x")
system.time(results <- parLapply(cl, trials, boot_fx))
stopCluster(cl)
   user  system elapsed 
  0.030   0.004   1.423 
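A sketch of how the same socket mechanism can reach across machines (the hostnames below are hypothetical; this assumes each host is reachable, e.g. via passwordless SSH, and has R and the required packages installed):

library(parallel)

hosts <- c("localhost", "node1.example.com", "node2.example.com")   # hypothetical hosts
cl <- makeCluster(hosts, type = "PSOCK")

clusterExport(cl, "x")                      # data must still be shipped to the workers
results <- parLapply(cl, trials, boot_fx)
stopCluster(cl)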

Parallelize using foreach

  • On Unix-style OS, multicore + doMC + foreach.
  • On Windows (and most OS), snow + doParallel + foreach.
  • Very easy to go back to sequential computing: just replace %dopar% with %do%.
In [33]:
# Almost any OS
library(doParallel)
cl <- makeCluster(nCores, type = 'SOCK')
registerDoParallel(cl)
result <- foreach(i = 1:10000) %dopar% sqrt(i)
stopCluster(cl)
In [34]:
# Unix-style OS
library(doMC)
registerDoMC(nCores)
result <- foreach(i = 1:10000, .combine = c) %dopar% sqrt(i)

Go back to the bootstrapping example

In [28]:
# Parallel computing using foreach
doMC::registerDoMC(cores = nCores)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000
system.time({
  r <- foreach(1:trials, .combine = rbind) %dopar% {
    ind <- sample(100, 100, replace=TRUE)
    result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
    coefficients(result1)
  }
})
   user  system elapsed 
 28.814   5.485   2.569 

Other packages for parallel computing in R

Many R packages come with parallel features (or arguments). You may refer to this link for more details. Examples are:

  • data.table is a venerable and powerful package written primarily by Matt Dowle. It is a high-performance implementation of R’s data frame construct with an enhanced syntax, and many of its operations are multi-threaded internally (a toy example is sketched after this list). There have been innumerable benchmarks showcasing the power of the data.table package; it provides a highly optimized tabular data structure for most common analytical operations.
  • The caret package (Classification And REgression Training) is a set of functions that streamline the process of creating predictive models. The package contains tools for data splitting, preprocessing, feature selection, model tuning using resampling, variable importance estimation, and other functionality.
  • multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition(), and then the data stays on each node until you explicitly retrieve it with collect(). This minimizes time spent moving data around and maximizes parallel performance.
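A toy sketch of data.table’s built-in multi-threading (the thread count of 4 is an arbitrary choice for illustration):

library(data.table)

setDTthreads(4)        # allow data.table to use up to 4 threads internally
dt <- data.table(g = sample(letters, 1e7, replace = TRUE),
                 v = rnorm(1e7))

# Grouped aggregation; recent versions of data.table can use several threads
# for operations like this without any extra parallel code from the user.
res <- dt[, .(mean_v = mean(v)), by = g]
getDTthreads()         # how many threads data.table is currently set to use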



Parallel computing in Python

Parallel computing in Python

  • We will look at Python’s multiprocessing module and how we can use it to submit multiple processes that can run independently from each other in order to make best use of our CPU cores.
  • The multiprocessing module in Python’s Standard Library has a lot of powerful features. If you want to read about all the nitty-gritty tips, tricks, and details, I would recommend the official documentation as an entry point.
  • In the following sections, I want to provide a brief overview of different approaches to show how the multiprocessing module can be used for parallel programming.

How many parallel processes can I run?

In [1]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())
Number of processors:  32

The process class

The most basic approach is probably to use the Process class from the multiprocessing module. Here, we will use a simple queue to generate four random strings in parallel.

In [7]:
import multiprocessing as mp
import random
import string

random.seed(123)
# Define an output queue
output = mp.Queue()

# define an example function
def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put((pos, rand_str))

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)
[(0, 'tL1Lf'), (1, '3uOHu'), (2, 'OuaPe'), (3, 'kXUXK')]

The Pool class

In [8]:
def cube(x):
    return x**3

pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
[1, 8, 27, 64, 125, 216]
In [9]:
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
[1, 8, 27, 64, 125, 216]

Pool.map and Pool.apply will block the main program until all processes are finished, which is quite useful if we want to obtain results in a particular order for certain applications. In contrast, the async variants submit all processes at once and retrieve the results as soon as they are finished. One more difference is that we need to call the get method on the result of apply_async() in order to obtain the return values of the finished processes.

In [10]:
pool = mp.Pool(processes=4)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,10)]
output = [p.get() for p in results]
print(output)
[1, 8, 27, 64, 125, 216, 343, 512, 729]

Application

In [11]:
# utils.py
 
import time
import logging
import requests
 
 
class WebsiteDownException(Exception):
    pass
 
 
def ping_website(address, timeout=5):
    """
    Check if a website is down. A website is considered down 
    if either the status_code >= 400 or if the timeout expires
     
    Throw a WebsiteDownException if any of the website down conditions are met
    """
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning("Website %s returned status_code=%s" % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning("Timeout expired for website %s" % address)
        raise WebsiteDownException()
         
 
def notify_owner(address):
    """ 
    Send the owner of the address a notification that their website is down 
     
    For now, we're just going to sleep for 0.5 seconds but this is where 
    you would send an email, push notification or text-message
    """
    logging.info("Notifying the owner of %s website" % address)
    time.sleep(0.5)
     
 
def check_website(address):
    """
    Utility function: check if a website is down, if so, notify the user
    """
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)
In [12]:
# websites.py
 
WEBSITE_LIST = [
    'http://envato.com',
    'http://baidu.com',
    'http://amazon.com',
    'http://stackoverflow.com',
    'http://github.com',
    'http://heroku.com',
    'http://trello.com',
    'http://yiiframework.com',
    'http://shopify.com',
    'http://airbnb.com',
    'http://instagram.com',
    'http://snapchat.com',
    'http://youtube.com',
    'http://live.com',
    'http://linkedin.com',
    'http://yandex.ru',
    'http://netflix.com',
    'http://wordpress.com',
    'http://bing.com',
]

Serial computing

In [13]:
# serial_squirrel.py
 
import time
 
 
start_time = time.time()
 
for address in WEBSITE_LIST:
    check_website(address)
         
end_time = time.time()        
 
print("Time for SerialSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Website http://netflix.com returned status_code=403
WARNING:root:Timeout expired for website http://wordpress.com
Time for SerialSquirrel: 43.01436996459961secs

Parallel computing

In [14]:
# multiprocessing_squirrel.py
 
import time
import socket
import multiprocessing
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
 
end_time = time.time()        
 
print("Time for MultiProcessingSquirrel: %ssecs" % (end_time - start_time))
WARNING:root:Website http://netflix.com returned status_code=403
WARNING:root:Timeout expired for website http://instagram.com
WARNING:root:Timeout expired for website http://youtube.com
WARNING:root:Timeout expired for website http://wordpress.com
Time for MultiProcessingSquirrel: 12.823455572128296secs

Other modules for parallel computing in Python

Many Python modules and packages come with parallel features (or arguments). Examples are:

  • Joblib is a set of tools for lightweight pipelining in Python.
  • Scikit-learn is a free machine learning library for the Python programming language. It features various classification, regression, and clustering algorithms, including support vector machines, random forests, gradient boosting, k-means and DBSCAN, and it is designed to interoperate with the Python numerical and scientific libraries NumPy and SciPy.
  • If you want to know more details about parallel computing in Python, you can refer to https://wiki.python.org/moin/ParallelProcessing or this book.

Lab 4

  • Think of a slow piece of code you have written for a past assignment or project.
  • Parallelize it.