Big Data Essentials

L11: Data Processing with Spark





Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site

Spark SQL

  • Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.

  • Spark SQL uses this extra information to perform extra optimizations.

  • One use of Spark SQL is to execute SQL queries.

  • Spark SQL can also be used to read data from an existing Hive installation.

Datasets and DataFrames

Spark Datasets

  • A Dataset is a distributed collection of data.

  • A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

  • The Dataset API is available in Scala and Java.

  • Python does not support the Dataset API. However, because Python is dynamic, many of the Dataset API's benefits are already available; for example, you can access a field of a row by name with row.columnName.

Spark DataFrame

  • A DataFrame is a Dataset organized into named columns.

  • It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

  • DataFrames can be constructed from a wide array of sources such as:

    • structured data files,
    • tables in Hive,
    • external databases, or
    • existing RDDs.
  • The DataFrame API is available in Scala, Java, Python, and R.

Spark session and spark context

What is SparkContext

SparkContext is an entry point to Spark. It has been defined in the org.apache.spark package since Spark 1.x and is used to programmatically create RDDs, accumulators, and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available through SparkSession. In spark-shell, a SparkContext object named sc is available by default. In a program, it can be created with the SparkContext class.

What is SparkSession

SparkSession, introduced in Spark 2.0, is an entry point to the underlying Spark functionality and is used to programmatically create RDDs, DataFrames, and Datasets. In spark-shell, a SparkSession object named spark is available by default. In a program, it can be created with the SparkSession builder pattern.

Start a Spark session

In [107]:
import findspark
findspark.init('/usr/lib/spark-current/')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()

Creating DataFrames

Convert an RDD to a DataFrame

In [108]:
# sc.stop()
sc = spark.sparkContext  # the SparkContext behind the session, used for RDD operations
In [102]:
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people
Out[102]:
PythonRDD[2] at RDD at PythonRDD.scala:53
In [95]:
# Infer the schema from the fields of the Row objects.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+

In [96]:
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+

In [89]:
teenagers.toPandas()  # convert to a pandas DataFrame (collects all rows to the driver)
Out[89]:
     name  age
0  Justin   19

Create Spark DataFrame directly from a file

In [109]:
sdf = spark.read.csv("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.csv")
sdf.show() # Displays the content of the DataFrame to stdout
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

In [13]:
sdf2 = spark.read.json("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

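The CSV file does have a header line (name;age;job), but its fields are separated by semicolons. By default, spark.read.csv splits on commas and treats the first line as data, which is why everything lands in a single column _c0. Instead of relying on inference, we can also describe the structure of the data ourselves with a schema (a StructType in Spark). Below, we build one for the people RDD created earlier.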

In [15]:
# Import data types
from pyspark.sql.types import *
# Build the schema from a list of field names; every field is a nullable string
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
schema
Out[15]:
StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))
In [16]:
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

Export DataFrame to a local disk

In [17]:
sdf.write.mode('overwrite').csv("myspark/")
import os 
os.listdir("myspark")
Out[17]:
['part-00000-c9ed3cde-6299-4462-acf3-265a53291dc6-c000.csv',
 '._SUCCESS.crc',
 '.part-00000-c9ed3cde-6299-4462-acf3-265a53291dc6-c000.csv.crc',
 '_SUCCESS']

Read file and infer the schema from the header

In [26]:
## Load a local file 
air0 = spark.read.options(header='true', inferSchema='true').csv("/home/yanfei/lectures/data/airdelay_small.csv") 
air0
Out[26]:
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]
In [51]:
# We specify the correct schema by hand
from pyspark.sql.types import *
schema_sdf = StructType([
        StructField('Year', IntegerType(), True),
        StructField('Month', IntegerType(), True),
        StructField('DayofMonth', IntegerType(), True),
        StructField('DayOfWeek', IntegerType(), True),
        StructField('DepTime', DoubleType(), True),
        StructField('CRSDepTime', DoubleType(), True),
        StructField('ArrTime', DoubleType(), True),
        StructField('CRSArrTime', DoubleType(), True),
        StructField('UniqueCarrier', StringType(), True),
        StructField('FlightNum', StringType(), True),
        StructField('TailNum', StringType(), True),
        StructField('ActualElapsedTime', DoubleType(), True),
        StructField('CRSElapsedTime',  DoubleType(), True),
        StructField('AirTime',  DoubleType(), True),
        StructField('ArrDelay',  DoubleType(), True),
        StructField('DepDelay',  DoubleType(), True),
        StructField('Origin', StringType(), True),
        StructField('Dest',  StringType(), True),
        StructField('Distance',  DoubleType(), True),
        StructField('TaxiIn',  DoubleType(), True),
        StructField('TaxiOut',  DoubleType(), True),
        StructField('Cancelled',  IntegerType(), True),
        StructField('CancellationCode',  StringType(), True),
        StructField('Diverted',  IntegerType(), True),
        StructField('CarrierDelay', DoubleType(), True),
        StructField('WeatherDelay',  DoubleType(), True),
        StructField('NASDelay',  DoubleType(), True),
        StructField('SecurityDelay',  DoubleType(), True),
        StructField('LateAircraftDelay',  DoubleType(), True)
    ])

air = spark.read.options(header='true').schema(schema_sdf).csv("/home/yanfei/lectures/data/airdelay_small.csv")
air
Out[51]:
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: double, CRSDepTime: double, ArrTime: double, CRSArrTime: double, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: double, CRSElapsedTime: double, AirTime: double, ArrDelay: double, DepDelay: double, Origin: string, Dest: string, Distance: double, TaxiIn: double, TaxiOut: double, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: double, WeatherDelay: double, NASDelay: double, SecurityDelay: double, LateAircraftDelay: double]

Descriptive Statistics

In [28]:
air.describe().show()
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
|summary|              Year|             Month|        DayofMonth|         DayOfWeek|           DepTime|       CRSDepTime|           ArrTime|        CRSArrTime|UniqueCarrier|         FlightNum|TailNum| ActualElapsedTime|    CRSElapsedTime|           AirTime|         ArrDelay|          DepDelay| Origin|   Dest|         Distance|            TaxiIn|           TaxiOut|Cancelled|CancellationCode|Diverted|      CarrierDelay|      WeatherDelay|          NASDelay|       SecurityDelay| LateAircraftDelay|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
|  count|           1524481|           1524481|           1524481|           1524481|           1524481|          1524481|           1524481|           1524481|      1524481|           1524481|1524481|           1524481|           1524481|           1524481|          1524481|           1524481|1524481|1524481|          1524481|           1524481|           1524481|  1524481|               0| 1524481|           1524481|           1524481|           1524481|             1524481|           1524481|
|   mean|2005.2200309482375| 6.819439533847913|15.739877374660622|3.9425345412635515|1343.1007890554229|1335.312045870037|1490.5321424143692|1499.0464518744411|         null|2100.4755290489024|    0.0|124.89908893584112|126.23595243233599|102.60629092786331|7.728503667805634| 9.065091660702889|   null|   null|723.8429826281863|7.4043402311999955|16.027061012895537|      0.0|            null|     0.0|3.1917360728011697|0.6896806191746568| 3.555705187535955|0.025394216129948487| 4.100354809276075|
| stddev|1.3298684774182468|3.3772885531435493| 8.788305183327493|1.9915258421755744|475.47666780749284|462.8261301202478| 499.1625758193302| 478.3461722588984|         null| 1937.457867690337|    0.0|  70.9126133314637| 70.11716827588829| 78.48819536331416|35.41450193208633|32.169219586304244|   null|   null|571.7621650788815| 35.98689560676167|12.080643220148497|      0.0|            null|     0.0| 18.41498545167556| 8.708781309481942|15.282413095867522|  1.1406729402330553|18.704288068396867|
|    min|              2003|                 1|                 1|                 1|               1.0|              3.0|               1.0|               0.0|           9E|                 1|      0|            -681.0|              16.0|           -1461.0|           -735.0|           -1197.0|    ABE|    ABE|             27.0|               0.0|               0.0|        0|            null|       0|               0.0|               0.0|             -13.0|                 0.0|               0.0|
|    max|              2007|                12|                31|                 7|            2644.0|           2359.0|            2742.0|            2359.0|           YV|               999| n816ca|            1766.0|             660.0|            1936.0|           1779.0|            1752.0|    YUM|    YUM|           4962.0|            1470.0|            1439.0|        0|            null|       0|            1665.0|             910.0|            1010.0|               382.0|            1060.0|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+

In [29]:
air.describe(['ArrDelay']).show()
+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|           5432958|
|   mean|  6.97897995898367|
| stddev|30.191156753519472|
|    min|           -1238.0|
|    max|            1779.0|
+-------+------------------+

In [30]:
air.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: double (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)

Select columns

In [31]:
air.select(["ArrDelay","AirTime","Distance"]).show()
+--------+-------+--------+
|ArrDelay|AirTime|Distance|
+--------+-------+--------+
|     2.0|   25.0|   127.0|
|    29.0|  248.0|  1623.0|
|    null|   null|    null|
|    -2.0|   70.0|   451.0|
|    11.0|  133.0|  1009.0|
|    13.0|  177.0|  1562.0|
|   -12.0|  181.0|  1589.0|
|    11.0|  364.0|  2611.0|
|    13.0|   53.0|   304.0|
|    null|   null|    null|
|    -8.0|  293.0|  2537.0|
|    null|   null|    null|
|    null|   null|    null|
|    55.0|  285.0|  1927.0|
|    23.0|  149.0|   991.0|
|    64.0|   35.0|   193.0|
|    29.0|   25.0|    77.0|
|    null|   null|    null|
|    -6.0|   91.0|   678.0|
|    35.0|  127.0|   998.0|
+--------+-------+--------+
only showing top 20 rows

In [32]:
air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()
+-------------+--------------+
|UniqueCarrier|(ArrDelay > 0)|
+-------------+--------------+
|           XE|          true|
|           CO|          true|
|           AA|          true|
|           WN|         false|
|           CO|          true|
|           AA|          true|
|           DL|         false|
|           AA|          true|
|           US|          true|
|           AA|          true|
|           AS|         false|
|           UA|          true|
|           TW|         false|
|           NW|          true|
|           NW|          true|
|           AA|          true|
|           DH|          true|
|           WN|         false|
|           AA|         false|
|           CO|          true|
+-------------+--------------+
only showing top 20 rows

In [33]:
# group data with respect to some columns 
air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show() 
+-------------+---------+------+
|UniqueCarrier|DayOfWeek| count|
+-------------+---------+------+
|           PS|        6|   406|
|           CO|        4| 55764|
|       ML (1)|        7|   442|
|           XE|        4| 14896|
|           TZ|        4|  1455|
|           OO|        3| 17310|
|           EA|        7|  6197|
|           OO|        4| 17666|
|           F9|        2|  1679|
|           EA|        5|  6295|
|           HA|        5|  1519|
|           UA|        4| 89272|
|           EV|        4|  9729|
|           DL|        6|106031|
|           FL|        5|  6962|
|           YV|        3|  4165|
|           AQ|        2|  1035|
|       ML (1)|        2|   502|
|           DL|        3|110827|
|           YV|        6|  3828|
+-------------+---------+------+
only showing top 20 rows

In [34]:
## Group and sort
aircount=air.groupBy("UniqueCarrier").count()
aircount.sort("count",ascending=False).show()
+-------------+------+
|UniqueCarrier| count|
+-------------+------+
|           DL|765388|
|           WN|703368|
|           AA|684522|
|           US|649056|
|           UA|611957|
|           NW|473820|
|           CO|373858|
|           TW|179081|
|           HP|173509|
|           MQ|164790|
|           AS|129863|
|           OO|120223|
|           XE| 94311|
|           EV| 67148|
|           OH| 60630|
|           FL| 47540|
|           EA| 43723|
|           PI| 41489|
|           DH| 32900|
|           B6| 29111|
+-------------+------+
only showing top 20 rows

Data cleaning

In [35]:
## Returns a new DataFrame omitting rows with null values
air_without_na = air.na.drop()
air_without_na.show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+

In [36]:
air_without_na.count()
Out[36]:
0
In [37]:
air.count()  # number of rows in the original DataFrame
Out[37]:
5548754
In [38]:
## Replace null values
air.na.fill("unknown").show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay| Origin|   Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4| 2055.0|    2055.0| 2150.0|    2148.0|           XE|     2619| N11526|             55.0|          53.0|   25.0|     2.0|     0.0|    IAH|    LCH|   127.0|   8.0|   22.0|        0|         unknown|       0|         0.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|2007|   10|        26|        5|  654.0|     700.0| 1507.0|    1515.0|           AS|      802| N648AS|            313.0|         315.0|  293.0|    -8.0|    -6.0|    PDX|    BOS|  2537.0|   9.0|   11.0|        0|         unknown|       0|         0.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|2004|   10|        18|        1|  939.0|     940.0| 1151.0|    1128.0|           NW|      679| N317NB|            192.0|         168.0|  149.0|    23.0|    -1.0|    MSP|    SLC|   991.0|  10.0|   33.0|        0|         unknown|       0|         0.0|         0.0|    23.0|          0.0|              0.0|
|2007|    5|        17|        4| 1944.0|    1830.0| 2034.0|    1930.0|           AA|     1011| N3BDAA|             50.0|          60.0|   35.0|    64.0|    74.0|    MIA|    MCO|   193.0|   4.0|   11.0|        0|         unknown|       0|        64.0|         0.0|     0.0|          0.0|              0.0|
|2003|    7|        31|        4| 1738.0|    1710.0| 1819.0|    1750.0|           DH|     7981| N309UE|             41.0|          40.0|   25.0|    29.0|    28.0|    IAD|    CHO|    77.0|   3.0|   12.0|        0|         unknown|       0|        29.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|      unknown|  unknown|unknown|             null|          null|   null|    null|    null|unknown|unknown|    null|  null|   null|     null|         unknown|    null|        null|        null|    null|         null|             null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows

In [39]:
air.na.replace('NA', "unknown").show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006|    7|         6|        4| 2055.0|    2055.0| 2150.0|    2148.0|           XE|     2619| N11526|             55.0|          53.0|   25.0|     2.0|     0.0|   IAH| LCH|   127.0|   8.0|   22.0|        0|            null|       0|         0.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|2007|   10|        26|        5|  654.0|     700.0| 1507.0|    1515.0|           AS|      802| N648AS|            313.0|         315.0|  293.0|    -8.0|    -6.0|   PDX| BOS|  2537.0|   9.0|   11.0|        0|            null|       0|         0.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|2004|   10|        18|        1|  939.0|     940.0| 1151.0|    1128.0|           NW|      679| N317NB|            192.0|         168.0|  149.0|    23.0|    -1.0|   MSP| SLC|   991.0|  10.0|   33.0|        0|            null|       0|         0.0|         0.0|    23.0|          0.0|              0.0|
|2007|    5|        17|        4| 1944.0|    1830.0| 2034.0|    1930.0|           AA|     1011| N3BDAA|             50.0|          60.0|   35.0|    64.0|    74.0|   MIA| MCO|   193.0|   4.0|   11.0|        0|            null|       0|        64.0|         0.0|     0.0|          0.0|              0.0|
|2003|    7|        31|        4| 1738.0|    1710.0| 1819.0|    1750.0|           DH|     7981| N309UE|             41.0|          40.0|   25.0|    29.0|    28.0|   IAD| CHO|    77.0|   3.0|   12.0|        0|            null|       0|        29.0|         0.0|     0.0|          0.0|              0.0|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
|null| null|      null|     null|   null|      null|   null|      null|         null|     null|   null|             null|          null|   null|    null|    null|  null|null|    null|  null|   null|     null|            null|    null|        null|        null|    null|         null|             null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows

Statistics

In [52]:
air.corr("Distance","ArrDelay")
Out[52]:
0.008481756987561132
In [53]:
air.cov("Distance","ArrDelay")
Out[53]:
140.57953260215643
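corr returns the Pearson correlation coefficient of two numeric columns, and cov returns their sample covariance. The value of about 0.008 therefore indicates almost no linear relationship between Distance and ArrDelay. The stdlib-only Python sketch below spells out both formulas on a few toy (Distance, ArrDelay) pairs. It assumes that any pair containing a null is skipped, which is how the SQL aggregates corr and covar_samp in pyspark.sql.functions treat nulls. The helper names paired, sample_cov and pearson_corr are hypothetical. The sketch shows only the arithmetic, not how Spark distributes the computation.

```python
import math


def paired(xs, ys):
    """Keep only positions where both values are present (None plays the role of null)."""
    return [(x, y) for x, y in zip(xs, ys) if x is not None and y is not None]


def sample_cov(xs, ys):
    """Sample covariance: sum((x - mean_x) * (y - mean_y)) / (n - 1).

    Needs at least two complete pairs.
    """
    pairs = paired(xs, ys)
    n = len(pairs)
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / (n - 1)


def pearson_corr(xs, ys):
    """Pearson correlation: the covariance divided by both standard deviations."""
    pairs = paired(xs, ys)
    px = [x for x, _ in pairs]
    py = [y for _, y in pairs]
    return sample_cov(px, py) / math.sqrt(sample_cov(px, px) * sample_cov(py, py))


# Hypothetical toy data. The third pair contains a null and is skipped.
distance = [1.0, 2.0, None, 3.0, 4.0]
arr_delay = [2.0, 4.0, 5.0, 6.0, 8.0]
print(pearson_corr(distance, arr_delay))  # close to 1.0: the remaining pairs are perfectly linear
print(sample_cov(distance, arr_delay))    # about 3.33
```

Unlike the covariance, the correlation does not depend on the units of the two columns, which makes it the easier of the two numbers to interpret.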
In [54]:
air.filter(air.ArrDelay > 60).show()  # keep only flights that arrived more than 60 minutes late
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    5|        17|        4| 1944.0|    1830.0| 2034.0|    1930.0|           AA|     1011| N3BDAA|             50.0|          60.0|   35.0|    64.0|    74.0|   MIA| MCO|   193.0|   4.0|   11.0|        0|            null|       0|        64.0|         0.0|     0.0|          0.0|              0.0|
|2004|    4|        16|        5| 1550.0|    1525.0| 1750.0|    1638.0|           XE|     2602| N15948|            120.0|          73.0|   35.0|    72.0|    25.0|   EWR| BWI|   169.0|   5.0|   80.0|        0|            null|       0|         0.0|         0.0|    51.0|          0.0|             21.0|
|2005|    7|         1|        5| 1931.0|    1723.0| 2235.0|    2017.0|           FL|      831| N978AT|            124.0|         114.0|   86.0|   138.0|   128.0|   MDW| ATL|   590.0|   9.0|   29.0|        0|            null|       0|         0.0|         0.0|    10.0|          0.0|            128.0|
|2005|   11|        22|        2| 1849.0|    1705.0| 2034.0|    1815.0|           B6|       55| N585JB|            105.0|          70.0|   56.0|   139.0|   104.0|   BTV| JFK|   267.0|   4.0|   45.0|        0|            null|       0|         0.0|         0.0|    35.0|          0.0|            104.0|
|2007|   10|        21|        7| 1725.0|    1625.0| 1837.0|    1730.0|           WN|      143| N389SW|             72.0|          65.0|   42.0|    67.0|    60.0|   LAS| LAX|   236.0|  12.0|   18.0|        0|            null|       0|         0.0|         0.0|     7.0|          0.0|             60.0|
|2005|    4|        15|        5| 1519.0|    1310.0| 1751.0|    1545.0|           AS|      631| N947AS|            152.0|         155.0|  136.0|   126.0|   129.0|   LAS| SEA|   866.0|   5.0|   11.0|        0|            null|       0|         4.0|         0.0|     0.0|          0.0|            122.0|
|2006|    1|        24|        2| 1230.0|    1115.0| 1343.0|    1228.0|           OH|     5050| N784CA|             73.0|          73.0|   50.0|    75.0|    75.0|   JFK| BOS|   187.0|   5.0|   18.0|        0|            null|       0|        75.0|         0.0|     0.0|          0.0|              0.0|
|2007|    6|        16|        6| 1525.0|    1155.0| 1636.0|    1255.0|           XE|     2317| N14933|             71.0|          60.0|   44.0|   221.0|   210.0|   LFT| IAH|   201.0|  10.0|   17.0|        0|            null|       0|         0.0|         0.0|   104.0|          0.0|            117.0|
|2006|   12|         1|        5| 1016.0|     720.0| 1226.0|     925.0|           MQ|     3484| N902BC|            190.0|         185.0|  167.0|   181.0|   176.0|   CHS| DFW|   987.0|  11.0|   12.0|        0|            null|       0|       176.0|         0.0|     5.0|          0.0|              0.0|
|2007|    8|        25|        6| 1544.0|    1430.0| 2123.0|    2015.0|           AA|     2073| N612AA|            279.0|         285.0|  256.0|    68.0|    74.0|   ORD| SJU|  2072.0|   4.0|   19.0|        0|            null|       0|        68.0|         0.0|     0.0|          0.0|              0.0|
|2006|    7|        20|        4| 1102.0|     834.0| 1445.0|    1225.0|           AA|     1652| N441AA|            163.0|         171.0|  144.0|   140.0|   148.0|   ORD| RSW|  1120.0|   2.0|   17.0|        0|            null|       0|         0.0|       140.0|     0.0|          0.0|              0.0|
|2006|    7|        23|        7| 2110.0|    1920.0| 2228.0|    2035.0|           WN|     2347|   N433|             78.0|          75.0|   57.0|   113.0|   110.0|   SNA| OAK|   371.0|   6.0|   15.0|        0|            null|       0|         0.0|         0.0|     3.0|          4.0|            106.0|
|2007|    1|        14|        7| 1110.0|     850.0| 1216.0|    1010.0|           9E|     5968| 85889E|             66.0|          80.0|   44.0|   126.0|   140.0|   MEM| STL|   256.0|   5.0|   17.0|        0|            null|       0|       126.0|         0.0|     0.0|          0.0|              0.0|
|2004|    1|        11|        7| 1530.0|    1255.0| 1815.0|    1545.0|           WN|     1405|   N630|            105.0|         110.0|   94.0|   150.0|   155.0|   OAK| PHX|   646.0|   3.0|    8.0|        0|            null|       0|       150.0|         0.0|     0.0|          0.0|              0.0|
|2007|   10|        26|        5| 1603.0|    1448.0| 1900.0|    1752.0|           UA|      270| N535UA|            117.0|         124.0|  105.0|    68.0|    75.0|   SNA| DEN|   846.0|   5.0|    7.0|        0|            null|       0|         0.0|         0.0|     0.0|          0.0|             68.0|
|2005|   10|        31|        1|  854.0|     858.0| 1136.0|    1000.0|           MQ|     3384| N852AE|            162.0|          62.0|   41.0|    96.0|    -4.0|   SJT| DFW|   228.0| 109.0|   12.0|        0|            null|       0|         0.0|         0.0|    96.0|          0.0|              0.0|
|2006|    1|         6|        5| 2200.0|    2030.0| 2320.0|    2214.0|           OH|     5830| N408CA|            140.0|         164.0|  119.0|    66.0|    90.0|   LGA| BHM|   866.0|   5.0|   16.0|        0|            null|       0|         0.0|         0.0|     0.0|          0.0|             66.0|
|2005|    6|         1|        3| 1729.0|    1610.0| 1847.0|    1740.0|           AA|     1966| N4UAAA|             78.0|          90.0|   62.0|    67.0|    79.0|   DFW| MCI|   460.0|   3.0|   13.0|        0|            null|       0|        67.0|         0.0|     0.0|          0.0|              0.0|
|2007|    7|        13|        5| 1840.0|    1745.0| 2007.0|    1905.0|           EV|     4263| N879AS|            147.0|         140.0|  125.0|    62.0|    55.0|   DCA| JAN|   860.0|   6.0|   16.0|        0|            null|       0|        55.0|         0.0|     7.0|          0.0|              0.0|
|2006|    2|        26|        7| 1953.0|    1745.0| 2145.0|    1940.0|           OO|     6765| N750SK|            112.0|         115.0|   92.0|   125.0|   128.0|   DEN| TUS|   639.0|   6.0|   14.0|        0|            null|       0|       125.0|         0.0|     0.0|          0.0|              0.0|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
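The all-null rows from the earlier show() output do not appear here. Under SQL three-valued logic, null > 60 evaluates to null rather than false, and filter keeps only the rows whose condition is true. The plain-Python sketch below mimics that rule, using None as a stand-in for null. The helper names gt and sql_filter are hypothetical and are not part of PySpark.

```python
def gt(value, threshold):
    """SQL-style '>': a null (None) operand gives an unknown (None) result."""
    if value is None:
        return None
    return value > threshold


def sql_filter(rows, predicate):
    """Keep a row only when the predicate is True. False and None (unknown) both drop it."""
    return [row for row in rows if predicate(row) is True]


# Hypothetical rows shaped like the ArrDelay column above (None = null).
rows = [{"ArrDelay": 64.0}, {"ArrDelay": None}, {"ArrDelay": -8.0}, {"ArrDelay": 221.0}]
late = sql_filter(rows, lambda r: gt(r["ArrDelay"], 60))
print(late)  # [{'ArrDelay': 64.0}, {'ArrDelay': 221.0}]
```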

User-defined functions

In [55]:
# Collect four columns to the driver as a pandas DataFrame.
# toPandas() pulls every row into local memory, so use it only when the result fits.
air2_pdf = air.select(["DayOfWeek", "ArrDelay", "AirTime", "Distance"]).toPandas()
air2_pdf
Out[55]:
         DayOfWeek  ArrDelay  AirTime  Distance
0              4.0       2.0     25.0     127.0
1              7.0      29.0    248.0    1623.0
2              NaN       NaN      NaN       NaN
3              5.0      -2.0     70.0     451.0
4              7.0      11.0    133.0    1009.0
...            ...       ...      ...       ...
5548749        3.0      13.0     59.0     318.0
5548750        1.0      22.0     34.0     181.0
5548751        1.0      11.0     71.0     551.0
5548752        NaN       NaN      NaN       NaN
5548753        2.0     -14.0    107.0     888.0

5548754 rows × 4 columns

In [57]:
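import pandas as pd

def myfun(pdf):
    # Summarise a pandas DataFrame as a single row of column means.
    # pandas' mean() skips NaN values (the null rows above) by default.
    out = dict()
    out["ArrDelay"] = pdf.ArrDelay.mean()
    out["AirTime"] = pdf.AirTime.mean()
    out["Distance"] = pdf.Distance.mean()

    return pd.DataFrame(out, index=[0])

myfun(air2_pdf)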
Out[57]:
   ArrDelay     AirTime    Distance
0  7.350591  102.688519  729.997977
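A function like myfun takes a pandas DataFrame and returns a pandas DataFrame. That is the shape PySpark expects when it applies a user-defined function to each group of a Spark DataFrame. In Spark 3.x this is typically done with groupBy(...).applyInPandas(func, schema), which avoids collecting every row with toPandas(). The stdlib-only sketch below shows the split-apply-combine pattern behind that call: it splits rows by DayOfWeek, computes per-column means for each group (skipping nulls, as pandas does), and combines the one-row results. The toy rows are taken from the output above. The helper names column_means and grouped_apply are hypothetical, and this is not how Spark actually executes the function.

```python
from collections import defaultdict


def column_means(rows, cols):
    """Per-group summary in the spirit of myfun: the mean of each column, skipping None."""
    out = {}
    for c in cols:
        vals = [r[c] for r in rows if r[c] is not None]
        out[c] = sum(vals) / len(vals) if vals else None
    return out


def grouped_apply(rows, key, func):
    """Split rows by `key`, apply `func` to each group, and combine the results.

    A null key forms its own group, as it does in a Spark groupBy.
    Groups are returned in order of first appearance.
    """
    groups = defaultdict(list)
    for r in rows:
        groups[r[key]].append(r)
    return [{key: k, **func(g)} for k, g in groups.items()]


rows = [
    {"DayOfWeek": 1, "ArrDelay": 22.0, "AirTime": 34.0},
    {"DayOfWeek": 1, "ArrDelay": 11.0, "AirTime": 71.0},
    {"DayOfWeek": 2, "ArrDelay": -14.0, "AirTime": 107.0},
    {"DayOfWeek": None, "ArrDelay": None, "AirTime": None},
]
result = grouped_apply(rows, "DayOfWeek", lambda g: column_means(g, ["ArrDelay", "AirTime"]))
for row in result:
    print(row)
# {'DayOfWeek': 1, 'ArrDelay': 16.5, 'AirTime': 52.5}
# {'DayOfWeek': 2, 'ArrDelay': -14.0, 'AirTime': 107.0}
# {'DayOfWeek': None, 'ArrDelay': None, 'AirTime': None}
```

In the Spark version, each group arrives as a pandas DataFrame, and the output schema must be declared up front. That requirement is why applyInPandas takes a schema argument.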