Three APIs of Spark¶
RDD (2011, Spark 1.0)
DataFrames (2013, Spark 1.3)
Datasets (2015, Spark 1.6)
Spark SQL¶
Spark SQL is a Spark module for structured data processing.
Unlike the basic Spark RDD API, Spark SQL provides Spark with more information about the structure of both the data and the computation being performed.
There are several ways to interact with Spark SQL, including SQL, the DataFrame API, and the Dataset API.
SQL Queries¶
One use of Spark SQL is to execute SQL queries.
Spark SQL can also be used to read data from an existing Hive installation.
When running SQL from within another programming language, the results are returned as a Dataset/DataFrame.
DataFrame API¶
A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be constructed from a wide array of sources such as:
- structured data files,
- tables in Hive,
- external databases, or
- existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
Dataset API¶
A Dataset is a distributed collection of data.
A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java.
Python does not support the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (e.g., you can access the field of a row by name naturally: row.columnName).
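To build intuition for the name-based access mentioned above without needing a running Spark session, note that a PySpark `Row` behaves much like a named tuple. Below is a rough stand-in sketch using Python's `collections.namedtuple` (this is plain Python, not Spark code; the field names are taken from the users.parquet sample used later):

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, which behaves much like a named tuple:
# fields can be read positionally or by attribute name.
Row = namedtuple("Row", ["name", "favorite_color"])

row = Row(name="Alyssa", favorite_color=None)
print(row.name)   # attribute-style access, like row.columnName in PySpark
print(row[0])     # positional access also works
```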
Spark session and spark context¶
What is SparkContext¶
SparkContext is an entry point to Spark, defined in the org.apache.spark package since the 1.x releases, and is used to programmatically create Spark RDDs, accumulators, and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. Its object sc is available by default in spark-shell, and it can also be created programmatically using the SparkContext class.
What is SparkSession¶
SparkSession was introduced in version 2.0 and is an entry point to underlying Spark functionality, used to programmatically create Spark RDDs, DataFrames, and Datasets. Its object spark is available by default in spark-shell, and it can be created programmatically using the SparkSession builder pattern.
Start a Spark session¶
spark, sc = %spark.get_session test
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 03:57:20 WARN [Thread-4] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/15 03:57:21 WARN [Thread-4] DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
24/04/15 03:57:22 WARN [Thread-4] Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Data sources¶
- Spark SQL supports operating on a variety of data sources through the DataFrame interface.
- A DataFrame can be operated on using relational transformations and can also be used to create a temporary view.
- Registering a DataFrame as a temporary view allows you to run SQL queries over its data.
Generic Load/Save Functions¶
import os
spark_home = os.getenv('SPARK_HOME')
df = spark.read.load(f"file://{spark_home}/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
df.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
people = spark.read.load(f"file://{spark_home}/examples/src/main/resources/people.json", format="json")
people.select("name", "age").write.save("namesandAges.parquet", format="parquet")
people.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df = spark.read.load(f"file://{spark_home}/examples/src/main/resources/people.csv", format="csv", sep=";", inferSchema="true", header="true")
df.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
Run SQL on files directly¶
df = spark.sql("SELECT * FROM parquet.`file:///opt/apps/SPARK3/spark3-current/examples/src/main/resources/users.parquet`")
df.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
Json files¶
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = f"file://{spark_home}/examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
# SQL can be run over DataFrames that have been registered as a table.
peopleDF.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
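The temp view makes the DataFrame queryable with ordinary SQL. For intuition about what that query does, here is the same statement run against an in-memory SQLite table with Python's standard library (this is not Spark code; the rows are the people.json sample from above):

```python
import sqlite3

# Recreate the sample people data in an in-memory SQLite table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE people (name TEXT, age INTEGER)")
con.executemany("INSERT INTO people VALUES (?, ?)",
                [("Michael", None), ("Andy", 30), ("Justin", 19)])

# The same filter as the Spark SQL query; NULL ages are excluded,
# just as Michael's null age is excluded above.
teenagers = con.execute(
    "SELECT name, age FROM people WHERE age >= 13 AND age <= 19"
).fetchall()
print(teenagers)  # [('Justin', 19)]
```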
CSV files¶
path = f'file://{spark_home}/examples/src/main/resources/people.csv'
df = spark.read.csv(path)
df.show() # Displays the content of the DataFrame to stdout
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+
# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
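The delimiter and header options mirror what Python's built-in csv module does on a single machine. A local sketch on the same `;`-separated sample content (plain Python, not Spark code):

```python
import csv
import io

# The same ';'-separated content as people.csv above.
text = "name;age;job\nJorge;30;Developer\nBob;32;Developer\n"

# Equivalent of .option("delimiter", ";").option("header", True):
# DictReader consumes the first line as the header.
reader = csv.DictReader(io.StringIO(text), delimiter=";")
rows = list(reader)

# Note: values stay strings here; Spark's inferSchema option is what
# casts "30" to an integer.
print(rows[0]["name"], rows[0]["age"])  # Jorge 30
```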
Processing airdelay data¶
## Load an HDFS file
air0 = spark.read.options(header='true', inferSchema='true').csv("/data/airdelay_small.csv")
air0
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]
The air delay data contains 5,548,754 rows, including the following attributes.
Attribute | Field / Description |
---|---|
Year | Year of flight |
Month | Month of flight |
DayofMonth | Day of month |
DayOfWeek | Day of the week (in number) |
FlightDate | Date on which the flight took place |
UniqueCarrier | Unique Carrier Code. When the same code has been used by multiple carriers, a numeric suffix is used for earlier users, for example, PA, PA(1), PA(2). |
Carrier | Code assigned by IATA and commonly used to identify a carrier. As the same code may have been assigned to different carriers over time, the code is not always unique. |
OriginAirportID | Origin Airport, Airport ID. An identification number assigned by US DOT to identify a unique airport. |
Origin | Origin Airport |
OriginState | Origin Airport, State Code |
DestAirportID | Destination Airport, Airport ID. An identification number assigned by US DOT to identify a unique airport |
Destination | Destination Airport |
DestState | Destination Airport, State Code |
DepTime | Actual Departure Time (local time: hhmm) |
DepDelay | Difference in minutes between scheduled and actual departure time. Early departures show negative numbers. |
ArrTime | Actual Arrival Time (local time: hhmm) |
Cancelled | Cancelled Flight Indicator (1=Yes) |
Distance | Distance between airports (miles) |
CancellationCode | Specifies The Reason For Cancellation |
CarrierDelay | Carrier Delay, in Minutes |
WeatherDelay | Weather Delay, in Minutes |
NASDelay | National Air System Delay, in Minutes |
SecurityDelay | Security Delay, in Minutes |
LateAircraftDelay | Late Aircraft Delay, in Minutes |
FirstDepTime | First Gate Departure Time at Origin Airport |
TotalAddGTime | Total Ground Time Away from Gate for Gate Return or Cancelled Flight |
LongestAddGTime | Longest Time Away from Gate for Gate Return or Cancelled Flight |
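As the table notes, DepTime and ArrTime are stored as local times encoded as hhmm (e.g. 2055.0 means 20:55), so they are not directly usable for arithmetic. A small hypothetical helper (`hhmm_to_minutes` is an illustrative name, not part of the dataset or of Spark) that converts such values to minutes since midnight:

```python
def hhmm_to_minutes(hhmm):
    """Convert an 'hhmm'-encoded local time (e.g. 2055.0) to minutes
    since midnight. Returns None for missing values.
    (Hypothetical helper for illustration; not Spark code.)"""
    if hhmm is None:
        return None
    hours, minutes = divmod(int(hhmm), 100)
    return hours * 60 + minutes

print(hhmm_to_minutes(2055.0))  # 1255
print(hhmm_to_minutes(730.0))   # 450
```

In PySpark such a function could be applied column-wise, e.g. via a UDF, when computing time differences.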
# We specify the correct schema by hand
from pyspark.sql.types import *
schema_sdf = StructType([
StructField('Year', IntegerType(), True),
StructField('Month', IntegerType(), True),
StructField('DayofMonth', IntegerType(), True),
StructField('DayOfWeek', IntegerType(), True),
StructField('DepTime', DoubleType(), True),
StructField('CRSDepTime', DoubleType(), True),
StructField('ArrTime', DoubleType(), True),
StructField('CRSArrTime', DoubleType(), True),
StructField('UniqueCarrier', StringType(), True),
StructField('FlightNum', StringType(), True),
StructField('TailNum', StringType(), True),
StructField('ActualElapsedTime', DoubleType(), True),
StructField('CRSElapsedTime', DoubleType(), True),
StructField('AirTime', DoubleType(), True),
StructField('ArrDelay', DoubleType(), True),
StructField('DepDelay', DoubleType(), True),
StructField('Origin', StringType(), True),
StructField('Dest', StringType(), True),
StructField('Distance', DoubleType(), True),
StructField('TaxiIn', DoubleType(), True),
StructField('TaxiOut', DoubleType(), True),
StructField('Cancelled', IntegerType(), True),
StructField('CancellationCode', StringType(), True),
StructField('Diverted', IntegerType(), True),
StructField('CarrierDelay', DoubleType(), True),
StructField('WeatherDelay', DoubleType(), True),
StructField('NASDelay', DoubleType(), True),
StructField('SecurityDelay', DoubleType(), True),
StructField('LateAircraftDelay', DoubleType(), True)
])
air = spark.read.options(header='true').schema(schema_sdf).csv("/data/airdelay_small.csv")
air
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: double, CRSDepTime: double, ArrTime: double, CRSArrTime: double, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: double, CRSElapsedTime: double, AirTime: double, ArrDelay: double, DepDelay: double, Origin: string, Dest: string, Distance: double, TaxiIn: double, TaxiOut: double, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: double, WeatherDelay: double, NASDelay: double, SecurityDelay: double, LateAircraftDelay: double]
Descriptive Statistics¶
air.describe().show()
24/04/15 04:07:58 WARN [Thread-4] package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------+-------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+------------------+------------------+-------------------+------------------+
|summary| Year| Month| DayofMonth| DayOfWeek| DepTime| CRSDepTime| ArrTime| CRSArrTime|UniqueCarrier| FlightNum| TailNum|ActualElapsedTime| CRSElapsedTime| AirTime| ArrDelay| DepDelay| Origin| Dest| Distance| TaxiIn| TaxiOut| Cancelled|CancellationCode| Diverted| CarrierDelay| WeatherDelay| NASDelay| SecurityDelay| LateAircraftDelay|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------+-------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+------------------+------------------+-------------------+------------------+
| count| 5548754| 5548754| 5548754| 5548754| 5445648| 5548754| 5432958| 5548754| 5548754| 5548754| 5546028| 5432958| 5547553| 3685725| 5432958| 5445648|5548754|5548754| 5539053| 3774512| 3774512| 5548754| 4021064| 5548754| 1556218| 1556218| 1556218| 1556218| 1556218|
| mean|1998.0596386143627|6.5676323008733135|15.72343214350465|3.9417689088397143|1350.1788104923419|1335.7331833417015|1494.2596278123262|1491.2232378656543| null|1315.7048634702494|1.7399929153382927|119.6707237567454|120.55155939925224|102.68243425648957| 6.97897995898367| 8.057528690800433| null| null|700.1569530026162|6.46404329884234|15.093697410420209|0.018581829362051373| null|0.002286999928272185|3.126644853098987|0.6756154985998105|3.4831913009616904|0.02487633480656309| 4.016733516769501|
| stddev| 5.959654574466075|3.4459933060072805|8.785812012351068|1.9901372979154741|476.99701466454053| 476.868103935841| 498.5110921082063| 493.9483009456321| null|1348.4296762173037|13.851402104592431|68.46377112245682| 67.92463006419685| 71.58020398372085|30.191156753519497|28.007952373867763| null| null|550.4983362833545|24.09104235711169|11.124695598069447| 0.13504276458297695| null| 0.04776787592956475|18.23182572644342| 8.620073018389627|15.134125044035907| 1.1289874696449134|18.521650519203085|
| min| 1987| 1| 1| 1| 1.0| 0.0| 1.0| 0.0| 9E| 1| '144DA| -681.0| -96.0| -1461.0| -1238.0| -1197.0| ABE| ABE| 11.0| 0.0| 0.0| 0| A| 0| 0.0| 0.0| -13.0| 0.0| 0.0|
| max| 2007| 12| 31| 7| 2644.0| 2400.0| 2742.0| 2400.0| YV| 999| n816ca| 1766.0| 1487.0| 1936.0| 1779.0| 1752.0| YUM| YUM| 4983.0| 1470.0| 1439.0| 1| NA| 1| 1665.0| 910.0| 1010.0| 382.0| 1060.0|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------+-------+-----------------+-----------------+------------------+--------------------+----------------+--------------------+-----------------+------------------+------------------+-------------------+------------------+
air.describe(['ArrDelay']).show()
+-------+----------------+
|summary|        ArrDelay|
+-------+----------------+
|  count|         5432958|
|   mean|6.97897995898367|
| stddev|30.1911567535195|
|    min|         -1238.0|
|    max|          1779.0|
+-------+----------------+
Print the schema in a tree format.¶
air.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: double (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
Select columns¶
air.select(["ArrDelay","AirTime","Distance"]).show()
+--------+-------+--------+
|ArrDelay|AirTime|Distance|
+--------+-------+--------+
|     2.0|   25.0|   127.0|
|    29.0|  248.0|  1623.0|
|     8.0|   null|   622.0|
|    -2.0|   70.0|   451.0|
|    11.0|  133.0|  1009.0|
|    13.0|  177.0|  1562.0|
|   -12.0|  181.0|  1589.0|
|    11.0|  364.0|  2611.0|
|    13.0|   53.0|   304.0|
|     9.0|   null|   888.0|
|    -8.0|  293.0|  2537.0|
|    15.0|   null|  1723.0|
|   -14.0|   null|  1736.0|
|    55.0|  285.0|  1927.0|
|    23.0|  149.0|   991.0|
|    64.0|   35.0|   193.0|
|    29.0|   25.0|    77.0|
|    -8.0|   null|   447.0|
|    -6.0|   91.0|   678.0|
|    35.0|  127.0|   998.0|
+--------+-------+--------+
only showing top 20 rows
air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()
+-------------+--------------+
|UniqueCarrier|(ArrDelay > 0)|
+-------------+--------------+
|           XE|          true|
|           CO|          true|
|           AA|          true|
|           WN|         false|
|           CO|          true|
|           AA|          true|
|           DL|         false|
|           AA|          true|
|           US|          true|
|           AA|          true|
|           AS|         false|
|           UA|          true|
|           TW|         false|
|           NW|          true|
|           NW|          true|
|           AA|          true|
|           DH|          true|
|           WN|         false|
|           AA|         false|
|           CO|          true|
+-------------+--------------+
only showing top 20 rows
# group data with respect to some columns
air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show()
+-------------+---------+------+
|UniqueCarrier|DayOfWeek| count|
+-------------+---------+------+
|           PS|        6|   406|
|           CO|        4| 55764|
|       ML (1)|        7|   442|
|           XE|        4| 14896|
|           TZ|        4|  1455|
|           OO|        3| 17310|
|           EA|        7|  6197|
|           OO|        4| 17666|
|           F9|        2|  1679|
|           EA|        5|  6295|
|           HA|        5|  1519|
|           UA|        4| 89272|
|           EV|        4|  9729|
|           DL|        6|106031|
|           FL|        5|  6962|
|           YV|        3|  4165|
|           AQ|        2|  1035|
|       ML (1)|        2|   502|
|           DL|        3|110827|
|           YV|        6|  3828|
+-------------+---------+------+
only showing top 20 rows
## Group and sort
aircount = air.groupBy("UniqueCarrier").count()
aircount.sort("count", ascending=False).show()
+-------------+------+
|UniqueCarrier| count|
+-------------+------+
|           DL|765388|
|           WN|703368|
|           AA|684522|
|           US|649056|
|           UA|611957|
|           NW|473820|
|           CO|373858|
|           TW|179081|
|           HP|173509|
|           MQ|164790|
|           AS|129863|
|           OO|120223|
|           XE| 94311|
|           EV| 67148|
|           OH| 60630|
|           FL| 47540|
|           EA| 43723|
|           PI| 41489|
|           DH| 32900|
|           B6| 29111|
+-------------+------+
only showing top 20 rows
Data cleaning¶
## Returns a new DataFrame omitting rows with null values
air_without_na = air.na.drop()
air_without_na.show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
air_without_na.count()
0
air.count() # original file size
5548754
air.show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006| 7| 6| 4| 2055.0| 2055.0| 2150.0| 2148.0| XE| 2619| N11526| 55.0| 53.0| 25.0| 2.0| 0.0| IAH| LCH| 127.0| 8.0| 22.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|1997| 3| 2| 7| 2019.0| 2015.0| 2314.0| 2245.0| CO| 143| N59302| 295.0| 270.0| 248.0| 29.0| 4.0| EWR| COS| 1623.0| 6.0| 41.0| 0| NA| 0| null| null| null| null| null|
|1994| 5| 2| 1| 700.0| 700.0| 804.0| 756.0| AA| 1629| NA| 124.0| 116.0| null| 8.0| 0.0| BWI| ORD| 622.0| null| null| 0| NA| 0| null| null| null| null| null|
|1997| 2| 14| 5| 700.0| 700.0| 728.0| 730.0| WN| 1783| N332| 88.0| 90.0| 70.0| -2.0| 0.0| TUS| LAX| 451.0| 8.0| 10.0| 0| NA| 0| null| null| null| null| null|
|2000| 6| 11| 7| 2052.0| 2034.0| 2132.0| 2121.0| CO| 1753| N16893| 160.0| 167.0| 133.0| 11.0| 18.0| IAH| PHX| 1009.0| 5.0| 22.0| 0| NA| 0| null| null| null| null| null|
|1997| 11| 16| 7| 1503.0| 1414.0| 1912.0| 1859.0| AA| 414| N205AA| 189.0| 225.0| 177.0| 13.0| 49.0| DFW| BOS| 1562.0| 4.0| 8.0| 0| NA| 0| null| null| null| null| null|
|1999| 5| 31| 1| 839.0| 845.0| 1409.0| 1421.0| DL| 152| N177DZ| 210.0| 216.0| 181.0| -12.0| -6.0| SLC| ATL| 1589.0| 11.0| 18.0| 0| NA| 0| null| null| null| null| null|
|1997| 4| 30| 3| 828.0| 830.0| 1206.0| 1155.0| AA| 11| N5DAAA| 398.0| 385.0| 364.0| 11.0| -2.0| BOS| LAX| 2611.0| 5.0| 29.0| 0| NA| 0| null| null| null| null| null|
|1995| 7| 21| 5| 1018.0| 1000.0| 1126.0| 1113.0| US| 272| N274US| 68.0| 73.0| 53.0| 13.0| 18.0| GSO| PIT| 304.0| 4.0| 11.0| 0| NA| 0| null| null| null| null| null|
|1989| 9| 28| 4| 1501.0| 1450.0| 1639.0| 1630.0| AA| 905| NA| 158.0| 160.0| null| 9.0| 11.0| ORD| DEN| 888.0| null| null| 0| NA| 0| null| null| null| null| null|
|2007| 10| 26| 5| 654.0| 700.0| 1507.0| 1515.0| AS| 802| N648AS| 313.0| 315.0| 293.0| -8.0| -6.0| PDX| BOS| 2537.0| 9.0| 11.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|1993| 6| 28| 1| 1540.0| 1541.0| 2146.0| 2131.0| UA| 1746| NA| 246.0| 230.0| null| 15.0| -1.0| SAN| ORD| 1723.0| null| null| 0| NA| 0| null| null| null| null| null|
|1994| 3| 31| 4| 1909.0| 1900.0| 2116.0| 2130.0| TW| 819| NA| 247.0| 270.0| null| -14.0| 9.0| STL| SFO| 1736.0| null| null| 0| NA| 0| null| null| null| null| null|
|2000| 5| 23| 2| 958.0| 910.0| 1205.0| 1110.0| NW| 281| N523US| 307.0| 300.0| 285.0| 55.0| 48.0| DTW| SEA| 1927.0| 8.0| 14.0| 0| NA| 0| null| null| null| null| null|
|2004| 10| 18| 1| 939.0| 940.0| 1151.0| 1128.0| NW| 679| N317NB| 192.0| 168.0| 149.0| 23.0| -1.0| MSP| SLC| 991.0| 10.0| 33.0| 0| null| 0| 0.0| 0.0| 23.0| 0.0| 0.0|
|2007| 5| 17| 4| 1944.0| 1830.0| 2034.0| 1930.0| AA| 1011| N3BDAA| 50.0| 60.0| 35.0| 64.0| 74.0| MIA| MCO| 193.0| 4.0| 11.0| 0| null| 0| 64.0| 0.0| 0.0| 0.0| 0.0|
|2003| 7| 31| 4| 1738.0| 1710.0| 1819.0| 1750.0| DH| 7981| N309UE| 41.0| 40.0| 25.0| 29.0| 28.0| IAD| CHO| 77.0| 3.0| 12.0| 0| null| 0| 29.0| 0.0| 0.0| 0.0| 0.0|
|1993| 3| 17| 3| 630.0| 630.0| 747.0| 755.0| WN| 1968| NA| 77.0| 85.0| null| -8.0| 0.0| SFO| SAN| 447.0| null| null| 0| NA| 0| null| null| null| null| null|
|2002| 8| 25| 7| 1255.0| 1250.0| 1552.0| 1558.0| AA| 768| N464AA| 117.0| 128.0| 91.0| -6.0| 5.0| ORD| PHL| 678.0| 7.0| 19.0| 0| NA| 0| null| null| null| null| null|
|1995| 7| 30| 7| 2018.0| 1955.0| 2318.0| 2243.0| CO| 765| N578PE| 180.0| 168.0| 127.0| 35.0| 23.0| EWR| TPA| 998.0| 3.0| 50.0| 0| NA| 0| null| null| null| null| null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
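`air.na.drop()` returned an empty DataFrame above because every row has at least one null column: as the rows just shown illustrate, the delay-breakdown columns (CarrierDelay etc.) are only populated for part of the years, and other rows have nulls elsewhere. A sketch of the default how="any" semantics, and of the real PySpark alternative `na.drop(subset=...)`, in plain Python (hypothetical rows modeled on the sample above; not Spark code):

```python
# Two toy rows modeled on the sample output: each has a null somewhere.
rows = [
    {"Year": 2006, "ArrDelay": 2.0,  "CarrierDelay": 0.0,  "CancellationCode": None},
    {"Year": 1997, "ArrDelay": 29.0, "CarrierDelay": None, "CancellationCode": "NA"},
]

def drop_any(rows):
    """Keep only rows where no value is None (na.drop() default, how='any')."""
    return [r for r in rows if all(v is not None for v in r.values())]

def drop_subset(rows, subset):
    """Keep rows with no None in the listed columns (like na.drop(subset=...))."""
    return [r for r in rows if all(r[c] is not None for c in subset)]

print(len(drop_any(rows)))                   # 0 -- every row has some null
print(len(drop_subset(rows, ["ArrDelay"])))  # 2 -- restrict to columns you need
```

Restricting the drop to the columns actually used in an analysis avoids discarding the whole dataset.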
Statistics¶
air.corr("Distance","ArrDelay")
0.0024019187216827364
air.cov("Distance","ArrDelay")
39.54483781577919
air.filter(air.ArrDelay > 60).show() # filter with certain conditions
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007| 5| 17| 4| 1944.0| 1830.0| 2034.0| 1930.0| AA| 1011| N3BDAA| 50.0| 60.0| 35.0| 64.0| 74.0| MIA| MCO| 193.0| 4.0| 11.0| 0| null| 0| 64.0| 0.0| 0.0| 0.0| 0.0|
|2001| 5| 31| 4| 1812.0| 1650.0| 1954.0| 1825.0| WN| 1612| N641@@| 102.0| 95.0| 86.0| 89.0| 82.0| MDW| BHM| 570.0| 3.0| 13.0| 0| NA| 0| null| null| null| null| null|
|1999| 8| 13| 5| 1800.0| 1635.0| 2000.0| 1826.0| DL| 965| N404DA| 120.0| 111.0| 97.0| 94.0| 85.0| RSW| ATL| 515.0| 10.0| 13.0| 0| NA| 0| null| null| null| null| null|
|2004| 4| 16| 5| 1550.0| 1525.0| 1750.0| 1638.0| XE| 2602| N15948| 120.0| 73.0| 35.0| 72.0| 25.0| EWR| BWI| 169.0| 5.0| 80.0| 0| null| 0| 0.0| 0.0| 51.0| 0.0| 21.0|
|1999| 8| 17| 2| 1246.0| 1135.0| 1430.0| 1310.0| CO| 1843| N77303| 104.0| 95.0| 71.0| 80.0| 71.0| EWR| CMH| 462.0| 6.0| 27.0| 0| NA| 0| null| null| null| null| null|
|1999| 6| 10| 4| 2214.0| 0.0| 2259.0| 0.0| UA| 1659| N1829U| 105.0| 113.0| 83.0| 101.0| 109.0| BWI| ORD| 622.0| 10.0| 12.0| 0| NA| 0| null| null| null| null| null|
|1988| 5| 17| 2| 1714.0| 1516.0| 1938.0| 1739.0| AA| 885| NA| 204.0| 203.0| null| 119.0| 118.0| PHL| DFW| 1302.0| null| null| 0| NA| 0| null| null| null| null| null|
|2005| 7| 1| 5| 1931.0| 1723.0| 2235.0| 2017.0| FL| 831| N978AT| 124.0| 114.0| 86.0| 138.0| 128.0| MDW| ATL| 590.0| 9.0| 29.0| 0| null| 0| 0.0| 0.0| 10.0| 0.0| 128.0|
|2005| 11| 22| 2| 1849.0| 1705.0| 2034.0| 1815.0| B6| 55| N585JB| 105.0| 70.0| 56.0| 139.0| 104.0| BTV| JFK| 267.0| 4.0| 45.0| 0| null| 0| 0.0| 0.0| 35.0| 0.0| 104.0|
|1998| 6| 14| 7| 2015.0| 1820.0| 2058.0| 1924.0| NW| 1267| N715RC| 103.0| 124.0| 85.0| 94.0| 115.0| CLE| MSP| 622.0| 7.0| 11.0| 0| NA| 0| null| null| null| null| null|
|2007| 10| 21| 7| 1725.0| 1625.0| 1837.0| 1730.0| WN| 143| N389SW| 72.0| 65.0| 42.0| 67.0| 60.0| LAS| LAX| 236.0| 12.0| 18.0| 0| null| 0| 0.0| 0.0| 7.0| 0.0| 60.0|
|2005| 4| 15| 5| 1519.0| 1310.0| 1751.0| 1545.0| AS| 631| N947AS| 152.0| 155.0| 136.0| 126.0| 129.0| LAS| SEA| 866.0| 5.0| 11.0| 0| null| 0| 4.0| 0.0| 0.0| 0.0| 122.0|
|2003| 3| 13| 4| 1947.0| 1845.0| 2057.0| 1942.0| XE| 2736| N16520| 70.0| 57.0| 38.0| 75.0| 62.0| IAH| SHV| 192.0| 8.0| 24.0| 0| NA| 0| null| null| null| null| null|
|2006| 1| 24| 2| 1230.0| 1115.0| 1343.0| 1228.0| OH| 5050| N784CA| 73.0| 73.0| 50.0| 75.0| 75.0| JFK| BOS| 187.0| 5.0| 18.0| 0| null| 0| 75.0| 0.0| 0.0| 0.0| 0.0|
|2007| 6| 16| 6| 1525.0| 1155.0| 1636.0| 1255.0| XE| 2317| N14933| 71.0| 60.0| 44.0| 221.0| 210.0| LFT| IAH| 201.0| 10.0| 17.0| 0| null| 0| 0.0| 0.0| 104.0| 0.0| 117.0|
|1990| 9| 18| 2| 1351.0| 1245.0| 1445.0| 1325.0| US| 2901| NA| 54.0| 40.0| null| 80.0| 66.0| SEA| BLI| 94.0| null| null| 0| NA| 0| null| null| null| null| null|
|1996| 9| 9| 1| 2135.0| 1955.0| 2247.0| 2114.0| US| 866| N351US| 72.0| 79.0| 52.0| 93.0| 100.0| LGA| RIC| 292.0| 10.0| 10.0| 0| NA| 0| null| null| null| null| null|
|2006| 12| 1| 5| 1016.0| 720.0| 1226.0| 925.0| MQ| 3484| N902BC| 190.0| 185.0| 167.0| 181.0| 176.0| CHS| DFW| 987.0| 11.0| 12.0| 0| null| 0| 176.0| 0.0| 5.0| 0.0| 0.0|
|1997| 12| 29| 1| 2349.0| 2200.0| 159.0| 2345.0| US| 634| N408US| 130.0| 105.0| 115.0| 134.0| 109.0| CLT| EWR| 529.0| 6.0| 9.0| 0| NA| 0| null| null| null| null| null|
|2007| 8| 25| 6| 1544.0| 1430.0| 2123.0| 2015.0| AA| 2073| N612AA| 279.0| 285.0| 256.0| 68.0| 74.0| ORD| SJU| 2072.0| 4.0| 19.0| 0| null| 0| 68.0| 0.0| 0.0| 0.0| 0.0|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
Pandas API¶
- Pandas API on Spark allows you to scale your pandas workload to any size by running it distributed across multiple nodes.
- If you are already familiar with pandas and want to leverage Spark for big data, pandas API on Spark makes you immediately productive and lets you migrate your applications without modifying the code.
- You can have a single codebase that works both with pandas (tests, smaller datasets) and with Spark (production, distributed datasets), and you can switch between the pandas API and the pandas API on Spark easily and without overhead.
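To illustrate the code-parity point, here is the earlier groupby-count written with plain pandas (assuming pandas is installed; this cell is not Spark code and uses made-up toy rows). The intent of the pandas API on Spark is that essentially only the import changes, to `import pyspark.pandas as ps`, while code like this stays the same:

```python
import pandas as pd

# Toy rows shaped like the airdelay data (hypothetical values).
df = pd.DataFrame({
    "UniqueCarrier": ["DL", "WN", "DL", "AA", "DL"],
    "ArrDelay": [5.0, -2.0, 12.0, 30.0, 0.0],
})

# Same pattern as air.groupBy("UniqueCarrier").count() plus a sort.
counts = df.groupby("UniqueCarrier").size().sort_values(ascending=False)
print(counts["DL"])  # 3
```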
%spark.stop_session
'success stop spark application application_1709108427864_0316'