Big Data Essentials

Data Processing with Spark





Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site

Three APIs of Spark

  • RDD (2011, Spark 1.0)

  • DataFrames (2013, Spark 1.3)

  • Datasets (2015, Spark 1.6)

Spark SQL, DataFrames and Datasets

  • Spark SQL is a Spark module for structured data processing.

  • Unlike RDD, Spark SQL provides Spark with more information about the structure of both the data and the computation being performed.

  • There are several ways to interact with Spark SQL including SQL and the Dataset API.

Spark SQL

  • One use of Spark SQL is to execute SQL queries.

  • Spark SQL can also be used to read data from an existing Hive installation.

  • When running SQL from within another programming language the results will be returned as a Dataset/DataFrame.

Spark Datasets

  • A Dataset is a distributed collection of data.

  • A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

  • The Dataset API is available in Scala and Java.

  • Python does not support the Dataset API, but because of Python’s dynamic nature many of its benefits are already available (e.g. you can access a field of a row by name, as in row.columnName).

Spark DataFrames

  • A DataFrame is a Dataset organized into named columns.

  • It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

  • DataFrames can be constructed from a wide array of sources such as:

    • structured data files,
    • tables in Hive,
    • external databases, or
    • existing RDDs.
  • The DataFrame API is available in Scala, Java, Python, and R.

Spark session and spark context

What is SparkContext

SparkContext has been the entry point to Spark since version 1.x. It is defined in the org.apache.spark package and is used to programmatically create RDDs, accumulators and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. In spark-shell a SparkContext object named sc is available by default, and one can also be created programmatically using the SparkContext class.

What is SparkSession

SparkSession was introduced in version 2.0 and is an entry point to the underlying Spark functionality for programmatically creating RDDs, DataFrames and Datasets. In spark-shell a SparkSession object named spark is available by default, and one can also be created programmatically using the SparkSession builder pattern.

Start a Spark session

In [1]:
import findspark
findspark.init('/opt/apps/SPARK3/spark-current')
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Data processing using pyspark") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "14g") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/07 17:05:20 WARN [Thread-6] Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

Creating DataFrames

Convert an RDD to a DataFrame

In [2]:
# Creating a SparkSession also creates a SparkContext, available as spark.sparkContext.
sc = spark.sparkContext
In [3]:
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
# Text file RDDs can be created using SparkContext’s textFile method. 
lines = sc.textFile("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people
Out[3]:
PythonRDD[2] at RDD at PythonRDD.scala:53
In [4]:
# Infer the schema from the Row objects and create a DataFrame.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()
                                                                                
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

In [5]:
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+

Create Spark DataFrame directly from a file

In [7]:
sdf = spark.read.csv("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.csv")
sdf.show() # Displays the content of the DataFrame to stdout
                                                                                
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

In [6]:
sdf2 = spark.read.json("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
                                                                                
+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

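With the default options, Spark does not treat the first line of the CSV file as a header and splits fields on commas rather than semicolons, so the whole file ends up in a single column _c0. When data has no header, we can create a description of it (a schema in Spark) ourselves.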

In [7]:
# Import the data types used to build the schema
from pyspark.sql.types import StructField, StructType, StringType

# The column names are listed explicitly; every field is a nullable string.
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
schema
Out[7]:
StructType([StructField('name', StringType(), True), StructField('age', StringType(), True)])
In [8]:
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

                                                                                

Read file and infer the schema from the header

In [15]:
# Load a file from HDFS; take column names from the header and infer the column types
air = spark.read.options(header='true', inferSchema='true').csv("/data/flights.csv") 
air
                                                                                
Out[15]:
DataFrame[FL_DATE: date, AIRLINE: string, AIRLINE_DOT: string, AIRLINE_CODE: string, DOT_CODE: int, FL_NUMBER: int, ORIGIN: string, ORIGIN_CITY: string, DEST: string, DEST_CITY: string, CRS_DEP_TIME: int, DEP_TIME: double, DEP_DELAY: double, TAXI_OUT: double, WHEELS_OFF: double, WHEELS_ON: double, TAXI_IN: double, CRS_ARR_TIME: int, ARR_TIME: double, ARR_DELAY: double, CANCELLED: double, CANCELLATION_CODE: string, DIVERTED: double, CRS_ELAPSED_TIME: double, ELAPSED_TIME: double, AIR_TIME: double, DISTANCE: double, DELAY_DUE_CARRIER: double, DELAY_DUE_WEATHER: double, DELAY_DUE_NAS: double, DELAY_DUE_SECURITY: double, DELAY_DUE_LATE_AIRCRAFT: double]
In [16]:
air.show(5)
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|   FL_DATE|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|        ORIGIN_CITY|DEST|           DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|2019-01-09|United Air Lines ...|United Air Lines ...|          UA|   19977|     1562|   FLL|Fort Lauderdale, FL| EWR|          Newark, NJ|        1155|  1151.0|     -4.0|    19.0|    1210.0|   1443.0|    4.0|        1501|  1447.0|    -14.0|      0.0|             NULL|     0.0|           186.0|       176.0|   153.0|  1065.0|             NULL|             NULL|         NULL|              NULL|                   NULL|
|2022-11-19|Delta Air Lines Inc.|Delta Air Lines I...|          DL|   19790|     1149|   MSP|    Minneapolis, MN| SEA|         Seattle, WA|        2120|  2114.0|     -6.0|     9.0|    2123.0|   2232.0|   38.0|        2315|  2310.0|     -5.0|      0.0|             NULL|     0.0|           235.0|       236.0|   189.0|  1399.0|             NULL|             NULL|         NULL|              NULL|                   NULL|
|2022-07-22|United Air Lines ...|United Air Lines ...|          UA|   19977|      459|   DEN|         Denver, CO| MSP|     Minneapolis, MN|         954|  1000.0|      6.0|    20.0|    1020.0|   1247.0|    5.0|        1252|  1252.0|      0.0|      0.0|             NULL|     0.0|           118.0|       112.0|    87.0|   680.0|             NULL|             NULL|         NULL|              NULL|                   NULL|
|2023-03-06|Delta Air Lines Inc.|Delta Air Lines I...|          DL|   19790|     2295|   MSP|    Minneapolis, MN| SFO|   San Francisco, CA|        1609|  1608.0|     -1.0|    27.0|    1635.0|   1844.0|    9.0|        1829|  1853.0|     24.0|      0.0|             NULL|     0.0|           260.0|       285.0|   249.0|  1589.0|              0.0|              0.0|         24.0|               0.0|                    0.0|
|2020-02-23|    Spirit Air Lines|Spirit Air Lines: NK|          NK|   20416|      407|   MCO|        Orlando, FL| DFW|Dallas/Fort Worth...|        1840|  1838.0|     -2.0|    15.0|    1853.0|   2026.0|   14.0|        2041|  2040.0|     -1.0|      0.0|             NULL|     0.0|           181.0|       182.0|   153.0|   985.0|             NULL|             NULL|         NULL|              NULL|                   NULL|
+----------+--------------------+--------------------+------------+--------+---------+------+-------------------+----+--------------------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
only showing top 5 rows

In [17]:
# Get the number of rows and columns
num_rows = air.count()
num_columns = len(air.columns)

# Print the shape
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")
Number of rows: 3000000
Number of columns: 32

The flight delay data contains 3,000,000 rows and 32 columns, with the following attributes.

| Column Name | Description |
|---|---|
| FL_DATE | Date of the flight. |
| AIRLINE | Name of the airline. |
| AIRLINE_DOT | DOT identifier for the airline. |
| AIRLINE_CODE | Code assigned to the airline. |
| DOT_CODE | DOT identifier. |
| FL_NUMBER | Flight number. |
| ORIGIN | Origin airport code. |
| ORIGIN_CITY | City of origin airport. |
| DEST | Destination airport code. |
| DEST_CITY | City of destination airport. |
| CRS_DEP_TIME | Scheduled departure time. |
| DEP_TIME | Actual departure time. |
| DEP_DELAY | Departure delay. |
| TAXI_OUT | Time spent taxiing out. |
| WHEELS_OFF | Time when aircraft's wheels leave the ground. |
| WHEELS_ON | Time when aircraft's wheels touch the ground. |
| TAXI_IN | Time spent taxiing in. |
| CRS_ARR_TIME | Scheduled arrival time. |
| ARR_TIME | Actual arrival time. |
| ARR_DELAY | Arrival delay. |
| CANCELLED | Indicator if the flight was cancelled (1 for cancelled, 0 for not cancelled). |
| CANCELLATION_CODE | Reason for cancellation (if applicable). |
| DIVERTED | Indicator if the flight was diverted (1 for diverted, 0 for not diverted). |
| CRS_ELAPSED_TIME | Scheduled elapsed time. |
| ELAPSED_TIME | Actual elapsed time. |
| AIR_TIME | Time spent in the air. |
| DISTANCE | Distance traveled. |
| DELAY_DUE_CARRIER | Delay due to carrier. |
| DELAY_DUE_WEATHER | Delay due to weather. |
| DELAY_DUE_NAS | Delay due to National Airspace System (NAS). |
| DELAY_DUE_SECURITY | Delay due to security. |
| DELAY_DUE_LATE_AIRCRAFT | Delay due to late aircraft arrival. |

Descriptive Statistics

In [18]:
air.describe().show()
+-------+--------------------+--------------------+------------+-----------------+------------------+-------+------------+-------+------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-----------------------+
|summary|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|         DOT_CODE|         FL_NUMBER| ORIGIN| ORIGIN_CITY|   DEST|   DEST_CITY|      CRS_DEP_TIME|          DEP_TIME|         DEP_DELAY|          TAXI_OUT|        WHEELS_OFF|        WHEELS_ON|          TAXI_IN|      CRS_ARR_TIME|         ARR_TIME|        ARR_DELAY|          CANCELLED|CANCELLATION_CODE|           DIVERTED|  CRS_ELAPSED_TIME|     ELAPSED_TIME|          AIR_TIME|         DISTANCE| DELAY_DUE_CARRIER| DELAY_DUE_WEATHER|     DELAY_DUE_NAS| DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+-------+--------------------+--------------------+------------+-----------------+------------------+-------+------------+-------+------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-----------------------+
|  count|             3000000|             3000000|     3000000|          3000000|           3000000|3000000|     3000000|3000000|     3000000|           3000000|           2922385|           2922356|           2921194|           2921194|          2920056|          2920056|           3000000|          2920058|          2913802|            3000000|            79140|            3000000|           2999986|          2913802|           2913802|          3000000|            533863|            533863|            533863|             533863|                 533863|
|   mean|                NULL|                NULL|        NULL|     19976.294095|2511.5355186666666|   NULL|        NULL|   NULL|        NULL|1327.0619843333334| 1329.775913166814|10.123326179288219|16.643045617648127|1352.3609886916104|1462.499568501426|7.678982183903322|1490.5606646666668|1466.511162449513|4.260858150279257|            0.02638|             NULL|           0.002352|142.27580728710066|136.6205411349158|112.31083958347205|809.3615516666666|24.759086132584578|3.9852602634009098|13.164727654847779|0.14593069757596985|     25.471281958105358|
| stddev|                NULL|                NULL|        NULL|377.2846191135991|1747.2580396344772|   NULL|        NULL|   NULL|        NULL|485.87885383467284|499.31005155502544| 49.25183487489521| 9.192901205809838| 500.8726874878446|527.2368180484285|6.269639312248715| 511.5475663429901|531.8383494685161|51.17482436059588|0.16026260999175043|             NULL|0.04844036414145036| 71.55668973497872|71.67581550996236| 69.75484349772984|587.8939382449504| 71.77184461920068|32.410795770686214|33.161121549090154|   3.58205281604191|        55.766892035227|
|    min|Alaska Airlines Inc.|Alaska Airlines I...|          9E|            19393|                 1|    ABE|Aberdeen, SD|    ABE|Aberdeen, SD|                 1|               1.0|             -90.0|               1.0|               1.0|              1.0|              1.0|                 1|              1.0|            -96.0|                0.0|                A|                0.0|               1.0|             15.0|               8.0|             29.0|               0.0|               0.0|               0.0|                0.0|                    0.0|
|    max|United Air Lines ...|United Air Lines ...|          YX|            20452|              9562|    YUM|    Yuma, AZ|    YUM|    Yuma, AZ|              2359|            2400.0|            2966.0|             184.0|            2400.0|           2400.0|            249.0|              2400|           2400.0|           2934.0|                1.0|                D|                1.0|             705.0|            739.0|             692.0|           5812.0|            2934.0|            1653.0|            1741.0|             1185.0|                 2557.0|
+-------+--------------------+--------------------+------------+-----------------+------------------+-------+------------+-------+------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+-------------------+-----------------+-------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-----------------------+

                                                                                
In [20]:
air.describe(['ARR_DELAY']).show()
+-------+-----------------+
|summary|        ARR_DELAY|
+-------+-----------------+
|  count|          2913802|
|   mean|4.260858150279257|
| stddev|51.17482436059588|
|    min|            -96.0|
|    max|           2934.0|
+-------+-----------------+

                                                                                

Print the schema in a tree format

In [21]:
air.printSchema()
root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = true)

Select columns

In [24]:
air.select(["ARR_DELAY","AIR_TIME","DISTANCE"]).show()
+---------+--------+--------+
|ARR_DELAY|AIR_TIME|DISTANCE|
+---------+--------+--------+
|    -14.0|   153.0|  1065.0|
|     -5.0|   189.0|  1399.0|
|      0.0|    87.0|   680.0|
|     24.0|   249.0|  1589.0|
|     -1.0|   153.0|   985.0|
|    141.0|    36.0|   181.0|
|    -29.0|    58.0|   399.0|
|     23.0|    88.0|   613.0|
|    -11.0|   200.0|  1379.0|
|      1.0|   198.0|  1533.0|
|     60.0|   115.0|   859.0|
|     -1.0|   151.0|  1061.0|
|      1.0|    59.0|   395.0|
|    -32.0|   112.0|   859.0|
|     NULL|    NULL|   308.0|
|      6.0|    53.0|   283.0|
|     -9.0|   197.0|  1599.0|
|    -24.0|    56.0|   436.0|
|    -13.0|   110.0|   862.0|
|     35.0|   176.0|  1050.0|
+---------+--------+--------+
only showing top 20 rows

In [26]:
air.select(air['AIRLINE'], air['ARR_DELAY']>10).show()
+--------------------+----------------+
|             AIRLINE|(ARR_DELAY > 10)|
+--------------------+----------------+
|United Air Lines ...|           false|
|Delta Air Lines Inc.|           false|
|United Air Lines ...|           false|
|Delta Air Lines Inc.|            true|
|    Spirit Air Lines|           false|
|Southwest Airline...|            true|
|American Airlines...|           false|
|    Republic Airline|            true|
|    Spirit Air Lines|           false|
|Alaska Airlines Inc.|           false|
|Delta Air Lines Inc.|            true|
|American Airlines...|           false|
|Southwest Airline...|           false|
|Delta Air Lines Inc.|           false|
|Southwest Airline...|            NULL|
|Southwest Airline...|           false|
|Delta Air Lines Inc.|           false|
|Southwest Airline...|           false|
|    Spirit Air Lines|           false|
|United Air Lines ...|            true|
+--------------------+----------------+
only showing top 20 rows

In [31]:
# Group the data by one or more columns and count the rows in each group
air.groupBy(["AIRLINE","CANCELLED"]).count().orderBy("AIRLINE").show()
+--------------------+---------+------+
|             AIRLINE|CANCELLED| count|
+--------------------+---------+------+
|Alaska Airlines Inc.|      1.0|  1934|
|Alaska Airlines Inc.|      0.0| 98533|
|       Allegiant Air|      1.0|  2383|
|       Allegiant Air|      0.0| 50355|
|American Airlines...|      1.0| 10907|
|American Airlines...|      0.0|372199|
|Delta Air Lines Inc.|      1.0|  5982|
|Delta Air Lines Inc.|      0.0|389257|
|   Endeavor Air Inc.|      1.0|  2394|
|   Endeavor Air Inc.|      0.0|110069|
|           Envoy Air|      0.0|117623|
|           Envoy Air|      1.0|  3633|
|ExpressJet Airlin...|      0.0| 18020|
|ExpressJet Airlin...|      1.0|  1062|
|Frontier Airlines...|      1.0|  1666|
|Frontier Airlines...|      0.0| 62800|
|Hawaiian Airlines...|      0.0| 31726|
|Hawaiian Airlines...|      1.0|   388|
|         Horizon Air|      1.0|   374|
|         Horizon Air|      0.0| 20260|
+--------------------+---------+------+
only showing top 20 rows

                                                                                
In [32]:
# Group and sort
aircount = air.groupBy("AIRLINE").count()
aircount.sort("count", ascending=False).show()
+--------------------+------+
|             AIRLINE| count|
+--------------------+------+
|Southwest Airline...|576470|
|Delta Air Lines Inc.|395239|
|American Airlines...|383106|
|SkyWest Airlines ...|343737|
|United Air Lines ...|254504|
|    Republic Airline|143107|
|           Envoy Air|121256|
|     JetBlue Airways|112844|
|   Endeavor Air Inc.|112463|
|   PSA Airlines Inc.|107050|
|Alaska Airlines Inc.|100467|
|    Spirit Air Lines| 95711|
|  Mesa Airlines Inc.| 65012|
|Frontier Airlines...| 64466|
|       Allegiant Air| 52738|
|Hawaiian Airlines...| 32114|
|         Horizon Air| 20634|
|ExpressJet Airlin...| 19082|
+--------------------+------+

                                                                                

Data cleaning

In [36]:
# Drop Completely Empty Rows
clean_df = air.dropna(how="all")
In [39]:
# Replace nulls in the numeric delay columns with 0
from pyspark.sql.functions import coalesce, col, lit

delay_cols = [
    "DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
    "DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT"
]
for col_name in delay_cols:
    clean_df = clean_df.withColumn(col_name, coalesce(col(col_name), lit(0)))
In [40]:
# Keep only flights that were neither cancelled nor diverted
# and whose critical fields are not null
clean_df = clean_df.filter(
    (col("CANCELLED") != 1.0) &
    (col("DIVERTED") != 1.0) &
    col("DEP_TIME").isNotNull() &
    col("ARR_TIME").isNotNull() &
    col("AIRLINE").isNotNull()
)
In [41]:
# Fix data types (if necessary)
from pyspark.sql.functions import to_date

clean_df = clean_df.withColumn("FL_DATE", to_date("FL_DATE", "yyyy-MM-dd"))
clean_df = clean_df.withColumn("DEP_TIME", clean_df["DEP_TIME"].cast("double"))
clean_df = clean_df.withColumn("ARR_TIME", clean_df["ARR_TIME"].cast("double"))
In [42]:
# Trim and Clean String Fields
from pyspark.sql.functions import trim

str_cols = ["AIRLINE", "AIRLINE_DOT", "ORIGIN_CITY", "DEST_CITY"]
for c in str_cols:
    clean_df = clean_df.withColumn(c, trim(col(c)))
In [43]:
# Remove outliers: keep delays strictly between -60 and 360 (rows with null delays are dropped as well)
clean_df = clean_df.filter((col("DEP_DELAY") < 360) & (col("DEP_DELAY") > -60))
clean_df = clean_df.filter((col("ARR_DELAY") < 360) & (col("ARR_DELAY") > -60))
In [46]:
# Inspect the schema and summary statistics after cleaning
clean_df.printSchema()
clean_df.describe().show()
root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = false)
 |-- DELAY_DUE_WEATHER: double (nullable = false)
 |-- DELAY_DUE_NAS: double (nullable = false)
 |-- DELAY_DUE_SECURITY: double (nullable = false)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = false)

+-------+--------------------+--------------------+------------+------------------+------------------+-------+------------+-------+------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+---------+-----------------+--------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+-----------------------+
|summary|             AIRLINE|         AIRLINE_DOT|AIRLINE_CODE|          DOT_CODE|         FL_NUMBER| ORIGIN| ORIGIN_CITY|   DEST|   DEST_CITY|      CRS_DEP_TIME|          DEP_TIME|        DEP_DELAY|         TAXI_OUT|        WHEELS_OFF|         WHEELS_ON|          TAXI_IN|      CRS_ARR_TIME|         ARR_TIME|         ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|  CRS_ELAPSED_TIME|      ELAPSED_TIME|          AIR_TIME|         DISTANCE| DELAY_DUE_CARRIER| DELAY_DUE_WEATHER|     DELAY_DUE_NAS|  DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+-------+--------------------+--------------------+------------+------------------+------------------+-------+------------+-------+------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+---------+-----------------+--------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+-----------------------+
|  count|             2905663|             2905663|     2905663|           2905663|           2905663|2905663|     2905663|2905663|     2905663|           2905663|           2905663|          2905663|          2905663|           2905663|           2905663|          2905663|           2905663|          2905663|           2905663|  2905663|                0| 2905663|           2905663|           2905663|           2905663|          2905663|           2905663|           2905663|           2905663|             2905663|                2905663|
|   mean|                NULL|                NULL|        NULL| 19976.10706265661| 2507.923478049588|   NULL|        NULL|   NULL|        NULL|1325.9025055555308|1329.5697333104356|8.423415929514194|16.62807937465563|  1352.25723526782|1462.7354400699599|7.668841500201503|1489.7350941936488|1466.788123743187|2.6379996579093996|      0.0|             NULL|     0.0|142.35123618946864|136.56574626857966|112.26883090021107| 810.334542237004| 3.633803025333633|0.5521352613844069|2.2979612570349692|0.025852275367102103|      4.247468477934296|
| stddev|                NULL|                NULL|        NULL|376.70173553673527|1745.8278139650042|   NULL|        NULL|   NULL|        NULL| 485.3981017131451|499.03614662598665|33.91532378098644|9.162519969816012|500.58987842329657|  526.448955672582|6.232338716316493|510.84734516995115|531.0242626457665| 36.79725081234713|      0.0|             NULL|     0.0| 71.65052556597922| 71.62860300665999| 69.70830249515912|588.8751508539929|18.274506409642203| 8.008969602669756|12.320970492500773|  1.3124513878142212|     20.270061385564016|
|    min|Alaska Airlines Inc.|Alaska Airlines I...|          9E|             19393|                 1|    ABE|Aberdeen, SD|    ABE|Aberdeen, SD|                 1|               1.0|            -57.0|              1.0|               1.0|               1.0|              1.0|                 1|              1.0|             -59.0|      0.0|             NULL|     0.0|              18.0|              15.0|               8.0|             29.0|               0.0|               0.0|               0.0|                 0.0|                    0.0|
|    max|United Air Lines ...|United Air Lines ...|          YX|             20452|              9562|    YUM|    Yuma, AZ|    YUM|    Yuma, AZ|              2359|            2400.0|            359.0|            184.0|            2400.0|            2400.0|            249.0|              2400|           2400.0|             359.0|      0.0|             NULL|     0.0|             705.0|             739.0|             692.0|           5812.0|             358.0|             359.0|             359.0|               301.0|                  357.0|
+-------+--------------------+--------------------+------------+------------------+------------------+-------+------------+-------+------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+---------+-----------------+--------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+-----------------------+

                                                                                
In [47]:
# Null count per column
from pyspark.sql.functions import col, sum as spark_sum, when

null_counts = clean_df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in clean_df.columns
])
null_counts.show()
+-------+-------+-----------+------------+--------+---------+------+-----------+----+---------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|FL_DATE|AIRLINE|AIRLINE_DOT|AIRLINE_CODE|DOT_CODE|FL_NUMBER|ORIGIN|ORIGIN_CITY|DEST|DEST_CITY|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|DELAY_DUE_CARRIER|DELAY_DUE_WEATHER|DELAY_DUE_NAS|DELAY_DUE_SECURITY|DELAY_DUE_LATE_AIRCRAFT|
+-------+-------+-----------+------------+--------+---------+------+-----------+----+---------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+
|      0|      0|          0|           0|       0|        0|     0|          0|   0|        0|           0|       0|        0|       0|         0|        0|      0|           0|       0|        0|        0|          2905663|       0|               0|           0|       0|       0|                0|                0|            0|                 0|                      0|
+-------+-------+-----------+------------+--------+---------+------+-----------+----+---------+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+------------+--------+--------+-----------------+-----------------+-------------+------------------+-----------------------+

                                                                                
In [48]:
clean_df = clean_df.drop("CANCELLATION_CODE")
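The cell above drops `CANCELLATION_CODE` by hand after reading the null-count table. A minimal sketch of automating that choice follows. The helper name `mostly_null_columns` and the 0.5 threshold are hypothetical, and the toy counts are illustrative rather than taken from the flight data.

```python
def mostly_null_columns(null_counts, total_rows, threshold=0.5):
    """Return the column names whose fraction of nulls exceeds `threshold`.

    null_counts: dict mapping column name -> number of null values
    total_rows:  total number of rows in the DataFrame
    """
    if total_rows <= 0:
        return []
    return [name for name, n_null in null_counts.items()
            if n_null / total_rows > threshold]

# Toy input (illustrative values only)
toy_counts = {"FL_DATE": 0, "CANCELLATION_CODE": 100, "DEP_DELAY": 3}
print(mostly_null_columns(toy_counts, total_rows=100))

# With Spark, assuming null_counts and clean_df from the cells above:
#   counts = null_counts.first().asDict()
#   clean_df = clean_df.drop(*mostly_null_columns(counts, clean_df.count()))
```

Keeping the decision logic in plain Python means it can be checked without starting a Spark session; only the two commented lines touch the cluster.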

Statistics¶

In [50]:
from pyspark.sql.functions import avg, count
delay_stats = clean_df.groupBy("AIRLINE").agg(
    avg("DEP_DELAY").alias("avg_dep_delay"),
    avg("ARR_DELAY").alias("avg_arr_delay"),
    count("*").alias("num_flights")
)
delay_stats.orderBy("avg_arr_delay", ascending=False).show()
+--------------------+------------------+--------------------+-----------+
|             AIRLINE|     avg_dep_delay|       avg_arr_delay|num_flights|
+--------------------+------------------+--------------------+-----------+
|       Allegiant Air|10.663716814159292|  10.110659512273255|      49946|
|     JetBlue Airways|15.913813058015212|  10.104695594665099|     108868|
|Frontier Airlines...|13.878156909189846|   9.026391600781025|      62482|
|    Spirit Air Lines|11.514032162641854|   6.589060399074921|      92965|
|ExpressJet Airlin...| 8.699753529016357|   6.030752856822765|      17852|
|  Mesa Airlines Inc.| 9.437540182589688|  4.6006332776134755|      62216|
|American Airlines...| 9.577982371391535|   3.714215738011924|     369513|
|United Air Lines ...| 9.460429503626337|  3.3625972241863473|     247495|
|Southwest Airline...|10.668871156988505|   3.150767342590393|     555684|
|Hawaiian Airlines...| 4.137945201150333|  2.9878330120405776|      31643|
|   PSA Airlines Inc.| 6.624263908259724|   2.662366341236634|     103248|
|         Horizon Air| 4.671790816074341|  2.2538678266027383|      20231|
|           Envoy Air|5.5334585824081985|   2.175405636208369|     117100|
|SkyWest Airlines ...| 6.491163047324325|   1.107962553150657|     333486|
|Alaska Airlines Inc.| 4.295771060647738|  0.8727141212547246|      98157|
|    Republic Airline| 4.606081262373186|-0.48297667133191685|     137899|
|Delta Air Lines Inc.| 6.335690806885991| -0.5869592722559188|     387279|
|   Endeavor Air Inc.| 4.459666602797471|  -2.708674349218515|     109599|
+--------------------+------------------+--------------------+-----------+

                                                                                
In [51]:
# Top 10 Most Frequent Routes
from pyspark.sql.functions import concat_ws

route_stats = clean_df.withColumn("ROUTE", concat_ws(" → ", col("ORIGIN"), col("DEST"))) \
                      .groupBy("ROUTE").count() \
                      .orderBy("count", ascending=False)

route_stats.show(10, truncate=False)
+---------+-----+
|ROUTE    |count|
+---------+-----+
|SFO → LAX|5213 |
|LAX → SFO|5066 |
|OGG → HNL|4587 |
|LAX → LAS|4536 |
|HNL → OGG|4488 |
|LGA → ORD|4485 |
|LAS → LAX|4441 |
|ORD → LGA|4384 |
|LAX → JFK|4293 |
|JFK → LAX|4184 |
+---------+-----+
only showing top 10 rows

                                                                                
In [52]:
# Average Air Time and Distance per Route
route_airtime = clean_df.withColumn("ROUTE", concat_ws(" → ", col("ORIGIN"), col("DEST"))) \
                        .groupBy("ROUTE") \
                        .agg(avg("AIR_TIME").alias("avg_air_time"),
                             avg("DISTANCE").alias("avg_distance"),
                             count("*").alias("num_flights"))

route_airtime.orderBy("num_flights", ascending=False).show(10, truncate=False)
+---------+------------------+------------+-----------+
|ROUTE    |avg_air_time      |avg_distance|num_flights|
+---------+------------------+------------+-----------+
|SFO → LAX|55.92998273546902 |337.0       |5213       |
|LAX → SFO|55.241808132649034|337.0       |5066       |
|OGG → HNL|24.111837802485283|100.0       |4587       |
|LAX → LAS|44.23258377425044 |236.0       |4536       |
|HNL → OGG|22.017602495543674|100.0       |4488       |
|LGA → ORD|117.39397993311037|733.0       |4485       |
|LAS → LAX|43.063949560909705|236.0       |4441       |
|ORD → LGA|97.95095802919708 |733.0       |4384       |
|LAX → JFK|289.08665269042626|2475.0      |4293       |
|JFK → LAX|330.61639579349907|2475.0      |4184       |
+---------+------------------+------------+-----------+
only showing top 10 rows
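Routes flown in both directions cover the same distance but show different average air times in the table above (for example LGA → ORD versus ORD → LGA). Converting to an average speed makes the gap easier to compare. The sketch below is plain Python on values copied from that table. The helper `avg_speed` is hypothetical, and it assumes `AIR_TIME` is in minutes and `DISTANCE` in miles, which the notebook does not state.

```python
def avg_speed(distance, air_time_minutes):
    """Average speed in distance units per hour."""
    return distance / (air_time_minutes / 60.0)

# (avg_distance, avg_air_time) copied from the route table above
routes = {
    "LGA → ORD": (733.0, 117.39397993311037),
    "ORD → LGA": (733.0, 97.95095802919708),
}

for route, (dist, minutes) in routes.items():
    print(route, round(avg_speed(dist, minutes), 1))

# A Spark equivalent, assuming route_airtime and col from the cells above:
#   route_airtime.withColumn(
#       "avg_speed", col("avg_distance") / (col("avg_air_time") / 60)
#   )
```

Note that dividing the average distance by the average air time is not the same as averaging per-flight speeds; it is only a rough per-route summary.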

                                                                                
In [53]:
# Delay Patterns by Cause
from pyspark.sql.functions import mean

delay_cause_cols = [
    "DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
    "DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT"
]

# Use `c` as the loop variable so it does not shadow pyspark's `col` function
clean_df.select([
    mean(c).alias(f"avg_{c.lower()}") for c in delay_cause_cols
]).show()
+---------------------+---------------------+------------------+----------------------+---------------------------+
|avg_delay_due_carrier|avg_delay_due_weather| avg_delay_due_nas|avg_delay_due_security|avg_delay_due_late_aircraft|
+---------------------+---------------------+------------------+----------------------+---------------------------+
|    3.633803025333633|   0.5521352613844069|2.2979612570349692|  0.025852275367102103|          4.247468477934296|
+---------------------+---------------------+------------------+----------------------+---------------------------+

                                                                                
In [56]:
weather_delay_by_airport = clean_df.groupBy("ORIGIN").agg(
    avg("DELAY_DUE_WEATHER").alias("avg_weather_delay"),
    count("*").alias("num_flights")
).orderBy("avg_weather_delay", ascending=False)

weather_delay_by_airport.show(10)
+------+------------------+-----------+
|ORIGIN| avg_weather_delay|num_flights|
+------+------------------+-----------+
|   GGG|               5.0|        381|
|   EKO| 4.799163179916318|        239|
|   PIH| 4.619047619047619|        357|
|   PGV| 4.607142857142857|         28|
|   DIK| 4.285714285714286|         84|
|   APN| 3.985663082437276|        279|
|   MQT|3.8851224105461393|        531|
|   WYS|3.7535211267605635|        142|
|   DRT|3.6483516483516483|        273|
|   COD|3.3487394957983194|        238|
+------+------------------+-----------+
only showing top 10 rows
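Several airports in this ranking have few flights (PGV has only 28), so their averages rest on small samples. One common remedy is to require a minimum number of flights before ranking. The sketch below shows the idea in plain Python on five rows copied from the output above; `rank_with_min_flights` and the cutoff `min_flights=100` are hypothetical choices for illustration.

```python
# (ORIGIN, avg_weather_delay, num_flights) copied from the table above
rows = [
    ("GGG", 5.0, 381),
    ("EKO", 4.799163179916318, 239),
    ("PIH", 4.619047619047619, 357),
    ("PGV", 4.607142857142857, 28),
    ("DIK", 4.285714285714286, 84),
]

def rank_with_min_flights(rows, min_flights=100):
    """Keep airports with at least `min_flights` flights, sorted by delay (descending)."""
    kept = [r for r in rows if r[2] >= min_flights]
    return sorted(kept, key=lambda r: r[1], reverse=True)

for origin, delay, n in rank_with_min_flights(rows):
    print(origin, round(delay, 2), n)

# A Spark equivalent, assuming weather_delay_by_airport and col from the cells above:
#   weather_delay_by_airport.filter(col("num_flights") >= 100).show(10)
```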

                                                                                
In [57]:
# Delay trends over time
from pyspark.sql.functions import year, month

delay_by_month = clean_df.withColumn("YEAR", year("FL_DATE")) \
                         .withColumn("MONTH", month("FL_DATE")) \
                         .groupBy("YEAR", "MONTH") \
                         .agg(avg("ARR_DELAY").alias("avg_arr_delay"),
                              avg("DEP_DELAY").alias("avg_dep_delay"))

delay_by_month.orderBy("YEAR", "MONTH").show()
+----+-----+-------------------+-------------------+
|YEAR|MONTH|      avg_arr_delay|      avg_dep_delay|
+----+-----+-------------------+-------------------+
|2019|    1| 2.4820393167453116|  7.861189949678745|
|2019|    2|  6.681138208291357| 11.158052804653197|
|2019|    3| 1.8050101878030673|  7.680466269684573|
|2019|    4|  3.282562537323336|  9.015095215977706|
|2019|    5|  5.356540984902379| 10.235554501620426|
|2019|    6|  9.745206261597755| 13.864427209719116|
|2019|    7|  6.404983327724922| 11.737541680687693|
|2019|    8|  6.003047486202923| 11.021544666141065|
|2019|    9|-0.9359927619674289|  5.256407303832868|
|2019|   10| 1.3794001184797182|   6.78538022635862|
|2019|   11| -1.289566044543576|  5.186718929379736|
|2019|   12|  4.234243263020585| 10.340205529572716|
|2020|    1|-2.8118771375738434|  4.951971068091443|
|2020|    2|-1.5591433278418452|  5.910916500476892|
|2020|    3| -7.170927418621154| 0.5467348386979385|
|2020|    4| -14.11148015214014| -3.760111426581668|
|2020|    5|-11.689005416618647|-2.3127809150628096|
|2020|    6| -8.052496632045543| -1.025683368823606|
|2020|    7| -5.925855353490066|-0.2928237173200325|
|2020|    8| -6.553676180455083|-0.6005787189267394|
+----+-----+-------------------+-------------------+
only showing top 20 rows

                                                                                
In [58]:
# STEP 1: Calculate monthly delay stats in Spark
from pyspark.sql.functions import year, month, avg

delay_by_month = clean_df.withColumn("YEAR", year("FL_DATE")) \
                         .withColumn("MONTH", month("FL_DATE")) \
                         .groupBy("YEAR", "MONTH") \
                         .agg(
                             avg("ARR_DELAY").alias("avg_arr_delay"),
                             avg("DEP_DELAY").alias("avg_dep_delay")
                         ) \
                         .orderBy("YEAR", "MONTH")

# STEP 2: Convert to Pandas (collect to driver)
delay_df = delay_by_month.toPandas()

# STEP 3: Create a proper datetime column
import pandas as pd

delay_df["DATE"] = pd.to_datetime(
    delay_df[["YEAR", "MONTH"]].assign(DAY=1)
)

# STEP 4: Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))
plt.plot(delay_df["DATE"], delay_df["avg_arr_delay"], marker='o', label='Avg Arrival Delay')
plt.plot(delay_df["DATE"], delay_df["avg_dep_delay"], marker='o', label='Avg Departure Delay')
plt.title("Monthly Average Flight Delays Over Time")
plt.xlabel("Date")
plt.ylabel("Delay (minutes)")
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()  # call after rotating the tick labels so the layout accounts for them
plt.show()
                                                                                
[Figure: "Monthly Average Flight Delays Over Time", a line plot of average arrival and departure delay (minutes) by date]
In [59]:
# Pearson correlation between flight distance and arrival delay
air.corr("DISTANCE", "ARR_Delay")
                                                                                
Out[59]:
0.002050595449321259
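`DataFrame.corr` computes the Pearson correlation coefficient between two numeric columns, so a value this close to zero indicates essentially no linear relationship between distance and arrival delay. A minimal plain-Python sketch of the same statistic follows; the function name `pearson` and the toy data are hypothetical and are not drawn from the flight data.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of cross-products and squared deviations from the means
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when either sequence is constant
    return cov / math.sqrt(var_x * var_y)

# Toy data: a perfectly linear relationship and a perfectly inverse one
print(pearson([1, 2, 3], [2, 4, 6]))
print(pearson([1, 2, 3], [6, 4, 2]))
```

Pearson correlation only measures linear association, so a near-zero value does not rule out a nonlinear dependence between the two columns.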