
Cleaning with PySpark Cheat Sheet by datamansam

Useful code for cleaning big data :)

Defining Schema

from pyspark.sql.types import *
Schema = StructType([
  StructField('Store', StringType(), nullable=True),
  StructField('StoreType', StringType(), nullable=True),
  StructField('Assortment', StringType(), nullable=True),
  StructField('CompetitionDistance', FloatType(), nullable=True),
  StructField('CompetitionOpenSinceMonth', IntegerType(), nullable=True),
  StructField('CompetitionOpenSinceYear', IntegerType(), nullable=True),
  StructField('Promo2', IntegerType(), nullable=True),
  StructField('Promo2SinceWeek', IntegerType(), nullable=True),
  StructField('Promo2SinceYear', IntegerType(), nullable=True),
  StructField('PromoInterval', StringType(), nullable=True)
])
df = spark.read.option("header", True).schema(Schema).csv('store.csv')

# We can drop invalid rows while reading the dataset by setting the read mode to "DROPMALFORMED"
df_1 = spark.read.option("header", True).option("mode", "DROPMALFORMED").csv('store.csv')

df.show()
Spark does not always infer the schema correctly, so it is best to define the schema explicitly for the dataset.
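A quick way to see the difference is to read the file once with inferSchema and once with the explicit schema, then compare the printSchema() output; a minimal sketch, assuming the store.csv and Schema defined above:

df_inferred = spark.read.option("header", True) \
    .option("inferSchema", True).csv('store.csv')
df_inferred.printSchema()   # types guessed from a scan of the data

df_explicit = spark.read.option("header", True) \
    .schema(Schema).csv('store.csv')
df_explicit.printSchema()   # types exactly as declared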

PySpark DataTypes

Type         Size (bytes)  Default  Range (digits)
byte         1             0        3
short        2             0        5
int          4             0        10
long         8             0        Lots
float        4             0.0f     Lots
double       8             0.0d     Lots
DecimalType  32            0.0      Lots
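These types also drive explicit conversions with cast(); a hedged sketch using columns from the store.csv schema above:

# casting columns to the types above with cast()
from pyspark.sql.types import IntegerType, DoubleType
df = df.withColumn('Promo2', df['Promo2'].cast(IntegerType()))
df = df.withColumn('CompetitionDistance',
    df['CompetitionDistance'].cast(DoubleType()))
df.printSchema()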

Filtering Data

voter_df.filter(voter_df['name'].isNotNull())
  OR  
voter_df.where(~ voter_df._c1.isNull())

# year() comes from pyspark.sql.functions (imported as F)
voter_df.filter(F.year(voter_df.date) > 1800)
voter_df.where(voter_df['_c0'].contains('VOTE'))

#Multiple Conditions 
whereDF = flattenDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
whereDF.show(truncate=False)


#Unique Values
voter_df = df.select(df["VOTER NAME"]).distinct()

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
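A self-contained sketch tying these filters together on toy data (col and asc must be imported, which the fragments above assume):

from pyspark.sql.functions import col, asc

toy_df = spark.createDataFrame(
    [("xiangrui", "meng"), ("michael", "armbrust"), (None, "zaharia")],
    ["firstName", "lastName"])

toy_df.filter(col("firstName").isNotNull()) \
      .where((col("firstName") == "xiangrui") |
             (col("firstName") == "michael")) \
      .sort(asc("lastName")) \
      .show(truncate=False)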

User Defined Functions

1. Define a Python method
def reverseString(mystr):
    return mystr[::-1]

2. Wrap the function and store as a variable
udfReverseString = udf(reverseString, StringType())

3. Use with Spark
user_df = user_df.withColumn('ReverseName',
udfReverseString(user_df.Name))
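Putting the three steps together; a minimal runnable sketch on toy data, with the imports the fragments above assume:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

user_df = spark.createDataFrame([("Alice",), ("Bob",)], ["Name"])

def reverseString(mystr):
    return mystr[::-1]

udfReverseString = udf(reverseString, StringType())
user_df.withColumn('ReverseName', udfReverseString(user_df.Name)).show()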

Using SQL to clean data

df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT field1, field2 FROM table1")
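The same temp-view pattern handles cleaning steps in SQL; a hedged sketch using the store.csv columns from above:

df.createOrReplaceTempView("store")
cleaned = spark.sql("""
    SELECT Store, CompetitionDistance
    FROM store
    WHERE CompetitionDistance IS NOT NULL
""")
cleaned.show()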
 

String Data Types

StringType
VarcharType(length)
A variant of StringType which has a length limitation. Data writing will fail if the input string exceeds the length limitation.
CharType(length)
Reading a column of type CharType(n) always returns string values of length n. Char type column comparison will pad the shorter one to the longer length.
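In PySpark these length-limited types are usually declared through a DDL-formatted schema string; a hedged sketch (the column names are illustrative assumptions, not from store.csv):

# VARCHAR/CHAR via a DDL-formatted schema string (illustrative names)
tiny_df = spark.createDataFrame(
    [("A100", "a")],
    "StoreCode VARCHAR(10), Assortment CHAR(1)")
tiny_df.printSchema()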

Adding, renaming and removing columns

add - withColumn
voter_df.withColumn('year', F.year(voter_df.date))  # F is pyspark.sql.functions

renaming - withColumnRenamed
test_df_sex = test_df.withColumnRenamed('Gender', 'Sex')

drop

voter_df.drop('unused_column')


from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

add_n = F.udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame,
# and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000),
    df.id.cast(IntegerType())))

Validating with Joins

parsed_df = spark.read.parquet('parsed_data.parquet')
company_df = spark.read.parquet('companies.parquet')
verified_df = parsed_df.join(company_df, parsed_df.company == company_df.company)

# This inner join automatically removes any rows with a company not in company_df!
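A hedged companion: a left_anti join returns the rows that failed validation, which is useful for inspecting the bad records:

# rows in parsed_df with no matching company in company_df
invalid_df = parsed_df.join(company_df,
    parsed_df.company == company_df.company, how='left_anti')
invalid_df.show()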

View data / actions:

printSchema(), head(), show(), count(), columns and describe()

show() - displays rows in a tabular format. By default it shows 20 rows; pass a value to show(n) to change that.

take(n), by contrast, returns the first n rows as a list of Row objects; first() returns only the first row.

count() - total number of rows
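A quick sketch exercising these actions on any DataFrame df:

df.printSchema()      # column names and types
df.show(5)            # first 5 rows in tabular form
rows = df.take(3)     # first 3 rows as a list of Row objects
print(df.count())     # total number of rows
print(df.columns)     # list of column names
df.describe().show()  # basic statistics for numeric columns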
 

Complex Data Types

ArrayType(elementType, containsNull)
Represents values comprising a sequence of elements
MapType(keyType, valueType, valueContainsNull)
Represents values comprising a set of key-value pairs. The data type of keys is described by keyType and the data type of values is described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.
StructType(fields)
Represents values with the structure described by a sequence of StructFields (fields)
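A hedged sketch declaring these complex types in one schema (field names are illustrative):

from pyspark.sql.types import (StructType, StructField, StringType,
    IntegerType, ArrayType, MapType)

schema = StructType([
    StructField('tags', ArrayType(StringType(), containsNull=True)),
    StructField('scores', MapType(StringType(), IntegerType(),
        valueContainsNull=True))
])
rows_df = spark.createDataFrame([(['a', 'b'], {'math': 90})], schema)
rows_df.printSchema()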

If, elif, else equivalent

.when(<if condition>, <then x>)

# when() comes from pyspark.sql.functions, imported as F
df.select(df.Name, df.Age,
    F.when(df.Age >= 18, "Adult")
     .when(df.Age < 18, "Minor"))


.otherwise() is like else

df.select(df.Name, df.Age,
    F.when(df.Age >= 18, "Adult")
     .otherwise("Minor"))

Remove duplicate rows & replace values

dropDuplicates()
test_df_no_dup = test_df.select('User_ID', 'Gender', 'Age').dropDuplicates()



fillna()
used to replace null values with another value
df.fillna(value=-99, subset=["Promo2SinceWeek", "Promo2SinceYear"]).show()


.withColumn(), when()
creating a new column with value 1 if Promo2SinceYear > 2000, otherwise 0

df.withColumn("greater_than_2000",
    when(df.Promo2SinceYear > 2000, 1).otherwise(0)
    .alias('value_desc')).show()
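A hedged companion to fillna(): dropna() removes rows containing nulls instead of replacing them:

# drop rows where both listed columns are null
df.dropna(how='all', subset=['Promo2SinceWeek', 'Promo2SinceYear']).show()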
               
 

