- Spark is a lightning-fast cluster-computing technology designed for fast computation.
- It builds on the Hadoop MapReduce model and extends it to support more types of computation, including interactive queries and stream processing.
- The main feature of Spark is its in-memory cluster computing, which increases the processing speed of an application.
- Spark is designed to cover a wide range of workloads, such as batch applications, iterative algorithms, interactive queries, and streaming.

Speed − Spark runs applications on a Hadoop cluster up to 100 times faster in memory and 10 times faster on disk. It achieves this by reducing the number of read/write operations to disk: intermediate processing data is stored in memory.

Supports multiple languages − Spark provides built-in APIs in Java, Scala, and Python, so you can write applications in different languages. Spark also ships with 80 high-level operators for interactive querying.

Advanced Analytics − Spark supports not only 'map' and 'reduce' but also SQL queries, streaming data, machine learning (ML), and graph algorithms.

**DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.

**Spark Streaming** leverages Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Datasets) transformations on those mini-batches of data.

**MLlib** is a distributed machine learning framework on top of Spark, made possible by Spark's distributed memory-based architecture. According to benchmarks done by the MLlib developers against Alternating Least Squares (ALS) implementations, Spark MLlib is nine times as fast as the Hadoop disk-based version of Apache Mahout (before Mahout gained a Spark interface).

**GraphX** is a distributed graph-processing framework on top of Spark. It provides an API for expressing graph computation that can model user-defined graphs using the Pregel abstraction API, along with an optimized runtime for this abstraction.

- A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark.
- It is an immutable distributed collection of objects.
- Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
- RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
- There are two ways to create RDDs − parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.
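To see what "divided into logical partitions" means, here is a plain-Python sketch (no Spark required) of how a collection might be split into partitions; the `partition` helper is hypothetical, not a Spark API:

```python
def partition(data, num_partitions):
    """Split a list into roughly equal chunks, mimicking how
    parallelizing a collection distributes it across partitions."""
    size = len(data)
    return [data[i * size // num_partitions:(i + 1) * size // num_partitions]
            for i in range(num_partitions)]

chunks = partition(list(range(10)), 3)
# Each chunk could then be computed on a different node of the cluster.
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```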

- MapReduce is widely adopted for processing and generating large datasets with a parallel, distributed algorithm on a cluster. It allows users to write parallel computations, using a set of high-level operators, without having to worry about work distribution and fault tolerance.
- Unfortunately, in most current frameworks, the only way to reuse data between computations (e.g., between two MapReduce jobs) is to write it to an external stable storage system (e.g., HDFS). Although this framework provides numerous abstractions for accessing a cluster's computational resources, users still want more.
- Both iterative and interactive applications require faster data sharing across parallel jobs. Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Regarding storage, most Hadoop applications spend more than 90% of their time doing HDFS read/write operations.

- Data sharing is slow in MapReduce due to replication, serialization, and disk IO. Most Hadoop applications spend more than 90% of their time doing HDFS read/write operations.
- Recognizing this problem, researchers developed a specialized framework called Apache Spark. The key idea of Spark is the Resilient Distributed Dataset (RDD), which supports in-memory processing: it stores state in memory as an object across jobs, and that object is sharable between those jobs. Data sharing in memory is 10 to 100 times faster than network and disk.

Spark-shell:

`spark-shell`

PySpark:

`pyspark`

SparkR:

`sparkR`

`spark-submit <SparkApp.py>`

```
import pyspark

# In the interactive PySpark shell, a SparkContext named sc already exists;
# stop it before creating a new one with a custom app name.
sc.stop()
sc = pyspark.SparkContext("local", "My First Spark App")
```

or

`sc = pyspark.SparkContext.getOrCreate()`

```
textFile = sc.textFile("license.txt")
textFile.first()
textFile.count()
```

```
# Find the largest number of words on any line.
textFile.map(lambda line: len(line.split())) \
    .reduce(lambda a, b: a if (a > b) else b)
```
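The chained map/reduce above finds the largest number of words on any line of the file. The same logic in plain Python, for reference (the sample lines are illustrative):

```python
from functools import reduce

lines = ["Apache Spark is fast", "RDDs are immutable", "hello"]
# map: word count per line; reduce: keep the larger of each pair.
word_counts = map(lambda line: len(line.split()), lines)
max_words = reduce(lambda a, b: a if a > b else b, word_counts)
print(max_words)  # 4
```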

```
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
```

```
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
```

```
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
```

```
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
```
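`Vectors.sparse(3, [0, 2], [1.0, 3.0])` stores only the non-zero entries: in a vector of length 3, indices `[0, 2]` hold values `[1.0, 3.0]`. A plain-Python sketch of the dense equivalent (the helper is illustrative, not a Spark API):

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

print(sparse_to_dense(3, [0, 2], [1.0, 3.0]))  # [1.0, 0.0, 3.0]
```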

```
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]),
                      np.array([0, 2]),
                      np.array([0, 2])), shape=(3, 1))
```

`from pyspark.mllib.linalg import Matrix, Matrices`

```
# Create a dense matrix ((1.0, 4.0), (2.0, 5.0), (3.0, 6.0));
# the value list is interpreted in column-major order.
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])
print(dm2)
```
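`Matrices.dense` interprets the flat value list in column-major order: the first `numRows` values fill column 0, the next fill column 1, and so on. A plain-Python sketch of that unrolling (the helper is illustrative):

```python
def column_major_to_rows(num_rows, num_cols, values):
    """Rebuild the rows of a matrix from a column-major flat list."""
    return [[values[c * num_rows + r] for c in range(num_cols)]
            for r in range(num_rows)]

print(column_major_to_rows(3, 2, [1, 2, 3, 4, 5, 6]))
# [[1, 4], [2, 5], [3, 6]]
```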

```
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
print(sm)
```

```
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
```

```
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
```

```
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
```

`from pyspark.mllib.linalg.distributed import RowMatrix`

```
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
```

```
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
```

```
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
```

```
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
print(m, n, rowsRDD)
```

`from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix`

`# Create an RDD of indexed rows.`

```
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
```

```
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])
```

Learn about data types in Spark.

Further reading

- Text mining: TF-IDF, Word2Vec, n-gram
- Classification: naive Bayes, logistic regression, random forest, decision tree
- Clustering: $k$-means, LDA, mixture models
- Dimension reduction: PCA, SVD, LASSO
- Pattern recognition
- Streaming data processing

We need to register a Spark app:

```
import pyspark
sc = pyspark.SparkContext.getOrCreate()
```

The following example demonstrates how to load training data and parse it into an RDD of LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to predict the label values. Finally, we compute the mean squared error to evaluate goodness of fit.

`from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel`

```
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])
```
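`parsePoint` treats the first number on each line as the label and the rest as features. A plain-Python check of the same parsing logic, using a tuple instead of LabeledPoint so no Spark is needed (the sample line is illustrative):

```python
def parse_point(line):
    """Split a 'label,feat feat ...' line into (label, features)."""
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return values[0], values[1:]

print(parse_point("-0.43,-1.63 -2.00 -1.86"))
# (-0.43, [-1.63, -2.0, -1.86])
```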

```
data = sc.textFile("/user/yanfeikang/lpsa.data")
parsedData = data.map(parsePoint)
```

```
# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)
```

```
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda v: (v[0] - v[1])**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
```
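The map/reduce pipeline above is just the usual MSE formula, the mean of (label − prediction)². A plain-Python equivalent on a few illustrative pairs:

```python
# (label, prediction) pairs, as produced by the map step above.
values_and_preds = [(1.0, 1.5), (2.0, 1.5), (3.0, 3.5)]
mse = sum((label - pred) ** 2
          for label, pred in values_and_preds) / len(values_and_preds)
print("Mean Squared Error = " + str(mse))  # 0.25
```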

```
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
```

```
# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])
```

```
data = sc.textFile("/user/yanfeikang/sample_svm_data.txt")
parsedData = data.map(parsePoint)
```

```
# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)
```

```
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda v: v[0] != v[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
```

```
from numpy import array
from math import sqrt
```

`from pyspark.mllib.clustering import KMeans, KMeansModel`

```
# Load and parse the data
data = sc.textFile("/user/yanfeikang/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
```

```
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
```

```
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))
```

```
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
```
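WSSSE sums, over all points, the distance from each point to its nearest cluster center. A plain-Python sketch of the same computation with two hand-picked centers (the centers and points here are illustrative, not trained by k-means):

```python
from math import sqrt

centers = [[0.0, 0.0], [9.0, 9.0]]
points = [[0.0, 0.1], [9.0, 8.0], [0.2, 0.2]]

def nearest_center(point):
    """Index of the closest center, as clusters.predict would return."""
    return min(range(len(centers)),
               key=lambda i: sum((p - c) ** 2
                                 for p, c in zip(point, centers[i])))

def error(point):
    center = centers[nearest_center(point)]
    return sqrt(sum((p - c) ** 2 for p, c in zip(point, center)))

wssse = sum(error(p) for p in points)
print("Within Set Sum of Squared Error = " + str(wssse))
```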

```
import pyspark
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

sc = pyspark.SparkContext.getOrCreate()
```

```
rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
```

`mat = RowMatrix(rows)`

```
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
```
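The factorization above satisfies A ≈ U · diag(s) · Vᵀ. A small NumPy check of the same identity on the matrix formed by those three rows (assuming NumPy is available, as in the earlier imports):

```python
import numpy as np

# Dense form of the three rows parallelized above.
A = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
U, s, Vt = np.linalg.svd(A, full_matrices=False)
# Reconstructing U * diag(s) * V^T recovers A up to floating-point error.
reconstructed = U @ np.diag(s) @ Vt
print(np.allclose(A, reconstructed))  # True
```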

```
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
```

```
rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
```

`mat = RowMatrix(rows)`

```
# Compute the top 4 principal components. Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
```

```
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
```
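`computePrincipalComponents` finds the top eigenvectors of the covariance matrix of the mean-centered columns, and `multiply` projects each row onto them. A NumPy sketch of those two steps on the same three rows (a sketch of the idea, not Spark's exact implementation; signs of components may differ):

```python
import numpy as np

rows = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
                 [2.0, 0.0, 3.0, 4.0, 5.0],
                 [4.0, 0.0, 0.0, 6.0, 7.0]])
# Principal components: eigenvectors of the covariance of centered columns.
centered = rows - rows.mean(axis=0)
cov = np.cov(centered, rowvar=False)
eigvals, eigvecs = np.linalg.eigh(cov)
pc = eigvecs[:, np.argsort(eigvals)[::-1][:2]]  # top 2 components
# Project the rows onto the principal components.
projected = rows @ pc
print(projected.shape)  # (3, 2)
```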