Yanfei Kang
yanfeikang@buaa.edu.cn
School of Economics and Management
Beihang University
http://yanfei.site
Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.
Spark SQL uses this extra information to perform extra optimizations.
One use of Spark SQL is to execute SQL queries.
Spark SQL can also be used to read data from an existing Hive installation.
A Dataset is a distributed collection of data.
A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
The Dataset API is available in Scala and Java.
Python does not support the Dataset API. However, because Python is dynamic, many of the benefits of the Dataset API are already available; for example, you can naturally access a field of a row by name as row.columnName.
A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
SparkContext is an entry point to Spark. It has been defined in the org.apache.spark package since Spark 1.x and is used to programmatically create RDDs, accumulators and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. In spark-shell, a SparkContext object is available by default as sc. It can also be created programmatically with the SparkContext class.
SparkSession was introduced in Spark 2.0 and is an entry point to the underlying Spark functionality, used to programmatically create RDDs, DataFrames and Datasets. In spark-shell, a SparkSession object is available by default as spark. It can also be created programmatically with the SparkSession builder pattern.
import findspark
findspark.init('/usr/lib/spark-current/')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
# sc.stop()
sc = spark.sparkContext # get the SparkContext behind the session (needed for the RDD API)
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
people
PythonRDD[2] at RDD at PythonRDD.scala:53
# Infer the schema from the Row objects and create a DataFrame.
schemaPeople = spark.createDataFrame(people)
schemaPeople.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
teenagers.toPandas() # convert the Spark DataFrame to a pandas DataFrame (this collects all rows to the driver)

|   | name   | age |
|---|--------|-----|
| 0 | Justin | 19  |
sdf = spark.read.csv("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.csv")
sdf.show() # Displays the content of the DataFrame to stdout
+------------------+
|               _c0|
+------------------+
|      name;age;job|
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+
sdf2 = spark.read.json("/opt/apps/ecm/service/spark/2.4.5-hadoop3.1-1.0.2/package/spark-2.4.5-hadoop3.1-1.0.2/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
The comma-separated file people.txt has no header row, so we can supply a description of its columns ourselves (called a schema in Spark).
# Import data types
from pyspark.sql.types import *
# Create a schema from a list of column names; every field is a nullable string
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
schema
StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
sdf.write.mode('overwrite').csv("myspark/")
import os
os.listdir("myspark")
['part-00000-c9ed3cde-6299-4462-acf3-265a53291dc6-c000.csv', '._SUCCESS.crc', '.part-00000-c9ed3cde-6299-4462-acf3-265a53291dc6-c000.csv.crc', '_SUCCESS']
## Load a local file
air0 = spark.read.options(header='true', inferSchema='true').csv("/home/yanfei/lectures/data/airdelay_small.csv")
air0
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]
# We specify the correct schema by hand
from pyspark.sql.types import *
schema_sdf = StructType([
    StructField('Year', IntegerType(), True),
    StructField('Month', IntegerType(), True),
    StructField('DayofMonth', IntegerType(), True),
    StructField('DayOfWeek', IntegerType(), True),
    StructField('DepTime', DoubleType(), True),
    StructField('CRSDepTime', DoubleType(), True),
    StructField('ArrTime', DoubleType(), True),
    StructField('CRSArrTime', DoubleType(), True),
    StructField('UniqueCarrier', StringType(), True),
    StructField('FlightNum', StringType(), True),
    StructField('TailNum', StringType(), True),
    StructField('ActualElapsedTime', DoubleType(), True),
    StructField('CRSElapsedTime', DoubleType(), True),
    StructField('AirTime', DoubleType(), True),
    StructField('ArrDelay', DoubleType(), True),
    StructField('DepDelay', DoubleType(), True),
    StructField('Origin', StringType(), True),
    StructField('Dest', StringType(), True),
    StructField('Distance', DoubleType(), True),
    StructField('TaxiIn', DoubleType(), True),
    StructField('TaxiOut', DoubleType(), True),
    StructField('Cancelled', IntegerType(), True),
    StructField('CancellationCode', StringType(), True),
    StructField('Diverted', IntegerType(), True),
    StructField('CarrierDelay', DoubleType(), True),
    StructField('WeatherDelay', DoubleType(), True),
    StructField('NASDelay', DoubleType(), True),
    StructField('SecurityDelay', DoubleType(), True),
    StructField('LateAircraftDelay', DoubleType(), True)
])
air = spark.read.options(header='true').schema(schema_sdf).csv("/home/yanfei/lectures/data/airdelay_small.csv")
air
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: double, CRSDepTime: double, ArrTime: double, CRSArrTime: double, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: double, CRSElapsedTime: double, AirTime: double, ArrDelay: double, DepDelay: double, Origin: string, Dest: string, Distance: double, TaxiIn: double, TaxiOut: double, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: double, WeatherDelay: double, NASDelay: double, SecurityDelay: double, LateAircraftDelay: double]
air.describe().show()
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
|summary| Year| Month| DayofMonth| DayOfWeek| DepTime| CRSDepTime| ArrTime| CRSArrTime|UniqueCarrier| FlightNum|TailNum| ActualElapsedTime| CRSElapsedTime| AirTime| ArrDelay| DepDelay| Origin| Dest| Distance| TaxiIn| TaxiOut|Cancelled|CancellationCode|Diverted| CarrierDelay| WeatherDelay| NASDelay| SecurityDelay| LateAircraftDelay|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
| count| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481|1524481| 1524481| 1524481| 1524481| 1524481| 1524481|1524481|1524481| 1524481| 1524481| 1524481| 1524481| 0| 1524481| 1524481| 1524481| 1524481| 1524481| 1524481|
| mean|2005.2200309482375| 6.819439533847913|15.739877374660622|3.9425345412635515|1343.1007890554229|1335.312045870037|1490.5321424143692|1499.0464518744411| null|2100.4755290489024| 0.0|124.89908893584112|126.23595243233599|102.60629092786331|7.728503667805634| 9.065091660702889| null| null|723.8429826281863|7.4043402311999955|16.027061012895537| 0.0| null| 0.0|3.1917360728011697|0.6896806191746568| 3.555705187535955|0.025394216129948487| 4.100354809276075|
| stddev|1.3298684774182468|3.3772885531435493| 8.788305183327493|1.9915258421755744|475.47666780749284|462.8261301202478| 499.1625758193302| 478.3461722588984| null| 1937.457867690337| 0.0| 70.9126133314637| 70.11716827588829| 78.48819536331416|35.41450193208633|32.169219586304244| null| null|571.7621650788815| 35.98689560676167|12.080643220148497| 0.0| null| 0.0| 18.41498545167556| 8.708781309481942|15.282413095867522| 1.1406729402330553|18.704288068396867|
| min| 2003| 1| 1| 1| 1.0| 3.0| 1.0| 0.0| 9E| 1| 0| -681.0| 16.0| -1461.0| -735.0| -1197.0| ABE| ABE| 27.0| 0.0| 0.0| 0| null| 0| 0.0| 0.0| -13.0| 0.0| 0.0|
| max| 2007| 12| 31| 7| 2644.0| 2359.0| 2742.0| 2359.0| YV| 999| n816ca| 1766.0| 660.0| 1936.0| 1779.0| 1752.0| YUM| YUM| 4962.0| 1470.0| 1439.0| 0| null| 0| 1665.0| 910.0| 1010.0| 382.0| 1060.0|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-------------+------------------+-------+------------------+------------------+------------------+-----------------+------------------+-------+-------+-----------------+------------------+------------------+---------+----------------+--------+------------------+------------------+------------------+--------------------+------------------+
air.describe(['ArrDelay']).show()
+-------+------------------+
|summary|          ArrDelay|
+-------+------------------+
|  count|           5432958|
|   mean|  6.97897995898367|
| stddev|30.191156753519472|
|    min|           -1238.0|
|    max|            1779.0|
+-------+------------------+
air.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: double (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
air.select(["ArrDelay","AirTime","Distance"]).show()
+--------+-------+--------+
|ArrDelay|AirTime|Distance|
+--------+-------+--------+
|     2.0|   25.0|   127.0|
|    29.0|  248.0|  1623.0|
|    null|   null|    null|
|    -2.0|   70.0|   451.0|
|    11.0|  133.0|  1009.0|
|    13.0|  177.0|  1562.0|
|   -12.0|  181.0|  1589.0|
|    11.0|  364.0|  2611.0|
|    13.0|   53.0|   304.0|
|    null|   null|    null|
|    -8.0|  293.0|  2537.0|
|    null|   null|    null|
|    null|   null|    null|
|    55.0|  285.0|  1927.0|
|    23.0|  149.0|   991.0|
|    64.0|   35.0|   193.0|
|    29.0|   25.0|    77.0|
|    null|   null|    null|
|    -6.0|   91.0|   678.0|
|    35.0|  127.0|   998.0|
+--------+-------+--------+
only showing top 20 rows
air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()
+-------------+--------------+
|UniqueCarrier|(ArrDelay > 0)|
+-------------+--------------+
|           XE|          true|
|           CO|          true|
|           AA|          true|
|           WN|         false|
|           CO|          true|
|           AA|          true|
|           DL|         false|
|           AA|          true|
|           US|          true|
|           AA|          true|
|           AS|         false|
|           UA|          true|
|           TW|         false|
|           NW|          true|
|           NW|          true|
|           AA|          true|
|           DH|          true|
|           WN|         false|
|           AA|         false|
|           CO|          true|
+-------------+--------------+
only showing top 20 rows
# group data with respect to some columns
air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show()
+-------------+---------+------+
|UniqueCarrier|DayOfWeek| count|
+-------------+---------+------+
|           PS|        6|   406|
|           CO|        4| 55764|
|       ML (1)|        7|   442|
|           XE|        4| 14896|
|           TZ|        4|  1455|
|           OO|        3| 17310|
|           EA|        7|  6197|
|           OO|        4| 17666|
|           F9|        2|  1679|
|           EA|        5|  6295|
|           HA|        5|  1519|
|           UA|        4| 89272|
|           EV|        4|  9729|
|           DL|        6|106031|
|           FL|        5|  6962|
|           YV|        3|  4165|
|           AQ|        2|  1035|
|       ML (1)|        2|   502|
|           DL|        3|110827|
|           YV|        6|  3828|
+-------------+---------+------+
only showing top 20 rows
## Group and sort
aircount = air.groupBy("UniqueCarrier").count()
aircount.sort("count", ascending=False).show()
+-------------+------+
|UniqueCarrier| count|
+-------------+------+
|           DL|765388|
|           WN|703368|
|           AA|684522|
|           US|649056|
|           UA|611957|
|           NW|473820|
|           CO|373858|
|           TW|179081|
|           HP|173509|
|           MQ|164790|
|           AS|129863|
|           OO|120223|
|           XE| 94311|
|           EV| 67148|
|           OH| 60630|
|           FL| 47540|
|           EA| 43723|
|           PI| 41489|
|           DH| 32900|
|           B6| 29111|
+-------------+------+
only showing top 20 rows
## Returns a new DataFrame omitting rows with null values
air_without_na = air.na.drop()
air_without_na.show()
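+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+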
air_without_na.count()
0
air.count() # number of rows in the original data
5548754
## Replace null values
air.na.fill("unknown").show() # a string fill value only replaces nulls in string columns
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay| Origin| Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006| 7| 6| 4| 2055.0| 2055.0| 2150.0| 2148.0| XE| 2619| N11526| 55.0| 53.0| 25.0| 2.0| 0.0| IAH| LCH| 127.0| 8.0| 22.0| 0| unknown| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|2007| 10| 26| 5| 654.0| 700.0| 1507.0| 1515.0| AS| 802| N648AS| 313.0| 315.0| 293.0| -8.0| -6.0| PDX| BOS| 2537.0| 9.0| 11.0| 0| unknown| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|2004| 10| 18| 1| 939.0| 940.0| 1151.0| 1128.0| NW| 679| N317NB| 192.0| 168.0| 149.0| 23.0| -1.0| MSP| SLC| 991.0| 10.0| 33.0| 0| unknown| 0| 0.0| 0.0| 23.0| 0.0| 0.0|
|2007| 5| 17| 4| 1944.0| 1830.0| 2034.0| 1930.0| AA| 1011| N3BDAA| 50.0| 60.0| 35.0| 64.0| 74.0| MIA| MCO| 193.0| 4.0| 11.0| 0| unknown| 0| 64.0| 0.0| 0.0| 0.0| 0.0|
|2003| 7| 31| 4| 1738.0| 1710.0| 1819.0| 1750.0| DH| 7981| N309UE| 41.0| 40.0| 25.0| 29.0| 28.0| IAD| CHO| 77.0| 3.0| 12.0| 0| unknown| 0| 29.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| unknown| unknown|unknown| null| null| null| null| null|unknown|unknown| null| null| null| null| unknown| null| null| null| null| null| null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+-------+-------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
air.na.replace('NA', "unknown").show()
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2006| 7| 6| 4| 2055.0| 2055.0| 2150.0| 2148.0| XE| 2619| N11526| 55.0| 53.0| 25.0| 2.0| 0.0| IAH| LCH| 127.0| 8.0| 22.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|2007| 10| 26| 5| 654.0| 700.0| 1507.0| 1515.0| AS| 802| N648AS| 313.0| 315.0| 293.0| -8.0| -6.0| PDX| BOS| 2537.0| 9.0| 11.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|2004| 10| 18| 1| 939.0| 940.0| 1151.0| 1128.0| NW| 679| N317NB| 192.0| 168.0| 149.0| 23.0| -1.0| MSP| SLC| 991.0| 10.0| 33.0| 0| null| 0| 0.0| 0.0| 23.0| 0.0| 0.0|
|2007| 5| 17| 4| 1944.0| 1830.0| 2034.0| 1930.0| AA| 1011| N3BDAA| 50.0| 60.0| 35.0| 64.0| 74.0| MIA| MCO| 193.0| 4.0| 11.0| 0| null| 0| 64.0| 0.0| 0.0| 0.0| 0.0|
|2003| 7| 31| 4| 1738.0| 1710.0| 1819.0| 1750.0| DH| 7981| N309UE| 41.0| 40.0| 25.0| 29.0| 28.0| IAD| CHO| 77.0| 3.0| 12.0| 0| null| 0| 29.0| 0.0| 0.0| 0.0| 0.0|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
air.corr("Distance","ArrDelay")
0.008481756987561132
air.cov("Distance","ArrDelay")
140.57953260215643
air.filter(air.ArrDelay > 60).show() # filter with certain conditions
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007| 5| 17| 4| 1944.0| 1830.0| 2034.0| 1930.0| AA| 1011| N3BDAA| 50.0| 60.0| 35.0| 64.0| 74.0| MIA| MCO| 193.0| 4.0| 11.0| 0| null| 0| 64.0| 0.0| 0.0| 0.0| 0.0|
|2004| 4| 16| 5| 1550.0| 1525.0| 1750.0| 1638.0| XE| 2602| N15948| 120.0| 73.0| 35.0| 72.0| 25.0| EWR| BWI| 169.0| 5.0| 80.0| 0| null| 0| 0.0| 0.0| 51.0| 0.0| 21.0|
|2005| 7| 1| 5| 1931.0| 1723.0| 2235.0| 2017.0| FL| 831| N978AT| 124.0| 114.0| 86.0| 138.0| 128.0| MDW| ATL| 590.0| 9.0| 29.0| 0| null| 0| 0.0| 0.0| 10.0| 0.0| 128.0|
|2005| 11| 22| 2| 1849.0| 1705.0| 2034.0| 1815.0| B6| 55| N585JB| 105.0| 70.0| 56.0| 139.0| 104.0| BTV| JFK| 267.0| 4.0| 45.0| 0| null| 0| 0.0| 0.0| 35.0| 0.0| 104.0|
|2007| 10| 21| 7| 1725.0| 1625.0| 1837.0| 1730.0| WN| 143| N389SW| 72.0| 65.0| 42.0| 67.0| 60.0| LAS| LAX| 236.0| 12.0| 18.0| 0| null| 0| 0.0| 0.0| 7.0| 0.0| 60.0|
|2005| 4| 15| 5| 1519.0| 1310.0| 1751.0| 1545.0| AS| 631| N947AS| 152.0| 155.0| 136.0| 126.0| 129.0| LAS| SEA| 866.0| 5.0| 11.0| 0| null| 0| 4.0| 0.0| 0.0| 0.0| 122.0|
|2006| 1| 24| 2| 1230.0| 1115.0| 1343.0| 1228.0| OH| 5050| N784CA| 73.0| 73.0| 50.0| 75.0| 75.0| JFK| BOS| 187.0| 5.0| 18.0| 0| null| 0| 75.0| 0.0| 0.0| 0.0| 0.0|
|2007| 6| 16| 6| 1525.0| 1155.0| 1636.0| 1255.0| XE| 2317| N14933| 71.0| 60.0| 44.0| 221.0| 210.0| LFT| IAH| 201.0| 10.0| 17.0| 0| null| 0| 0.0| 0.0| 104.0| 0.0| 117.0|
|2006| 12| 1| 5| 1016.0| 720.0| 1226.0| 925.0| MQ| 3484| N902BC| 190.0| 185.0| 167.0| 181.0| 176.0| CHS| DFW| 987.0| 11.0| 12.0| 0| null| 0| 176.0| 0.0| 5.0| 0.0| 0.0|
|2007| 8| 25| 6| 1544.0| 1430.0| 2123.0| 2015.0| AA| 2073| N612AA| 279.0| 285.0| 256.0| 68.0| 74.0| ORD| SJU| 2072.0| 4.0| 19.0| 0| null| 0| 68.0| 0.0| 0.0| 0.0| 0.0|
|2006| 7| 20| 4| 1102.0| 834.0| 1445.0| 1225.0| AA| 1652| N441AA| 163.0| 171.0| 144.0| 140.0| 148.0| ORD| RSW| 1120.0| 2.0| 17.0| 0| null| 0| 0.0| 140.0| 0.0| 0.0| 0.0|
|2006| 7| 23| 7| 2110.0| 1920.0| 2228.0| 2035.0| WN| 2347| N433| 78.0| 75.0| 57.0| 113.0| 110.0| SNA| OAK| 371.0| 6.0| 15.0| 0| null| 0| 0.0| 0.0| 3.0| 4.0| 106.0|
|2007| 1| 14| 7| 1110.0| 850.0| 1216.0| 1010.0| 9E| 5968| 85889E| 66.0| 80.0| 44.0| 126.0| 140.0| MEM| STL| 256.0| 5.0| 17.0| 0| null| 0| 126.0| 0.0| 0.0| 0.0| 0.0|
|2004| 1| 11| 7| 1530.0| 1255.0| 1815.0| 1545.0| WN| 1405| N630| 105.0| 110.0| 94.0| 150.0| 155.0| OAK| PHX| 646.0| 3.0| 8.0| 0| null| 0| 150.0| 0.0| 0.0| 0.0| 0.0|
|2007| 10| 26| 5| 1603.0| 1448.0| 1900.0| 1752.0| UA| 270| N535UA| 117.0| 124.0| 105.0| 68.0| 75.0| SNA| DEN| 846.0| 5.0| 7.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 68.0|
|2005| 10| 31| 1| 854.0| 858.0| 1136.0| 1000.0| MQ| 3384| N852AE| 162.0| 62.0| 41.0| 96.0| -4.0| SJT| DFW| 228.0| 109.0| 12.0| 0| null| 0| 0.0| 0.0| 96.0| 0.0| 0.0|
|2006| 1| 6| 5| 2200.0| 2030.0| 2320.0| 2214.0| OH| 5830| N408CA| 140.0| 164.0| 119.0| 66.0| 90.0| LGA| BHM| 866.0| 5.0| 16.0| 0| null| 0| 0.0| 0.0| 0.0| 0.0| 66.0|
|2005| 6| 1| 3| 1729.0| 1610.0| 1847.0| 1740.0| AA| 1966| N4UAAA| 78.0| 90.0| 62.0| 67.0| 79.0| DFW| MCI| 460.0| 3.0| 13.0| 0| null| 0| 67.0| 0.0| 0.0| 0.0| 0.0|
|2007| 7| 13| 5| 1840.0| 1745.0| 2007.0| 1905.0| EV| 4263| N879AS| 147.0| 140.0| 125.0| 62.0| 55.0| DCA| JAN| 860.0| 6.0| 16.0| 0| null| 0| 55.0| 0.0| 7.0| 0.0| 0.0|
|2006| 2| 26| 7| 1953.0| 1745.0| 2145.0| 1940.0| OO| 6765| N750SK| 112.0| 115.0| 92.0| 125.0| 128.0| DEN| TUS| 639.0| 6.0| 14.0| 0| null| 0| 125.0| 0.0| 0.0| 0.0| 0.0|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
only showing top 20 rows
## air2 = air.select(["DayOfWeek","ArrDelay","AirTime","Distance"])
air2_pdf = air.select(["DayOfWeek", "ArrDelay","AirTime","Distance"]).toPandas()
air2_pdf
|         | DayOfWeek | ArrDelay | AirTime | Distance |
|---------|-----------|----------|---------|----------|
| 0       | 4.0       | 2.0      | 25.0    | 127.0    |
| 1       | 7.0       | 29.0     | 248.0   | 1623.0   |
| 2       | NaN       | NaN      | NaN     | NaN      |
| 3       | 5.0       | -2.0     | 70.0    | 451.0    |
| 4       | 7.0       | 11.0     | 133.0   | 1009.0   |
| ...     | ...       | ...      | ...     | ...      |
| 5548749 | 3.0       | 13.0     | 59.0    | 318.0    |
| 5548750 | 1.0       | 22.0     | 34.0    | 181.0    |
| 5548751 | 1.0       | 11.0     | 71.0    | 551.0    |
| 5548752 | NaN       | NaN      | NaN     | NaN      |
| 5548753 | 2.0       | -14.0    | 107.0   | 888.0    |

5548754 rows × 4 columns
import pandas as pd
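def myfun(pdf):
    """Return the means of ArrDelay, AirTime and Distance as a one-row DataFrame."""
    out = dict()
    out["ArrDelay"] = pdf.ArrDelay.mean()  # pandas mean() skips NaN by default
    out["AirTime"] = pdf.AirTime.mean()
    out["Distance"] = pdf.Distance.mean()
    return pd.DataFrame(out, index=[0])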
myfun(air2_pdf)
|   | ArrDelay | AirTime    | Distance   |
|---|----------|------------|------------|
| 0 | 7.350591 | 102.688519 | 729.997977 |