
PySpark Cheat Sheet (DRAFT)

PySpark - cheat sheet

This is a draft cheat sheet. It is a work in progress and is not finished yet.

What is PySpark?

PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.
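
For example (a minimal sketch; the application name is illustrative, and the PySpark shell already provides a ready-made `spark` object), the same API works in a standalone script or interactively:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()  # "demo" is a placeholder name
>>> spark.range(5).count()  # a small distributed job over the numbers 0..4
5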

Initializing SparkSession

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession \
         .builder \
         .appName("Python Spark SQL basic example" ) \
         .config("spark.some.config.option","some-value") \
         .getOrCreate()
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.
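
A minimal sketch of those uses (the view name "users" and the Parquet path below are placeholders, not from this cheat sheet):

>>> df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
>>> df.createOrReplaceTempView("users")   # register the DataFrame as a temporary view/table
>>> spark.sql("SELECT name FROM users WHERE id = 1").show()
>>> df.cache()                            # cache the table in memory
>>> users_df = spark.read.parquet("users.parquet")   # read a Parquet file (placeholder path)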

Creating DataFrames

From RDDs
>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row

Creating DataFrames from RDDs

Infer Schema
>>> sc = spark.sparkContext

>>> lines = sc.textFile("people.txt")

>>> parts = lines.map(lambda l: l.split(","))

>>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

>>> peopledf = spark.createDataFrame(people)

Creating DataFrames from RDDs

Specify Schema
>>> people = parts.map(lambda p: Row(name=p[0], age=p[1].strip()))

>>> schemaString = "name age"

>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]

>>> schema = StructType(fields)

>>> spark.createDataFrame(people, schema).show()

From Spark Data Sources

JSON
>>> df = spark.read.json("customer.json")

>>> df.show()

>>> df2 = spark.read.load("people.json", format="json")

From Spark Data Sources

Parquet Files
>>> df3 = spark.read.load("users.parquet")

From Spark Data Sources

Text Files
>>> df4 = spark.read.text("people.txt")

Duplicate Values

>>> df = df.dropDuplicates()

Inspect Data

>>> df.dtypes
Return df column names and data types
>>> df.show()
Display the content of df
>>> df.head()
Return the first row (head(n) returns the first n rows)
>>> df.first()
Return first row
>>> df.take(2)
Return the first 2 rows
>>> df.schema
Return the schema of df
>>> df.describe().show()
Compute summary statistics
>>> df.columns
Return the columns of df
>>> df.count()
Count the number of rows in df
>>> df.distinct().count()
Count the number of distinct rows in df
>>> df.printSchema()
Print the schema of df
>>> df.explain()
Print the (logical and physical) plans
 

Queries

>>> from pyspark.sql import functions as F

Select- Query

>>> df.select("firstName").show() 
>>> df.select("firstName","lastName").show()
>>> df.select(df["firstName"],df["age"]+ 1).show()
>>> df.select(df['age'] > 24).show()

Between- Query

>>> df.select(df.age.between(22, 24)).show()

Substring- Query

>>> df.select(df.firstName.substr(1, 3) \
          .alias("name")).collect()

Startswith, Endswith- Query

>>> df.select("firstName",
    df.lastName.startswith("Sm")).show()
>>> df.select(df.lastName.endswith("th")).show()

Like - Query

>>> df.select("firstName", 
    df.lastName.like("Smith")).show()

When- Query

>>> df.select("firstName",
          F.when(df.age > 30, 1) \ 
         .otherwise(0)) \
         .show()
>>> df[df.firstName.isin("Jane","Boris")].collect()

GroupBy

>>> df.groupBy("age").count().show()

Filter

>>> df.filter(df["age"]>24).show()

Add Columns

>>> df = df.withColumn('city',df.address.city) \
    .withColumn('postalCode',df.address.postalCode) \
    .withColumn('state',df.address.state) \
    .withColumn('streetAddress',df.address.streetAddress) \
    .withColumn('telePhoneNumber',
    F.explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType',
    F.explode(df.phoneNumber.type))

Update Columns

>>> df = df.withColumnRenamed('telePhoneNumber', \
        'phoneNumber')

Remove Columns

>>> df = df.drop("address", "phoneNumber")
>>> df = df.drop(df.address).drop(df.phoneNumber)
 

Sort

>>> peopledf.sort(peopledf.age.desc()).collect()
>>> df.sort("age", ascending=False).collect()
>>> df.orderBy(["age","city"],ascending=[0,1]).collect()

Missing & Replacing Values

>>> df.na.fill(50).show()
>>> df.na.drop().show()
>>> df.na.replace(10, 20).show()

Repartitioning

>>> df.repartition(10).rdd.getNumPartitions()
>>> df.coalesce(1).rdd.getNumPartitions()

Registering DataFrames as Views

>>> peopledf.createGlobalTempView("people")
>>> df.createTempView("customer")
>>> df.createOrReplaceTempView("customer")

Query Views

>>> df5 = spark.sql("SELECT * FROM customer")
>>> df5.show()
>>> peopledf2 = spark.sql("SELECT * FROM global_temp.people")
>>> peopledf2.show()

Output- Data Structures

>>> rdd1 = df.rdd
Convert df into an RDD
>>> df.toJSON().first()
Convert df into an RDD of strings
>>> df.toPandas()
Return the contents of df as a pandas DataFrame

Output- Write & Save to Files

>>> df.select("firstName", "city")\
.write.save("nameAndCity.parquet")

>>> df.select("firstName", "age") \
.write.save("namesAndAges.json",format="json")

Stopping SparkSession

>>> spark.stop()