Three APIs of Spark¶
- RDD (2011, Spark 1.0)
- DataFrames (2013, Spark 1.3)
- Datasets (2015, Spark 1.6)
Spark SQL, DataFrames and Datasets¶
Spark SQL is a Spark module for structured data processing.
Unlike the RDD API, Spark SQL gives Spark more information about the structure of both the data and the computation being performed.
There are several ways to interact with Spark SQL including SQL and the Dataset API.
Spark SQL¶
One use of Spark SQL is to execute SQL queries.
Spark SQL can also be used to read data from an existing Hive installation.
When running SQL from within another programming language, the results are returned as a Dataset/DataFrame.
Spark Datasets¶
A Dataset is a distributed collection of data.
Datasets can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java.
Python does not support the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access a field of a row by name naturally: row.columnName).
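The by-name field access mentioned above can be sketched without Spark: pyspark.sql.Row behaves much like a named tuple, so a plain-Python sketch (using collections.namedtuple as a stand-in, not the actual Spark class) shows the idea:

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row: fields are accessible by attribute name.
Row = namedtuple("Row", ["name", "age"])

row = Row(name="Justin", age=19)
print(row.name)  # access the field of a row by name: Justin
print(row.age)   # 19
```

In PySpark the same `row.columnName` access works on the rows returned by `collect()` or `take()`.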
Spark DataFrames¶
A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be constructed from a wide array of sources such as:
- structured data files,
- tables in Hive,
- external databases, or
- existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
Spark session and spark context¶
What is SparkContext¶
SparkContext is the entry point to Spark, defined in the org.apache.spark package since 1.x, and is used to programmatically create Spark RDDs, accumulators and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. Its object sc is available by default in spark-shell, and it can be created programmatically using the SparkContext class.
What is SparkSession¶
SparkSession, introduced in version 2.0, is the entry point to underlying Spark functionality for programmatically creating Spark RDDs, DataFrames and Datasets. Its object spark is available by default in spark-shell, and it can be created programmatically using the SparkSession builder pattern.
Start a Spark session¶
import findspark
findspark.init('/opt/apps/SPARK3/spark-current')
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Data processing using pyspark") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memory", "14g") \
    .config("spark.executor.instances", "4") \
.getOrCreate()
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/04/07 17:05:20 WARN [Thread-6] Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Creating DataFrames¶
Convert an RDD to a DataFrame¶
sc = spark.sparkContext # When you create a SparkSession object, SparkContext is also created and can be retrieved using spark.sparkContext.
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
# Text file RDDs can be created using SparkContext's textFile method.
lines = sc.textFile("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people
PythonRDD[2] at RDD at PythonRDD.scala:53
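The two map calls above each apply a plain function to every line of the RDD; outside Spark, the same per-element transformation can be sketched in ordinary Python (the file contents are inlined here for illustration):

```python
# people.txt contains "name, age" lines like these:
lines = ["Michael, 29", "Andy, 30", "Justin, 19"]

# lines.map(lambda l: l.split(","))
parts = [l.split(",") for l in lines]

# parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people = [{"name": p[0], "age": int(p[1].strip())} for p in parts]

print(people[0])  # {'name': 'Michael', 'age': 29}
```

The difference in Spark is that each step is distributed across the cluster and evaluated lazily.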
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
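The SQL query above is just a row filter plus a projection; a plain-Python sketch of the same predicate over the example records:

```python
people = [
    {"name": "Michael", "age": 29},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]

# SELECT name, age FROM people WHERE age >= 13 AND age <= 19
teenagers = [{"name": p["name"], "age": p["age"]}
             for p in people if 13 <= p["age"] <= 19]

print(teenagers)  # [{'name': 'Justin', 'age': 19}]
```

Spark compiles the SQL text into an equivalent distributed plan over the registered temp view.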
Create Spark DataFrame directly from a file¶
sdf = spark.read.csv("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.csv")
sdf.show() # Displays the content of the DataFrame to stdout
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+
sdf2 = spark.read.json("file:///opt/apps/SPARK3/spark-3.5.3-hadoop3.2-1.0.0/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
The CSV file does not have a header row, but we can create a description (a schema, in Spark terms) for the data.
# Import data types
from pyspark.sql.types import *
# Create a schema from a list of column names
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
schema
StructType([StructField('name', StringType(), True), StructField('age', StringType(), True)])
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
Read file and infer the schema from the header¶
## Load a hdfs file
air = spark.read.options(header='true', inferSchema='true').csv("/data/flights.csv")
air
DataFrame[FL_DATE: date, AIRLINE: string, AIRLINE_DOT: string, AIRLINE_CODE: string, DOT_CODE: int, FL_NUMBER: int, ORIGIN: string, ORIGIN_CITY: string, DEST: string, DEST_CITY: string, CRS_DEP_TIME: int, DEP_TIME: double, DEP_DELAY: double, TAXI_OUT: double, WHEELS_OFF: double, WHEELS_ON: double, TAXI_IN: double, CRS_ARR_TIME: int, ARR_TIME: double, ARR_DELAY: double, CANCELLED: double, CANCELLATION_CODE: string, DIVERTED: double, CRS_ELAPSED_TIME: double, ELAPSED_TIME: double, AIR_TIME: double, DISTANCE: double, DELAY_DUE_CARRIER: double, DELAY_DUE_WEATHER: double, DELAY_DUE_NAS: double, DELAY_DUE_SECURITY: double, DELAY_DUE_LATE_AIRCRAFT: double]
air.show(5)
[Wide table output omitted: air.show(5) prints the first 5 rows of all 32 columns; e.g. a 2019-01-09 United Air Lines flight FLL to EWR with DEP_DELAY -4.0 and ARR_DELAY -14.0.]
# Get the number of rows and columns
num_rows = air.count()
num_columns = len(air.columns)
# Print the shape
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")
Number of rows: 3000000
Number of columns: 32
The air delay data contains 3,000,000 rows with the following attributes.
Column Name | Description |
---|---|
FL_DATE | Date of the flight. |
AIRLINE | Name of the airline. |
AIRLINE_DOT | DOT identifier for the airline. |
AIRLINE_CODE | Code assigned to the airline. |
DOT_CODE | DOT identifier. |
FL_NUMBER | Flight number. |
ORIGIN | Origin airport code. |
ORIGIN_CITY | City of origin airport. |
DEST | Destination airport code. |
DEST_CITY | City of destination airport. |
CRS_DEP_TIME | Scheduled departure time. |
DEP_TIME | Actual departure time. |
DEP_DELAY | Departure delay. |
TAXI_OUT | Time spent taxiing out. |
WHEELS_OFF | Time when aircraft's wheels leave the ground. |
WHEELS_ON | Time when aircraft's wheels touch the ground. |
TAXI_IN | Time spent taxiing in. |
CRS_ARR_TIME | Scheduled arrival time. |
ARR_TIME | Actual arrival time. |
ARR_DELAY | Arrival delay. |
CANCELLED | Indicator if the flight was cancelled (1 for cancelled, 0 for not cancelled). |
CANCELLATION_CODE | Reason for cancellation (if applicable). |
DIVERTED | Indicator if the flight was diverted (1 for diverted, 0 for not diverted). |
CRS_ELAPSED_TIME | Scheduled elapsed time. |
ELAPSED_TIME | Actual elapsed time. |
AIR_TIME | Time spent in the air. |
DISTANCE | Distance traveled. |
DELAY_DUE_CARRIER | Delay due to carrier. |
DELAY_DUE_WEATHER | Delay due to weather. |
DELAY_DUE_NAS | Delay due to National Airspace System (NAS). |
DELAY_DUE_SECURITY | Delay due to security. |
DELAY_DUE_LATE_AIRCRAFT | Delay due to late aircraft arrival. |
Descriptive Statistics¶
air.describe().show()
[Wide table output omitted: describe() prints count, mean, stddev, min and max for every column; e.g. ARR_DELAY has count 2913802, mean 4.26, stddev 51.17, min -96.0, max 2934.0.]
air.describe(['ARR_DELAY']).show()
+-------+-----------------+
|summary|        ARR_DELAY|
+-------+-----------------+
|  count|          2913802|
|   mean|4.260858150279257|
| stddev|51.17482436059588|
|    min|            -96.0|
|    max|           2934.0|
+-------+-----------------+
Print the schema in a tree format¶
air.printSchema()
root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = true)
 |-- DELAY_DUE_WEATHER: double (nullable = true)
 |-- DELAY_DUE_NAS: double (nullable = true)
 |-- DELAY_DUE_SECURITY: double (nullable = true)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = true)
Select columns¶
air.select(["ARR_DELAY","AIR_TIME","DISTANCE"]).show()
+---------+--------+--------+
|ARR_DELAY|AIR_TIME|DISTANCE|
+---------+--------+--------+
|    -14.0|   153.0|  1065.0|
|     -5.0|   189.0|  1399.0|
|      0.0|    87.0|   680.0|
|     24.0|   249.0|  1589.0|
|     -1.0|   153.0|   985.0|
|    141.0|    36.0|   181.0|
|    -29.0|    58.0|   399.0|
|     23.0|    88.0|   613.0|
|    -11.0|   200.0|  1379.0|
|      1.0|   198.0|  1533.0|
|     60.0|   115.0|   859.0|
|     -1.0|   151.0|  1061.0|
|      1.0|    59.0|   395.0|
|    -32.0|   112.0|   859.0|
|     NULL|    NULL|   308.0|
|      6.0|    53.0|   283.0|
|     -9.0|   197.0|  1599.0|
|    -24.0|    56.0|   436.0|
|    -13.0|   110.0|   862.0|
|     35.0|   176.0|  1050.0|
+---------+--------+--------+
only showing top 20 rows
air.select(air['AIRLINE'], air['ARR_DELAY']>10).show()
+--------------------+----------------+
|             AIRLINE|(ARR_DELAY > 10)|
+--------------------+----------------+
|United Air Lines ...|           false|
|Delta Air Lines Inc.|           false|
|United Air Lines ...|           false|
|Delta Air Lines Inc.|            true|
|    Spirit Air Lines|           false|
|Southwest Airline...|            true|
|American Airlines...|           false|
|    Republic Airline|            true|
|    Spirit Air Lines|           false|
|Alaska Airlines Inc.|           false|
|Delta Air Lines Inc.|            true|
|American Airlines...|           false|
|Southwest Airline...|           false|
|Delta Air Lines Inc.|           false|
|Southwest Airline...|            NULL|
|Southwest Airline...|           false|
|Delta Air Lines Inc.|           false|
|Southwest Airline...|           false|
|    Spirit Air Lines|           false|
|United Air Lines ...|            true|
+--------------------+----------------+
only showing top 20 rows
# group data with respect to some columns
air.groupBy(["AIRLINE","CANCELLED"]).count().orderBy("AIRLINE").show()
+--------------------+---------+------+
|             AIRLINE|CANCELLED| count|
+--------------------+---------+------+
|Alaska Airlines Inc.|      1.0|  1934|
|Alaska Airlines Inc.|      0.0| 98533|
|       Allegiant Air|      1.0|  2383|
|       Allegiant Air|      0.0| 50355|
|American Airlines...|      1.0| 10907|
|American Airlines...|      0.0|372199|
|Delta Air Lines Inc.|      1.0|  5982|
|Delta Air Lines Inc.|      0.0|389257|
|   Endeavor Air Inc.|      1.0|  2394|
|   Endeavor Air Inc.|      0.0|110069|
|           Envoy Air|      0.0|117623|
|           Envoy Air|      1.0|  3633|
|ExpressJet Airlin...|      0.0| 18020|
|ExpressJet Airlin...|      1.0|  1062|
|Frontier Airlines...|      1.0|  1666|
|Frontier Airlines...|      0.0| 62800|
|Hawaiian Airlines...|      0.0| 31726|
|Hawaiian Airlines...|      1.0|   388|
|         Horizon Air|      1.0|   374|
|         Horizon Air|      0.0| 20260|
+--------------------+---------+------+
only showing top 20 rows
## Group and sort
aircount = air.groupBy("AIRLINE").count()
aircount.sort("count", ascending=False).show()
+--------------------+------+
|             AIRLINE| count|
+--------------------+------+
|Southwest Airline...|576470|
|Delta Air Lines Inc.|395239|
|American Airlines...|383106|
|SkyWest Airlines ...|343737|
|United Air Lines ...|254504|
|    Republic Airline|143107|
|           Envoy Air|121256|
|     JetBlue Airways|112844|
|   Endeavor Air Inc.|112463|
|   PSA Airlines Inc.|107050|
|Alaska Airlines Inc.|100467|
|    Spirit Air Lines| 95711|
|  Mesa Airlines Inc.| 65012|
|Frontier Airlines...| 64466|
|       Allegiant Air| 52738|
|Hawaiian Airlines...| 32114|
|         Horizon Air| 20634|
|ExpressJet Airlin...| 19082|
+--------------------+------+
Data cleaning¶
# Drop Completely Empty Rows
clean_df = air.dropna(how="all")
# Standardize Nulls for Numeric Delay Columns
from pyspark.sql.functions import col
from pyspark.sql.functions import coalesce, lit
delay_cols = [
"DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
"DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT"
]
for col_name in delay_cols:
clean_df = clean_df.withColumn(col_name, coalesce(col(col_name), lit(0)))
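coalesce(col(col_name), lit(0)) returns the first non-null argument, i.e. it replaces nulls with 0. A plain-Python sketch of what the loop does to each record (the helper name and the sample record are made up for illustration; the column names are from the data):

```python
delay_cols = [
    "DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
    "DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT",
]

def coalesce_zero(value):
    """First non-null argument: the value itself, or 0.0 if it is null."""
    return value if value is not None else 0.0

record = {"DELAY_DUE_CARRIER": None, "DELAY_DUE_WEATHER": 5.0,
          "DELAY_DUE_NAS": None, "DELAY_DUE_SECURITY": 0.0,
          "DELAY_DUE_LATE_AIRCRAFT": None}
cleaned = {c: coalesce_zero(record[c]) for c in delay_cols}
print(cleaned["DELAY_DUE_CARRIER"])  # 0.0
```

This is why the cleaned schema later shows these five columns as nullable = false.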
# Keep only completed flights (not cancelled or diverted) whose critical fields are non-null
clean_df = clean_df.filter(
(~col("CANCELLED").isin(1.0)) &
(~col("DIVERTED").isin(1.0)) &
col("DEP_TIME").isNotNull() &
col("ARR_TIME").isNotNull() &
col("AIRLINE").isNotNull()
)
# Fix Data Types (if necessary)
from pyspark.sql.functions import to_timestamp, to_date
clean_df = clean_df.withColumn("FL_DATE", to_date("FL_DATE", "yyyy-MM-dd"))
clean_df = clean_df.withColumn("DEP_TIME", clean_df["DEP_TIME"].cast("double"))
clean_df = clean_df.withColumn("ARR_TIME", clean_df["ARR_TIME"].cast("double"))
# Trim and Clean String Fields
from pyspark.sql.functions import trim
str_cols = ["AIRLINE", "AIRLINE_DOT", "ORIGIN_CITY", "DEST_CITY"]
for c in str_cols:
clean_df = clean_df.withColumn(c, trim(col(c)))
# Remove Outliers
clean_df = clean_df.filter((col("DEP_DELAY") < 360) & (col("DEP_DELAY") > -60))
clean_df = clean_df.filter((col("ARR_DELAY") < 360) & (col("ARR_DELAY") > -60))
from pyspark.sql.functions import (
count, when, isnan
)
clean_df.printSchema()
clean_df.describe().show()
root
 |-- FL_DATE: date (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- AIRLINE_DOT: string (nullable = true)
 |-- AIRLINE_CODE: string (nullable = true)
 |-- DOT_CODE: integer (nullable = true)
 |-- FL_NUMBER: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- DELAY_DUE_CARRIER: double (nullable = false)
 |-- DELAY_DUE_WEATHER: double (nullable = false)
 |-- DELAY_DUE_NAS: double (nullable = false)
 |-- DELAY_DUE_SECURITY: double (nullable = false)
 |-- DELAY_DUE_LATE_AIRCRAFT: double (nullable = false)
[Wide table output omitted: after cleaning, describe() reports a count of 2905663 for every column; e.g. ARR_DELAY now has mean 2.64, stddev 36.80, min -59.0, max 359.0, and CANCELLED is 0.0 everywhere.]
# Null Count per Column
from pyspark.sql.functions import col, sum as spark_sum, when, isnull
null_counts = clean_df.select([
spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
for c in clean_df.columns
])
null_counts.show()
[Null-count output omitted: every column reports 0 nulls except CANCELLATION_CODE, which is null in all 2905663 remaining rows.]
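The null-count expression builds, for each column, a sum that adds 1 per null and 0 otherwise. The same computation over plain-Python records (toy data, not the flight data):

```python
records = [
    {"AIRLINE": "Delta Air Lines Inc.", "CANCELLATION_CODE": None},
    {"AIRLINE": "Spirit Air Lines", "CANCELLATION_CODE": None},
    {"AIRLINE": None, "CANCELLATION_CODE": "A"},
]

# sum(when(col(c).isNull(), 1).otherwise(0)) for each column c
null_counts = {
    c: sum(1 if r[c] is None else 0 for r in records)
    for c in records[0]
}
print(null_counts)  # {'AIRLINE': 1, 'CANCELLATION_CODE': 2}
```

In Spark the sums run in parallel over all partitions and come back as a single-row DataFrame.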
clean_df = clean_df.drop("CANCELLATION_CODE")
Statistics¶
from pyspark.sql.functions import avg
delay_stats = clean_df.groupBy("AIRLINE").agg(
avg("DEP_DELAY").alias("avg_dep_delay"),
avg("ARR_DELAY").alias("avg_arr_delay"),
count("*").alias("num_flights")
)
delay_stats.orderBy("avg_arr_delay", ascending=False).show()
+--------------------+------------------+--------------------+-----------+
|             AIRLINE|     avg_dep_delay|       avg_arr_delay|num_flights|
+--------------------+------------------+--------------------+-----------+
|       Allegiant Air|10.663716814159292|  10.110659512273255|      49946|
|     JetBlue Airways|15.913813058015212|  10.104695594665099|     108868|
|Frontier Airlines...|13.878156909189846|   9.026391600781025|      62482|
|    Spirit Air Lines|11.514032162641854|   6.589060399074921|      92965|
|ExpressJet Airlin...| 8.699753529016357|   6.030752856822765|      17852|
|  Mesa Airlines Inc.| 9.437540182589688|  4.6006332776134755|      62216|
|American Airlines...| 9.577982371391535|   3.714215738011924|     369513|
|United Air Lines ...| 9.460429503626337|  3.3625972241863473|     247495|
|Southwest Airline...|10.668871156988505|   3.150767342590393|     555684|
|Hawaiian Airlines...| 4.137945201150333|  2.9878330120405776|      31643|
|   PSA Airlines Inc.| 6.624263908259724|   2.662366341236634|     103248|
|         Horizon Air| 4.671790816074341|  2.2538678266027383|      20231|
|           Envoy Air|5.5334585824081985|   2.175405636208369|     117100|
|SkyWest Airlines ...| 6.491163047324325|   1.107962553150657|     333486|
|Alaska Airlines Inc.| 4.295771060647738|  0.8727141212547246|      98157|
|    Republic Airline| 4.606081262373186|-0.48297667133191685|     137899|
|Delta Air Lines Inc.| 6.335690806885991| -0.5869592722559188|     387279|
|   Endeavor Air Inc.| 4.459666602797471|  -2.708674349218515|     109599|
+--------------------+------------------+--------------------+-----------+
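groupBy(...).agg(avg(...), count(...)) computes one aggregate row per group; the same shape in plain Python (toy records, not the real flight data):

```python
from collections import defaultdict

flights = [
    {"AIRLINE": "A", "DEP_DELAY": 10.0, "ARR_DELAY": 5.0},
    {"AIRLINE": "A", "DEP_DELAY": 2.0,  "ARR_DELAY": -3.0},
    {"AIRLINE": "B", "DEP_DELAY": 7.0,  "ARR_DELAY": 7.0},
]

# groupBy("AIRLINE")
groups = defaultdict(list)
for f in flights:
    groups[f["AIRLINE"]].append(f)

# .agg(avg("DEP_DELAY"), avg("ARR_DELAY"), count("*"))
delay_stats = {
    airline: {
        "avg_dep_delay": sum(f["DEP_DELAY"] for f in rows) / len(rows),
        "avg_arr_delay": sum(f["ARR_DELAY"] for f in rows) / len(rows),
        "num_flights": len(rows),
    }
    for airline, rows in groups.items()
}
print(delay_stats["A"]["avg_arr_delay"])  # 1.0
```

Spark performs the same grouping with partial aggregates per partition followed by a shuffle, so it scales to millions of rows.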
# Top 10 Most Frequent Routes
from pyspark.sql.functions import concat_ws
route_stats = clean_df.withColumn("ROUTE", concat_ws(" → ", col("ORIGIN"), col("DEST"))) \
.groupBy("ROUTE").count() \
.orderBy("count", ascending=False)
route_stats.show(10, truncate=False)
+---------+-----+
|ROUTE    |count|
+---------+-----+
|SFO → LAX|5213 |
|LAX → SFO|5066 |
|OGG → HNL|4587 |
|LAX → LAS|4536 |
|HNL → OGG|4488 |
|LGA → ORD|4485 |
|LAS → LAX|4441 |
|ORD → LGA|4384 |
|LAX → JFK|4293 |
|JFK → LAX|4184 |
+---------+-----+
only showing top 10 rows
# Average Air Time and Distance per Route
route_airtime = clean_df.withColumn("ROUTE", concat_ws(" → ", col("ORIGIN"), col("DEST"))) \
.groupBy("ROUTE") \
.agg(avg("AIR_TIME").alias("avg_air_time"),
avg("DISTANCE").alias("avg_distance"),
count("*").alias("num_flights"))
route_airtime.orderBy("num_flights", ascending=False).show(10, truncate=False)
+---------+------------------+------------+-----------+
|ROUTE    |avg_air_time      |avg_distance|num_flights|
+---------+------------------+------------+-----------+
|SFO → LAX|55.92998273546902 |337.0       |5213       |
|LAX → SFO|55.241808132649034|337.0       |5066       |
|OGG → HNL|24.111837802485283|100.0       |4587       |
|LAX → LAS|44.23258377425044 |236.0       |4536       |
|HNL → OGG|22.017602495543674|100.0       |4488       |
|LGA → ORD|117.39397993311037|733.0       |4485       |
|LAS → LAX|43.063949560909705|236.0       |4441       |
|ORD → LGA|97.95095802919708 |733.0       |4384       |
|LAX → JFK|289.08665269042626|2475.0      |4293       |
|JFK → LAX|330.61639579349907|2475.0      |4184       |
+---------+------------------+------------+-----------+
only showing top 10 rows
# Delay Patterns by Cause
from pyspark.sql.functions import mean
delay_cause_cols = [
"DELAY_DUE_CARRIER", "DELAY_DUE_WEATHER", "DELAY_DUE_NAS",
"DELAY_DUE_SECURITY", "DELAY_DUE_LATE_AIRCRAFT"
]
# Use a loop variable named c so it does not shadow the imported col function
clean_df.select([
    mean(c).alias(f"avg_{c.lower()}") for c in delay_cause_cols
]).show()
+---------------------+---------------------+------------------+----------------------+---------------------------+
|avg_delay_due_carrier|avg_delay_due_weather| avg_delay_due_nas|avg_delay_due_security|avg_delay_due_late_aircraft|
+---------------------+---------------------+------------------+----------------------+---------------------------+
|    3.633803025333633|   0.5521352613844069|2.2979612570349692|  0.025852275367102103|          4.247468477934296|
+---------------------+---------------------+------------------+----------------------+---------------------------+
weather_delay_by_airport = clean_df.groupBy("ORIGIN").agg(
avg("DELAY_DUE_WEATHER").alias("avg_weather_delay"),
count("*").alias("num_flights")
).orderBy("avg_weather_delay", ascending=False)
weather_delay_by_airport.show(10)
+------+------------------+-----------+
|ORIGIN| avg_weather_delay|num_flights|
+------+------------------+-----------+
|   GGG|               5.0|        381|
|   EKO| 4.799163179916318|        239|
|   PIH| 4.619047619047619|        357|
|   PGV| 4.607142857142857|         28|
|   DIK| 4.285714285714286|         84|
|   APN| 3.985663082437276|        279|
|   MQT|3.8851224105461393|        531|
|   WYS|3.7535211267605635|        142|
|   DRT|3.6483516483516483|        273|
|   COD|3.3487394957983194|        238|
+------+------------------+-----------+
only showing top 10 rows
# Delay trends over time
from pyspark.sql.functions import year, month
delay_by_month = clean_df.withColumn("YEAR", year("FL_DATE")) \
.withColumn("MONTH", month("FL_DATE")) \
.groupBy("YEAR", "MONTH") \
.agg(avg("ARR_DELAY").alias("avg_arr_delay"),
avg("DEP_DELAY").alias("avg_dep_delay"))
delay_by_month.orderBy("YEAR", "MONTH").show()
+----+-----+-------------------+-------------------+
|YEAR|MONTH|      avg_arr_delay|      avg_dep_delay|
+----+-----+-------------------+-------------------+
|2019|    1| 2.4820393167453116|  7.861189949678745|
|2019|    2|  6.681138208291357| 11.158052804653197|
|2019|    3| 1.8050101878030673|  7.680466269684573|
|2019|    4|  3.282562537323336|  9.015095215977706|
|2019|    5|  5.356540984902379| 10.235554501620426|
|2019|    6|  9.745206261597755| 13.864427209719116|
|2019|    7|  6.404983327724922| 11.737541680687693|
|2019|    8|  6.003047486202923| 11.021544666141065|
|2019|    9|-0.9359927619674289|  5.256407303832868|
|2019|   10| 1.3794001184797182|   6.78538022635862|
|2019|   11| -1.289566044543576|  5.186718929379736|
|2019|   12|  4.234243263020585| 10.340205529572716|
|2020|    1|-2.8118771375738434|  4.951971068091443|
|2020|    2|-1.5591433278418452|  5.910916500476892|
|2020|    3| -7.170927418621154| 0.5467348386979385|
|2020|    4| -14.11148015214014| -3.760111426581668|
|2020|    5|-11.689005416618647|-2.3127809150628096|
|2020|    6| -8.052496632045543| -1.025683368823606|
|2020|    7| -5.925855353490066|-0.2928237173200325|
|2020|    8| -6.553676180455083|-0.6005787189267394|
+----+-----+-------------------+-------------------+
only showing top 20 rows
# STEP 1: Calculate monthly delay stats in Spark
from pyspark.sql.functions import year, month, avg
delay_by_month = clean_df.withColumn("YEAR", year("FL_DATE")) \
.withColumn("MONTH", month("FL_DATE")) \
.groupBy("YEAR", "MONTH") \
.agg(
avg("ARR_DELAY").alias("avg_arr_delay"),
avg("DEP_DELAY").alias("avg_dep_delay")
) \
.orderBy("YEAR", "MONTH")
# STEP 2: Convert to Pandas (collect to driver)
delay_df = delay_by_month.toPandas()
# STEP 3: Create a proper datetime column
import pandas as pd
delay_df["DATE"] = pd.to_datetime(
delay_df[["YEAR", "MONTH"]].assign(DAY=1)
)
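pd.to_datetime on a frame with YEAR, MONTH and DAY columns assembles one timestamp per row; assign(DAY=1) pins each month to its first day. The equivalent with only the standard library (toy values), in case the pandas idiom is unfamiliar:

```python
from datetime import date

rows = [{"YEAR": 2019, "MONTH": 1}, {"YEAR": 2019, "MONTH": 2}]

# assign(DAY=1) + pd.to_datetime: one date per (YEAR, MONTH), pinned to the 1st
dates = [date(r["YEAR"], r["MONTH"], 1) for r in rows]
print(dates[0].isoformat())  # 2019-01-01
```

A proper datetime axis lets matplotlib space and label the months correctly instead of treating them as categories.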
# STEP 4: Plot
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.plot(delay_df["DATE"], delay_df["avg_arr_delay"], marker='o', label='Avg Arrival Delay')
plt.plot(delay_df["DATE"], delay_df["avg_dep_delay"], marker='o', label='Avg Departure Delay')
plt.title("Monthly Average Flight Delays Over Time")
plt.xlabel("Date")
plt.ylabel("Delay (minutes)")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.xticks(rotation=45)
plt.show()
air.corr("DISTANCE", "ARR_DELAY")  # Pearson correlation between the two columns
0.002050595449321259
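DataFrame.corr computes the Pearson correlation coefficient, so a value this close to zero means flight distance says almost nothing about arrival delay. The formula, sketched in plain Python on toy values:

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation: covariance over the product of standard deviations."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# A perfectly linear relationship gives a coefficient of (approximately) 1.0
print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))
```

Spark evaluates the same sums distributively, so the coefficient is computed in one pass over the data.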