Spark

What is Spark

  • Spark is a lightning-fast cluster computing technology designed for fast, general-purpose computation.
  • It builds on the Hadoop MapReduce model and extends it to efficiently support more types of computation, including interactive queries and stream processing.
  • The main feature of Spark is in-memory cluster computing, which increases the processing speed of an application.
  • Spark is designed to cover a wide range of workloads, such as batch applications, iterative algorithms, interactive queries, and streaming.

Features of Spark

  • Speed − Spark can run an application on a Hadoop cluster up to 100 times faster in memory and 10 times faster on disk. It achieves this by reducing the number of read/write operations to disk and keeping intermediate processing data in memory.

  • Supports multiple languages − Spark provides built-in APIs in Java, Scala, and Python, so you can write applications in different languages. Spark also offers around 80 high-level operators for interactive querying.

  • Advanced Analytics − Spark supports not only ‘map’ and ‘reduce’ but also SQL queries, streaming data, machine learning (ML), and graph algorithms.

Components of Spark

  • Spark SQL provides the DataFrame API. A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs, and the DataFrame API is available in Scala, Java, Python, and R (a short example follows this list).
  • Spark Streaming leverages Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data.
  • MLlib is a distributed machine learning framework on top of Spark that benefits from Spark's distributed, memory-based architecture. According to benchmarks run by the MLlib developers on the Alternating Least Squares (ALS) implementations, Spark MLlib is about nine times as fast as the Hadoop disk-based version of Apache Mahout (measured before Mahout gained a Spark interface).
  • GraphX is a distributed graph-processing framework on top of Spark. It provides an API for expressing graph computations that can model user-defined graphs using the Pregel abstraction, together with an optimized runtime for this abstraction.
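
As a quick illustration of the DataFrame API mentioned above, here is a minimal sketch. The SparkSession builder, the app name, and the small in-memory table are not from the original notes; they only assume a standard PySpark installation:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession, the entry point of the DataFrame API.
spark = SparkSession.builder.appName("DataFrame example").getOrCreate()

# Construct a DataFrame from a local collection of rows with named columns.
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.show()

# DataFrames can equally be loaded from structured files, Hive tables,
# external databases, or existing RDDs, e.g. (hypothetical file name):
# df = spark.read.json("people.json")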

Spark RDD

  • The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark.
  • An RDD is an immutable distributed collection of objects.
  • Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
  • RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
  • There are two ways to create RDDs − parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat (see the sketch after this list).
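
A minimal sketch of the two creation paths, assuming an existing SparkContext sc (created as in the "Create a Spark application" section below) and the same license.txt file used later in these notes:

# 1. Parallelize an existing collection in the driver program.
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# 2. Reference a dataset in external storage (here a local or HDFS text file).
rdd2 = sc.textFile("license.txt")

print(rdd1.count(), rdd2.count())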

Data Sharing is Slow in MapReduce

  • MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations, using a set of high-level operators, without having to worry about work distribution and fault tolerance.
  • Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system (e.g., HDFS). Although this framework provides numerous abstractions for accessing a cluster’s computational resources, users still want more.
  • Both iterative and interactive applications require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk I/O. Regarding storage, most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.

Data Sharing using Spark RDD

  • Data sharing is slow in MapReduce due to replication, serialization, and disk I/O; most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.
  • Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is the Resilient Distributed Dataset (RDD), which supports in-memory computation: it stores intermediate state in memory as an object across jobs, and that object can be shared between those jobs. Sharing data in memory is 10 to 100 times faster than sharing it over the network or via disk (see the sketch below).
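
A minimal sketch of in-memory sharing, assuming an existing SparkContext sc and the license.txt file used later in these notes:

lines = sc.textFile("license.txt").cache()           # mark the RDD to be kept in memory
print(lines.count())                                 # first action reads from disk and fills the cache
print(lines.filter(lambda l: "Spark" in l).count())  # later jobs reuse the cached data instead of re-reading it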

How to run Spark

  • Spark-shell: spark-shell

  • PySpark: pyspark

  • SparkR: sparkR

Spark Batch

spark-submit <SparkApp.py>
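
A minimal sketch of what such a SparkApp.py might contain; the file name and its contents are only an illustration of the spark-submit workflow, not a prescribed application:

# SparkApp.py
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="My Batch App")
    # Count the total number of words in a text file (hypothetical file name).
    total = sc.textFile("license.txt").map(lambda line: len(line.split())).sum()
    print("Total words:", total)
    sc.stop()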

Create a Spark application

import pyspark

sc.stop()   # stop the SparkContext created by the shell, if any
sc = pyspark.SparkContext("local", "My First Spark App")

or

sc = pyspark.SparkContext.getOrCreate()

Count the lines of a text file

textFile = sc.textFile("license.txt")
textFile.first()   # the first line of the file
textFile.count()   # the number of lines in the file

Spark MapReduce

# Find the largest number of words on a single line.
textFile.map(lambda line: len(line.split())) \
        .reduce(lambda a, b: a if (a > b) else b)
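
The map/reduce pair above returns the largest number of words on a single line. A closely related pattern, sketched here with the same textFile, is the classic word count using flatMap and reduceByKey:

wordCounts = textFile.flatMap(lambda line: line.split()) \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a, b: a + b)
print(wordCounts.take(5))   # a few (word, count) pairs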

Data types in Spark

Local vector

import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])

# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]

# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])

# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))


Local matrix

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)); values are given in column-major order.
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
print(dm2)

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)


Labeled Points

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))


Sparse data

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "libsvm_data.txt")
print(examples)
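
Note that examples is an RDD of LabeledPoint, so printing the RDD object only shows its description; to look at a few parsed records, a small usage sketch:

print(examples.take(2))   # a few LabeledPoint(label, sparse features) records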


Distributed matrix: RowMatrix

from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()   # 4
n = mat.numCols()   # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m, n, rowsRDD)
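
A RowMatrix also exposes distributed summaries directly; a small sketch, assuming a reasonably recent Spark version:

# Column-wise summary statistics computed on the distributed matrix.
summary = mat.computeColumnSummaryStatistics()
print(summary.mean(), summary.variance())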


Distributed matrix: IndexedRowMatrix

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.

# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])

# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])
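
To complete the example, the RDD of indexed rows can be wrapped in an IndexedRowMatrix, queried, or converted back; a short sketch:

# Create an IndexedRowMatrix from the RDD of indexed rows.
mat = IndexedRowMatrix(indexedRows)
m = mat.numRows()   # 4
n = mat.numCols()   # 3
print(m, n)

# Drop the row indices to obtain a plain RowMatrix.
rowMat = mat.toRowMatrix()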

Spark and Machine Learning

Machine learning in Spark MLlib

  • Text mining: TF-IDF, Word2Vec, n-gram
  • Classification: Naive Bayes, logistic regression, random forest, decision tree
  • Clustering: $k$-Means, LDA, Mixtures
  • Dimension reduction: PCA, SVD, LASSO
  • Pattern recognition
  • Streaming data processing

Before we start

We need to register a Spark app:

import pyspark

sc = pyspark.SparkContext.getOrCreate()

Linear regression

The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint. It then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the mean squared error at the end to evaluate goodness of fit.

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("/user/yanfeikang/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda v: (v[0] - v[1])**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
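
The LinearRegressionModel import above also allows the trained model to be persisted and reloaded; a short sketch (the path below is only an illustration):

# Save the model and load it back later.
model.save(sc, "myLinearRegressionWithSGDModel")
sameModel = LinearRegressionModel.load(sc, "myLinearRegressionWithSGDModel")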

Logistic regression

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("/user/yanfeikang/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluate the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda v: v[0] != v[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

$k$-means

from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("/user/yanfeikang/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing the Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
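
Once trained, the model can also assign new points to clusters; a small usage sketch, assuming three-dimensional points as in the classic kmeans_data sample:

print(clusters.centers)                          # the two cluster centers
print(clusters.predict(array([0.2, 0.2, 0.2])))  # index of the nearest cluster for a new point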

Matrix computation in Spark

SVD

import pyspark
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

sc = pyspark.SparkContext.getOrCreate()

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)

svd = mat.computeSVD(5, computeU=True)
U = svd.U   # The U factor is a RowMatrix.
s = svd.s   # The singular values are stored in a local dense vector.
V = svd.V   # The V factor is a local dense matrix.
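
To inspect the factors, note that U is distributed while s and V are local; a small usage sketch:

print(s)                  # singular values (local dense vector)
print(V)                  # right singular vectors (local dense matrix)
print(U.rows.collect())   # rows of U, collected to the driver (only safe for small matrices)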


PCA

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)

# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)

# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
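
The projection is again a RowMatrix; a small usage sketch to inspect it on the driver:

print(projected.rows.collect())   # projected rows, collected locally (only for small data)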