
Spark Scala API v2.3 Cheat Sheet

Data Sources - read

format
"csv", "text", "json", "parquet" (default), "orc", "jdbc"
option
csv
sep (default ,): sets a single character as a separator for each field and value.
quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. To turn off quoting, set this to an empty string rather than null. This behaviour is different from com.databricks.spark.csv.
escape (default \): sets a single character used for escaping quotes inside an already quoted value.
charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default is the escape character when the escape and quote characters are different, \0 otherwise.
comment (default empty string): sets a single character used for skipping lines beginning with this character. By default, it is disabled.
header (default false): uses the first line as names of columns.
inferSchema (default false): infers the input schema automatically from data. It requires one extra pass over the data.
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes.
    PERMISSIVE: sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, a user can set a string-type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have that field, corrupt records are dropped during parsing. When the number of parsed CSV tokens is smaller than the expected length of the schema, the extra fields are set to null.
    DROPMALFORMED: drops whole corrupted records.
    FAILFAST: throws an exception when it meets corrupted records.
nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type.
nanValue (default NaN): sets the string representation of a "non-number" value.
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type.
maxColumns (default 20480): defines a hard limit on how many columns a record can have.
maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. By default it is -1, meaning unlimited length.
multiLine (default false): parses one record, which may span multiple lines.
encoding (default UTF-8): decodes the CSV files by the given encoding type.
ignoreLeadingWhiteSpace (default false): a flag indicating whether leading whitespace in values being read should be skipped.
ignoreTrailingWhiteSpace (default false): a flag indicating whether trailing whitespace in values being read should be skipped.
positiveInf (default Inf): sets the string representation of a positive-infinity value.
negativeInf (default -Inf): sets the string representation of a negative-infinity value.
columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field that holds the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
 
text
wholetext (default false): if true, reads each input file as a single row.
 
json
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing.
    PERMISSIVE: sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, a user can set a string-type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have that field, corrupt records are dropped during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field to the output schema.
    DROPMALFORMED: drops whole corrupted records.
    FAILFAST: throws an exception when it meets corrupted records.
columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field that holds the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type.
multiLine (default false): parses one record, which may span multiple lines, per file.
primitivesAsString (default false): infers all primitive values as a string type.
prefersDecimal (default false): infers all floating-point values as a decimal type. If the values do not fit in decimal, it infers them as doubles.
allowComments (default false): ignores Java/C++-style comments in JSON records.
allowUnquotedFieldNames (default false): allows unquoted JSON field names.
allowSingleQuotes (default true): allows single quotes in addition to double quotes.
allowNumericLeadingZeros (default false): allows leading zeros in numbers (e.g. 00012).
allowBackslashEscapingAnyCharacter (default false): allows accepting quoting of all characters using the backslash quoting mechanism.
allowUnquotedControlChars (default false): allows JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line-feed characters).
 
parquet
mergeSchema (default is the value specified in spark.sql.parquet.mergeSchema): sets whether we should merge schemas collected from all Parquet part-files. This overrides spark.sql.parquet.mergeSchema.
 
orc
 
jdbc
url: The JDBC URL for Spark to connect to. At minimum, it should contain the host, port, and database name. For MySQL, it may look something like this: jdbc:mysql://localhost:3306/sakila.
dbtable: The name of a database table for Spark to read data from or write data to.
user
password
driver: The class name of the JDBC driver that Spark will instantiate to connect to the URL above. Consult the documentation for the JDBC driver you are using. For the MySQL Connector/J driver, the class name is com.mysql.jdbc.Driver.
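Putting these options together, a minimal read sketch. The host, database, table, and credentials below are hypothetical; substitute your own. The options(map) call sets each entry as if option(key, value) were called per pair.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-demo").getOrCreate()

// Hypothetical connection details; substitute your own.
val jdbcOptions = Map(
  "url"      -> "jdbc:mysql://localhost:3306/sakila",
  "dbtable"  -> "film",
  "user"     -> "spark_user",
  "password" -> "secret",
  "driver"   -> "com.mysql.jdbc.Driver"
)

// options(map) is equivalent to calling option(key, value) for each entry
val filmsDF = spark.read.format("jdbc").options(jdbcOptions).load()
```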
schema
You can use a triple-quoted string ("""...""") to define the schema; use Spark SQL data type names.
e.g. schema("""stockticker STRING, tradedate INT, openprice FLOAT""")

// Mode 1: programmatic schema
import org.apache.spark.sql.types._

val movieSchema = StructType(Array(StructField("stockticker", StringType, true),
  StructField("tradedate", IntegerType, true),
  StructField("openprice", FloatType, true)))


// Mode 2: a DDL string, equivalent to Mode 1
val movieSchema = """stockticker STRING, tradedate INT, openprice FLOAT"""
DataFrameReader.format(...).option("key", "value").schema(...).load(paths: String*)
You can give multiple paths or a directory path to read all files in the directory, and you can use the wildcard "*" in paths.
To get a DataFrameReader, use spark.read
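A minimal sketch of the full reader chain, using the DDL schema string shown above. The input location data/stocks/ is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-demo").getOrCreate()

// DDL schema string, as shown above
val stockSchema = "stockticker STRING, tradedate INT, openprice FLOAT"

// Hypothetical input location; wildcard paths are allowed.
val stocksDF = spark.read
  .format("csv")
  .option("header", "true")
  .schema(stockSchema)          // schema(schemaString: String) was added in 2.3
  .load("data/stocks/*.csv")
```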

Two ways to define Schema

Define a schema programmatically:
val schema = StructType(Array(StructField("author", StringType, false),
  StructField("title", StringType, false),
  StructField("pages", IntegerType, false)))
Define a schema with a DDL String:
val schema = "author STRING, title STRING, pages INT"

Data Sources - write

format
"csv", "text", "json", "parquet" (default), "orc", "jdbc"
mode
"overwrite", "append", "ignore", "error"/"errorIfExists" (default)
option
csv
sep (default ,): sets a single character as a separator for each field and value.
quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If an empty string is set, it uses \u0000 (the null character).
escape (default \): sets a single character used for escaping quotes inside an already quoted value.
charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default is the escape character when the escape and quote characters are different, \0 otherwise.
escapeQuotes (default true): a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character.
quoteAll (default false): a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character.
header (default false): writes the names of columns as the first line.
nullValue (default empty string): sets the string representation of a null value.
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type.
ignoreLeadingWhiteSpace (default true): a flag indicating whether leading whitespace in values being written should be skipped.
ignoreTrailingWhiteSpace (default true): a flag indicating whether trailing whitespace in values being written should be skipped.
 
text
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).
 
json
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, bzip2, gzip, lz4, snappy and deflate).
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type.
 
parquet
compression (default is the value specified in spark.sql.parquet.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, snappy, gzip, and lzo). This overrides spark.sql.parquet.compression.codec.
 
orc
compression (default is the value specified in spark.sql.orc.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, snappy, zlib, and lzo). This overrides orc.compress and spark.sql.orc.compression.codec. If orc.compress is given, it overrides spark.sql.orc.compression.codec.
 
jdbc
truncate (default false): use TRUNCATE TABLE instead of DROP TABLE.
In case of failures, users should turn off the truncate option so that DROP TABLE is used again. Also, because TRUNCATE TABLE behaves differently across DBMSs, it is not always safe to use. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this, while PostgresDialect and the default JDBCDialect do not. For unknown and unsupported JDBCDialects, the truncate option is ignored.
saveAsTable(tableName: String): Unit
Saves the content of the DataFrame as the specified table.

If the table already exists, the behavior of this function depends on the save mode, specified by the mode function (default: throwing an exception). When mode is Overwrite, the schema of the DataFrame does not need to be the same as that of the existing table.

When mode is Append, if there is an existing table, we will use the format and options of the existing table. The column order in the schema of the DataFrame doesn't need to be the same as that of the existing table. Unlike insertInto, saveAsTable will use the column names to find the correct column positions. For example:

scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
scala> sql("select * from t1").show

+---+---+
|  i|  j|
+---+---+
|  1|  2|
|  4|  3|
+---+---+
In this method, save mode is used to determine the behavior if the data source table exists in Spark catalog. We will always overwrite the underlying data of data source (e.g. a table in JDBC data source) if the table doesn't exist in Spark catalog, and will always append to the underlying data of data source if the table already exists.

When the DataFrame is created from a non-partitioned HadoopFsRelation with a single input path, and the data source provider can be mapped to an existing Hive built-in SerDe (i.e. ORC and Parquet), the table is persisted in a Hive-compatible format, which means other systems like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL specific format.
insertInto(tableName: String): Unit
Inserts the content of the DataFrame to the specified table. It requires that the schema of the DataFrame is the same as the schema of the table.

Unlike saveAsTable, insertInto ignores the column names and just uses position-based resolution. For example:

scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
scala> sql("select * from t1").show

+---+---+
|  i|  j|
+---+---+
|  5|  6|
|  3|  4|
|  1|  2|
+---+---+

Because it inserts data to an existing table, format or options will be ignored.
DataFrameWriter.format(...).mode(...).option(...).partitionBy(colNames: String*).bucketBy(numBuckets: Int, colName: String, colNames: String*).sortBy(colName: String, colNames: String*).save(path: String)
DataFrameWriter.format(...).mode(...).option(...).partitionBy(colNames: String*).bucketBy(numBuckets: Int, colName: String, colNames: String*).sortBy(colName: String, colNames: String*).saveAsTable/insertInto(tableName: String)
To get a DataFrameWriter, use dataFrame.write
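A minimal write sketch tying the chain together. The sample rows, column names, and output path are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("write-demo").getOrCreate()
import spark.implicits._

val movieRows = Seq((1, "Heat", 1995), (2, "Arrival", 2016))
val movies = movieRows.toDF("id", "title", "produced_year")

movies.write
  .format("parquet")
  .mode("overwrite")            // default mode "errorIfExists" would fail on rerun
  .partitionBy("produced_year") // one output subdirectory per year
  .save("output/movies")        // hypothetical output path
```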

Data Type

Spark          Scala                     Java
ByteType       Byte                      byte or Byte
ShortType      Short                     short or Short
IntegerType    Int                       int or Integer
LongType       Long                      long or Long
FloatType      Float                     float or Float
DoubleType     Double                    double or Double
DecimalType    java.math.BigDecimal      java.math.BigDecimal
StringType     String                    String
BinaryType     Array[Byte]               byte[]
BooleanType    Boolean                   boolean or Boolean
DateType       java.sql.Date             java.sql.Date
TimestampType  java.sql.Timestamp        java.sql.Timestamp
ArrayType      scala.collection.Seq      java.util.List
MapType        scala.collection.Map      java.util.Map
StructType     org.apache.spark.sql.Row  org.apache.spark.sql.Row
StructField    the value type of the data type of this field (e.g. Int for IntegerType)

Expres­sions

Computational expressions
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
Relational expressions
expr("(((someCol + 5) * 200) - 6) < otherCol")
An expression is a set of transformations on one or more values in a record in a DataFrame. Think of it like a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. Importantly, this "single value" can actually be a complex type like a Map or Array.

Converting to Spark Types - functions

lit(literal: Any): Column
Creates a Column of literal value.
The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.
org.apache.spark.sql.functions

Change the Column Data Type - Column

cast(to: String): Column
Casts the column to a different data type, using the canonical string representation of the type. The supported types are: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp.
// Casts colA to integer.
df.select(col("colA").cast("int"))
cast(to: DataType): Column
Casts the column to a different data type.
// Casts colA to IntegerType.
import org.apache.spark.sql.types.IntegerType
df.select(col("colA").cast(IntegerType))

// equivalent to
df.select(df("colA").cast("int"))
e.g. df.withColumn("id", col("id").cast("string"))

org.apache.spark.sql.Dataset - others

first(): T
Returns the first row. Alias for head().
head(): T
Returns the first row.
head(n: Int): Array[T]
Returns the first n rows.
take(n: Int): Array[T]
Returns the first n rows in the Dataset.
takeAsList(n: Int): List[T]
Returns the first n rows in the Dataset as a list.
collect(): Array[T]
Returns an array that contains all rows in this Dataset.
collectAsList(): List[T]
Returns a Java list that contains all rows in this Dataset.
count(): Long
Returns the number of rows in the Dataset.
show(): Unit
Displays the top 20 rows of Dataset in a tabular form. Strings more than 20 characters will be truncated, and all cells will be aligned right.
show(numRows: Int): Unit
Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated, and all cells will be aligned right.
show(truncate: Boolean): Unit
Displays the top 20 rows of Dataset in a tabular form.
show(numRows: Int, truncate: Boolean): Unit
Displays the Dataset in a tabular form.
printSchema(): Unit
Prints the schema to the console in a nice tree format.
explain(): Unit
Prints the physical plan to the console for debugging purposes.
explain(extended: Boolean): Unit
Prints the plans (logical and physical) to the console for debugging purposes.
schema: StructType
Returns the schema of this Dataset.
columns: Array[String]
Returns all column names as an array.
describe(cols: String*): DataFrame
Computes basic statistics for numeric and string columns, including count, mean, stddev, min, and max.
ds.describe("age", "height").show()

// output:
// summary age   height
// count   10.0  10.0
// mean    53.3  178.05
// stddev  11.6  15.7
// min     18.0  163.0
// max     92.0  192.0
summary(statistics: String*): DataFrame
Computes specified statistics for numeric and string columns. Available statistics are:
count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g. 75%).
ds.summary().show()

// output:
// summary age   height
// count   10.0  10.0
// mean    53.3  178.05
// stddev  11.6  15.7
// min     18.0  163.0
// 25%     24.0  176.0
// 50%     24.0  176.0
// 75%     32.0  180.0
// max     92.0  192.0

ds.summary("count", "min", "25%", "75%", "max").show()

// output:
// summary age   height
// count   10.0  10.0
// mean    53.3  178.05
// min     18.0  163.0
// 25%     24.0  176.0
// 75%     32.0  180.0
// max     92.0  192.0

To do a summary for specific columns first select them:

ds.select("age", "height").summary().show()
cache(): Dataset.this.type
Persist this Dataset with the default storage level (MEMORY_AND_DISK).
persist(): Dataset.this.type
Persist this Dataset with the default storage level (MEMORY_AND_DISK).
persist(newLevel: StorageLevel): Dataset.this.type
Persist this Dataset with the given storage level.
newLevel
One of: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
unpersist(): Dataset.this.type
Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
unpersist(blocking: Boolean): Dataset.this.type
Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
blocking: Whether to block until all blocks are deleted.
storageLevel: StorageLevel
Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
rdd: RDD[T]
Represents the content of the Dataset as an RDD of T.
toDF(): DataFrame
Converts this strongly typed collection of data to a generic DataFrame.
toDF(colNames: String*): DataFrame
Converts this strongly typed collection of data to generic DataFrame with columns renamed.
val rdd: RDD[(Int, String)] = ...
rdd.toDF()  // this implicit conversion creates a DataFrame with column name `_1` and `_2`
rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
coalesce(numPartitions: Int): Dataset[T]
Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.
repartition(numPartitions: Int): Dataset[T]
Returns a new Dataset that has exactly numPartitions partitions.
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned.
repartition(partitionExprs: Column*): Dataset[T]
Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions.
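A small sketch of the difference: repartition shuffles to the exact number of partitions, while coalesce only merges existing ones without a full shuffle. The app name and range are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-demo").master("local[4]").getOrCreate()
import spark.implicits._

val ds = spark.range(0, 1000)              // Dataset[Long]

val hashed = ds.repartition(8, $"id" % 4)  // full shuffle into 8 hash partitions
val merged = hashed.coalesce(2)            // narrow: merges partitions, no full shuffle
```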

org.apache.spark.sql - Transformations

select(col: String, cols: String*): DataFrame
Selects a set of columns.
ds.select("colA", "colB")
select(cols: Column*): DataFrame
Selects a set of column-based expressions.
ds.select($"colA", $"colB")
selectExpr(exprs: String*): DataFrame
Selects a set of SQL expressions.
// The following are equivalent:
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))

df.selectExpr("*", "(produced_year - (produced_year % 10)) as decade")
where(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.
To filter a DataFrame, you can also just specify a Boolean column: df.where("isExpensive")
peopleDs.where("age > 15")
where(condition: Column): Dataset[T]
Filters rows using the given condition.
peopleDs.where($"age" > 15)
filter(conditionExpr: String): Dataset[T]
Filters rows using the given SQL expression.
peopleDs.filter("age > 15")
filter(condition: Column): Dataset[T]
Filters rows using the given condition.
// The following are equivalent:
peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
filter(func: (T) ⇒ Boolean): Dataset[T]
Returns a new Dataset that only contains elements where func returns true.
orderBy(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
movieTitles.orderBy('title_length.desc, 'produced_year)
orderBy(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
sort(sortExprs: Column*): Dataset[T]
Returns a new Dataset sorted by the given expressions.
e.g. ds.sort($"col1", $"col2".desc)
sort(sortCol: String, sortCols: String*): Dataset[T]
Returns a new Dataset sorted by the specified column, all in ascending order.
// The following 3 are equivalent
ds.sort("sortcol")
ds.sort($"sortcol")
ds.sort($"sortcol".asc)
distinct(): Dataset[T]
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates.
dropDuplicates(): Dataset[T]
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for distinct.
dropDuplicates(col1: String, cols: String*): Dataset[T]
Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
// The following are equivalent
movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show
movies.dropDuplicates("movie_title").selectExpr("count(movie_title) as movies").show
dropDuplicates(colNames: Seq[String]): Dataset[T]
Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
dropDuplicates(colNames: Array[String]): Dataset[T]
Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
limit(n: Int): Dataset[T]
Returns a new Dataset by taking the first n rows. The difference between this function and head is that head is an action and returns an array (by triggering query execution), while limit returns a new Dataset.
withColumn(colName: String, col: Column): DataFrame
Returns a new Dataset by adding a column or replacing the existing column that has the same name.
If the given column name matches one of the existing ones, that column is replaced with the given column expression.
// adding a new column based on a certain column expression
movies.withColumn("decade", ('produced_year - 'produced_year % 10))
withColumnRenamed(existingName: String, newName: String): DataFrame
Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain existingName.
Notice that if the provided existingName doesn't exist in the schema, Spark doesn't throw an error; it silently does nothing.
movies.withColumnRenamed("actor_name", "actor")
drop(colName: String): DataFrame
Returns a new Dataset with a column dropped. This is a no-op if the schema doesn't contain the column name.
movies.drop("actor_name")
drop(colNames: String*): DataFrame
Returns a new Dataset with columns dropped. This is a no-op if the schema doesn't contain the column name(s).
You can specify one or more column names to drop, but only the ones that exist in the schema will be dropped; the ones that don't are silently ignored.
movies.drop("actor_name", "me")
drop(col: Column): DataFrame
Returns a new Dataset with a column dropped. This version of drop accepts a Column rather than a name. This is a no-op if the Dataset doesn't have a column with an equivalent expression.
movies.drop($"actor_name")
union(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
Notice that the column positions in the schema aren't necessarily matched with the fields in the strongly typed objects in a Dataset. This function resolves columns by their positions in the schema, not the fields in the strongly typed objects. Use unionByName to resolve columns by field name in the typed objects.
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+
unionByName(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
The difference between this function and union is that this function resolves columns by name (not by position).
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   6|   4|   5|
// +----+----+----+
intersect(other: Dataset[T]): Dataset[T]
Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is equivalent to INTERSECT in SQL.
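A sketch analogous to the union examples above. The session setup is included so the snippet stands alone; the data is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("intersect-demo").getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2), (3, 4)).toDF("col0", "col1")
val df2 = Seq((1, 2), (5, 6)).toDF("col0", "col1")
df1.intersect(df2).show

// output:
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// +----+----+
```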

Working with Booleans - Column

===(other: Any): Column
Equality test.
df.where(col("InvoiceNo") === 536365)
equalTo(other: Any): Column
Equality test.
df.where(col("InvoiceNo").equalTo(536365))
<=>(other: Any): Column
Equality test that is safe for null values.
=!=(other: Any): Column
Inequality test.
df.where(col("InvoiceNo") =!= 536365)
<(other: Any): Column
Less than.
<=(other: Any): Column
Less than or equal to.
>(other: Any): Column
Greater than.
>=(other: Any): Column
Greater than or equal to an expression.
&&(other: Any): Column
Boolean AND.
||(other: Any): Column
Boolean OR.
isNaN: Column
True if the current expression is NaN.
isNotNull: Column
True if the current expression is NOT null.
isNull: Column
True if the current expression is null.
isin(list: Any*): Column
A boolean expression that is evaluated to true if the value of this expression is contained by the evaluated values of the arguments.
According to the documentation, isin takes a vararg, not a list; "list" is a confusing parameter name here.
val items = List("a", "b", "c")
df.filter($"c1".isin(items:_*))
or
df.filter($"c1".isin("a", "b", "c"))
like(literal: String): Column
SQL LIKE expression. Returns a boolean column based on a SQL LIKE match.

SQL Wildcards
%: Represents zero or more characters, e.g. bl% finds bl, black, blue, and blob
_: Represents a single character, e.g. h_t finds hot, hat, and hit
[]: Represents any single character within the brackets, e.g. h[oa]t finds hot and hat, but not hit
[^]: Represents any single character not in the brackets, e.g. h[^oa]t finds hit, but not hot and hat
-: Represents a range of characters, e.g. c[a-b]t finds cat and cbt
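A sketch of like in practice. Note that Spark SQL's LIKE supports the % and _ wildcards; the bracket forms above come from other SQL dialects (e.g. SQL Server). The data is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("like-demo").getOrCreate()
import spark.implicits._

val words = Seq("bl", "black", "blue", "hot", "hat").toDF("word")

val blWords = words.where($"word".like("bl%")) // bl, black, blue
val hTWords = words.where($"word".like("h_t")) // hot, hat
```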
rlike(literal: String): Column
SQL RLIKE expression (LIKE with Regex).
startsWith(literal: String): Column
String starts with another string literal. Returns a boolean column based on a string match.
startsWith(other: Column): Column
String starts with.
endsWith(literal: String): Column
String ends with another string literal. Returns a boolean column based on a string match.
endsWith(other: Column): Column
String ends with. Returns a boolean column based on a string match.
contains(other: Any): Column
Contains the other element. Returns a boolean column based on a string match.
org.apache.spark.sql.Column

Working with Booleans - functions

not(e: Column): Column
Inversion of a boolean expression, i.e. NOT.
isnan(e: Column): Column
Return true iff the column is NaN.
isnull(e: Column): Column
Return true iff the column is null.
org.apache.spark.sql.functions

Working with Numbers - Column

+(other: Any): Column
Sum of this expression and another expression.
-(other: Any): Column
Subtraction. Subtracts the other expression from this expression.
*(other: Any): Column
Multiplication of this expression and another expression.
/(other: Any): Column
Division of this expression by another expression.
%(other: Any): Column
Modulo (a.k.a. remainder), e.g. 11 mod 4 = 3
org.apache.spark.sql.Column

Working with Numbers - functions

abs(e: Column): Column
Computes the absolute value.
round(e: Column): Column
Returns the value of the column e rounded to 0 decimal places with HALF_UP round mode.
round(e: Column, scale: Int): Column
Round the value of e to scale decimal places with HALF_UP round mode if scale is greater than or equal to 0 or at integral part when scale is less than 0.
bround(e: Column): Column
Returns the value of the column e rounded to 0 decimal places with HALF_EVEN round mode.
HALF_EVEN rounds towards the "nearest neighbor" unless both neighbors are equidistant, in which case it rounds towards the even neighbor.
bround(e: Column, scale: Int): Column
Round the value of e to scale decimal places with HALF_EVEN round mode if scale is greater than or equal to 0 or at integral part when scale is less than 0.
pow(l: Double, rightName: String): Column
Returns the value of the first argument raised to the power of the second argument.
pow(l: Double, r: Column): Column
Returns the value of the first argument raised to the power of the second argument.
pow(le­ftName: String, r: Double): Column
Returns the value of the first argument raised to the power of the second argument.
pow(l: Column, r: Double): Column
Returns the value of the first argument raised to the power of the second argument.
pow(le­ftName: String, rightName: String): Column
Returns the value of the first argument raised to the power of the second argument.
pow(le­ftName: String, r: Column): Column
Returns the value of the first argument raised to the power of the second argument.
pow(l: Column, rightName: String): Column
Returns the value of the first argument raised to the power of the second argument.
pow(l: Column, r: Column): Column
Returns the value of the first argument raised to the power of the second argument.
org.ap­ach­e.s­par­k.s­ql.f­un­ctions
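A short sketch contrasting HALF_UP and HALF_EVEN rounding, plus pow (hypothetical data, local Spark session assumed). round(2.5) goes up to 3.0, while bround(2.5) goes to the even neighbor 2.0:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((2.5, 3.5)).toDF("x", "y")
// HALF_UP vs HALF_EVEN on the same tie value, and pow on a double column
val r = df.select(round($"x").as("r"), bround($"x").as("bx"), bround($"y").as("by"), pow($"x", 2).as("p")).head
```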

Working with Strings - Column

contains(other: Any): Column
Contains the other element. Returns a boolean column based on a string match.
startsWith(literal: String): Column
String starts with another string literal. Returns a boolean column based on a string match.
startsWith(other: Column): Column
String starts with. Returns a boolean column based on a string match.
endsWith(literal: String): Column
String ends with another string literal. Returns a boolean column based on a string match.
endsWith(other: Column): Column
String ends with. Returns a boolean column based on a string match.
substr(startPos: Int, len: Int): Column
An expression that returns a substring.
startPos begins with 1.
In Scala, String also has a method substring(int beginIndex, int endIndex), where beginIndex starts from 0.
substr(startPos: Column, len: Column): Column
An expression that returns a substring.
like(literal: String): Column
SQL like expression. Returns a boolean column based on a SQL LIKE match.

SQL Wildcards
%: Represents zero or more characters, e.g. bl% finds bl, black, blue, and blob
_: Represents a single character, e.g. h_t finds hot, hat, and hit
[]: Represents any single character within the brackets, e.g. h[oa]t finds hot and hat, but not hit (SQL Server extension; Spark's LIKE supports only % and _, so use rlike for character classes)
^: Represents any character not in the brackets, e.g. h[^oa]t finds hit, but not hot and hat (SQL Server extension)
-: Represents a range of characters, e.g. c[a-b]t finds cat and cbt (SQL Server extension)
rlike(literal: String): Column
SQL RLIKE expression (LIKE with Regex).
org.apache.spark.sql.Column
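A small sketch comparing like, rlike, and substr (hypothetical data, local Spark session assumed). like uses SQL wildcards on the whole value, while rlike matches a regex anywhere in the string:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val words = Seq("hot", "hat", "hit").toDF("w")
// SQL wildcard _ matches any single character, so h_t matches all three
val likeCount = words.where($"w".like("h_t")).count
// regex character class [oa] matches only hot and hat
val rlikeCount = words.where($"w".rlike("h[oa]t")).count
// substr is 1-based: the first two characters of each word
val firstTwo = words.select($"w".substr(1, 2).as("s")).collect.map(_.getString(0)).toSeq
```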

Working with Strings - functions

initcap(e: Column): Column
Returns a new string column by converting the first letter of each word to uppercase.
lower(e: Column): Column
Converts a string column to lower case.
upper(e: Column): Column
Converts a string column to upper case.
trim(e: Column): Column
Trim the spaces from both ends of the specified string column.
trim(e: Column, trimString: String): Column
Trim the specified character from both ends of the specified string column.
ltrim(e: Column): Column
Trim the spaces from the left end of the specified string value.
ltrim(e: Column, trimString: String): Column
Trim the specified character string from the left end of the specified string column.
rtrim(e: Column): Column
Trim the spaces from the right end of the specified string value.
rtrim(e: Column, trimString: String): Column
Trim the specified character string from the right end of the specified string column.
lpad(str: Column, len: Int, pad: String): Column
Left-pad the string column with pad to a length of len. If the string column is longer than len, the return value is shortened to len characters.
rpad(str: Column, len: Int, pad: String): Column
Right-pad the string column with pad to a length of len. If the string column is longer than len, the return value is shortened to len characters.
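A minimal sketch of trim plus lpad/rpad (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("  spark  ").toDF("s")
// trim removes the surrounding spaces; lpad/rpad then pad to 8 characters with *
val row = df.select(trim($"s").as("t"), lpad(trim($"s"), 8, "*").as("l"), rpad(trim($"s"), 8, "*").as("r")).head
```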
substring(str: Column, pos: Int, len: Int): Column
Substring starts at pos and is of length len when str is String type, or returns the slice of the byte array that starts at pos and is of length len when str is Binary type.
Note: The position is not zero based, but a 1 based index.
In Scala, String also has a method substring(int beginIndex, int endIndex), where beginIndex starts from 0.
substring_index(str: Column, delim: String, count: Int): Column
Returns the substring from string str before count occurrences of the delimiter delim. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned. substring_index performs a case-sensitive match when searching for delim.
regexp_extract(e: Column, exp: String, groupIdx: Int): Column
Extract a specific group matched by a Java regex from the specified string column. If the regex did not match, or the specified group did not match, an empty string is returned.
val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme")
rhymeDF.select(regexp_extract('rhyme, "[a-z]*o[xw]", 0).as("substring")).show
The pattern may contain multiple capture groups; the group index (starting at 0, where 0 is the whole match) selects which one to return.
regexp_replace(e: Column, pattern: Column, replacement: Column): Column
Replace all substrings of the specified string value that match pattern with replacement.
regexp_replace(e: Column, pattern: String, replacement: String): Column
Replace all substrings of the specified string value that match pattern with replacement.
val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme")
rhymeDF.select(regexp_replace('rhyme, "fox|crow", "animal").as("new_rhyme")).show(false)
rhymeDF.select(regexp_replace('rhyme, "[a-z]*o[xw]", "animal").as("new_rhyme")).show(false)
repeat(str: Column, n: Int): Column
Repeats a string column n times, and returns it as a new string column.
reverse(str: Column): Column
Reverses the string column and returns it as a new string column.
split(str: Column, pattern: String): Column
Splits str around pattern (pattern is a regular expression).
length(e: Column): Column
Computes the character length of a given string or the number of bytes of a binary string. The length of character strings includes the trailing spaces. The length of binary strings includes binary zeros.
translate(src: Column, matchingString: String, replaceString: String): Column
Translates any character in src that appears in matchingString to the corresponding character in replaceString.
concat(exprs: Column*): Column
Concatenates multiple input columns together into a single column. If all inputs are binary, concat returns an output as binary. Otherwise, it returns as string.
concat_ws(sep: String, exprs: Column*): Column
Concatenates multiple input string columns together into a single string column, using the given separator.
instr(str: Column, substring: String): Column
Locate the position of the first occurrence of substring in the given string column. Returns null if either of the arguments is null.
Note: The position is not zero based, but a 1 based index. Returns 0 if substring could not be found in str.
locate(substr: String, str: Column, pos: Int): Column
Locate the position of the first occurrence of substr in a string column, after position pos.
Note: The position is not zero based, but a 1 based index. Returns 0 if substr could not be found in str.
locate(substr: String, str: Column): Column
Locate the position of the first occurrence of substr in a string column.
Note: The position is not zero based, but a 1 based index. Returns 0 if substr could not be found in str.
org.apache.spark.sql.functions
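A small sketch combining split, concat_ws, translate, and length (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("a,b,c").toDF("csv")
val row = df.select(
  // split into an array, then re-join with a different separator
  concat_ws("-", split($"csv", ",")).as("joined"),
  // translate maps each comma to a semicolon
  translate($"csv", ",", ";").as("t"),
  length($"csv").as("len")).head
```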

Working with Date/Time - functions

current_date(): Column
Returns the current date as a date column.
current_timestamp(): Column
Returns the current timestamp as a timestamp column.
date_add(start: Column, days: Int): Column
Returns the date that is days days after start.
date_sub(start: Column, days: Int): Column
Returns the date that is days days before start.
datediff(end: Column, start: Column): Column
Returns the number of days from start to end.
add_months(startDate: Column, numMonths: Int): Column
Returns the date that is numMonths after startDate.
months_between(date1: Column, date2: Column): Column
Returns the number of months between dates date1 and date2.
year(e: Column): Column
Extracts the year as an integer from a given date/timestamp/string.
quarter(e: Column): Column
Extracts the quarter as an integer from a given date/timestamp/string.
month(e: Column): Column
Extracts the month as an integer from a given date/timestamp/string.
weekofyear(e: Column): Column
Extracts the week number as an integer from a given date/timestamp/string.
dayofyear(e: Column): Column
Extracts the day of the year as an integer from a given date/timestamp/string.
dayofmonth(e: Column): Column
Extracts the day of the month as an integer from a given date/timestamp/string.
dayofweek(e: Column): Column
Extracts the day of the week as an integer from a given date/timestamp/string.
hour(e: Column): Column
Extracts the hours as an integer from a given date/timestamp/string.
minute(e: Column): Column
Extracts the minutes as an integer from a given date/timestamp/string.
second(e: Column): Column
Extracts the seconds as an integer from a given date/timestamp/string.
to_date(e: Column): Column
Converts the column into DateType by casting rules to DateType.
to_date(e: Column, fmt: String): Column
Converts the column into a DateType with a specified format (see http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html); returns null on failure.
fmt describes the format in which the source string is stored; DF.show() displays the resulting date in the default format yyyy-MM-dd.
to_timestamp(s: Column): Column
Converts the column into TimestampType by casting rules to TimestampType.
to_timestamp(s: Column, fmt: String): Column
Converts the column into a TimestampType with a specified format (see http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html); returns null on failure.
fmt describes the format in which the source string is stored; DF.show() displays the resulting timestamp in the default format yyyy-MM-dd HH:mm:ss.
date_format(dateExpr: Column, format: String): Column
Converts a date/timestamp/string to a string value in the format specified by the date format given as the second argument. A pattern dd.MM.yyyy would return a string like 18.03.1993. All pattern letters of java.text.SimpleDateFormat can be used.
unix_timestamp(): Column
Returns the current Unix timestamp (in seconds).
unix_timestamp(s: Column): Column
Converts a time string in format yyyy-MM-dd HH:mm:ss to a Unix timestamp (in seconds), using the default timezone and the default locale. Returns null on failure.
unix_timestamp(s: Column, p: String): Column
Converts a time string with the given pattern to a Unix timestamp (in seconds). Returns null on failure.
from_unixtime(ut: Column, f: String): Column
Converts the number of seconds from the Unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format.
from_unixtime(ut: Column): Column
Converts the number of seconds from the Unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the default format.
last_day(e: Column): Column
Given a date column, returns the last day of the month to which the given date belongs. For example, input "2015-07-27" returns "2015-07-31", since July 31 is the last day of July 2015.
next_day(date: Column, dayOfWeek: String): Column
Given a date column, returns the first date that is later than the value of the date column and falls on the specified day of the week.
For example, next_day('2015-07-27', "Sunday") returns 2015-08-02 because that is the first Sunday after 2015-07-27.
The day-of-week parameter is case insensitive, and accepts: "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun".
org.apache.spark.sql.functions

The default date format these functions use is yyyy-MM-dd HH:mm:ss.
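A small sketch tying several date functions together (hypothetical data, local Spark session assumed), reusing the "2015-07-27" example from above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("2015-07-27").toDF("s").select(to_date($"s").as("d"))
val row = df.select(
  date_format($"d", "dd.MM.yyyy").as("fmt"),
  // cast to string just to make the asserted values easy to read
  last_day($"d").cast("string").as("eom"),
  next_day($"d", "Sunday").cast("string").as("sun")).head
```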

Working with Null/NaN - Column

isNull: Column
True if the current expression is null.
isNotNull: Column
True if the current expression is NOT null.
isNaN: Column
True if the current expression is NaN.
org.apache.spark.sql.Column

Working with Null/NaN - functions

isnull(e: Column): Column
Return true iff the column is null.
isnan(e: Column): Column
Return true iff the column is NaN.
nanvl(col1: Column, col2: Column): Column
Returns col1 if it is not NaN, or col2 if col1 is NaN. Both inputs should be floating point columns (DoubleType or FloatType).
coalesce(e: Column*): Column
Returns the first column that is not null, or null if all inputs are null. For example, coalesce(a, b, c) will return a if a is not null, or b if a is null and b is not null, or c if both a and b are null but c is not null.
// create a movie with a null actor name
case class Movie(actor_name: String, movie_title: String, produced_year: Long)
val badMoviesDF = Seq(Movie(null, null, 2018L), Movie("John Doe", "Awesome Movie", 2018L)).toDF
// use coalesce to handle the null value in the actor_name column
badMoviesDF.select(coalesce('actor_name, lit("no_name")).as("new_name")).show
+--------+
|new_name|
+--------+
| no_name|
|John Doe|
+--------+
org.apache.spark.sql.functions

Working with Null/NaN - DataFr­ame­NaF­unc­tions

drop(): DataFrame
Returns a new DataFrame that drops rows containing any null or NaN values.
drop(how: String): DataFrame
Returns a new DataFrame that drops rows containing null or NaN values.
If how is "any", then drop rows containing any null or NaN values. If how is "all", then drop rows only if every column is null or NaN for that row.
drop(cols: Seq[String]): DataFrame
(Scala-specific) Returns a new DataFrame that drops rows containing any null or NaN values in the specified columns.
drop(how: String, cols: Seq[String]): DataFrame
(Scala-specific) Returns a new DataFrame that drops rows containing null or NaN values in the specified columns. If how is "any", then drop rows containing any null or NaN values in the specified columns. If how is "all", then drop rows only if every specified column is null or NaN for that row.
drop(minNonNulls: Int): DataFrame
Returns a new DataFrame that drops rows containing fewer than minNonNulls non-null and non-NaN values.
drop(minNonNulls: Int, cols: Seq[String]): DataFrame
(Scala-specific) Returns a new DataFrame that drops rows containing fewer than minNonNulls non-null and non-NaN values in the specified columns.
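A minimal sketch contrasting drop() with drop("all") (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// One complete row, one partially null row, one fully null row
val df = Seq((Some(1), Some("a")), (None, Some("b")), (None, None)).toDF("x", "y")
val anyDropped = df.na.drop().count      // drops rows with any null
val allDropped = df.na.drop("all").count // drops only rows where every column is null
```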
fill(value: String/Boolean/Double/Long): DataFrame
Returns a new DataFrame that replaces null values in string/boolean columns (or null or NaN values in numeric columns) with value.
fill(value: String/Boolean/Double/Long, cols: Seq[String]): DataFrame
(Scala-specific) Returns a new DataFrame that replaces null values in the specified string/boolean/double/long columns.
fill(valueMap: Map[String, Any]): DataFrame
(Scala-specific) Returns a new DataFrame that replaces null values. The key of the map is the column name, and the value of the map is the replacement value. The value must be of the following type: Int, Long, Float, Double, String, Boolean. Replacement values are cast to the column data type.
e.g. df.na.fill(Map("A" -> "unknown", "B" -> 1.0))
replace[T](col: String, replacement: Map[T, T]): DataFrame
(Scala-specific) Replaces values matching keys in the replacement map.
col: name of the column to apply the value replacement to. If col is "*", the replacement is applied to all string, numeric or boolean columns.
replacement: the value replacement map. Key and value of the replacement map must have the same type, and can only be doubles, strings or booleans. The map value can have nulls.

// Replaces all occurrences of 1.0 with 2.0 in column "height".
df.na.replace("height", Map(1.0 -> 2.0))

// Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
df.na.replace("name", Map("UNKNOWN" -> "unnamed"))

// Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
df.na.replace("*", Map("UNKNOWN" -> "unnamed"))
replace[T](cols: Seq[String], replacement: Map[T, T]): DataFrame
(Scala-specific) Replaces values matching keys in the replacement map.

// Replaces all occurrences of 1.0 with 2.0 in columns "height" and "weight".
df.na.replace("height" :: "weight" :: Nil, Map(1.0 -> 2.0))

// Replaces all occurrences of "UNKNOWN" with "unnamed" in columns "firstname" and "lastname".
df.na.replace("firstname" :: "lastname" :: Nil, Map("UNKNOWN" -> "unnamed"))
org.apache.spark.sql.DataFrameNaFunctions
Use df.na to get DataFrameNaFunctions.

Working with Sorting - Column

asc: Column
Returns a sort expression based on ascending order of the column.

// Scala: sort a DataFrame by age column in ascending order.
df.sort(df("age").asc)
asc_nulls_first: Column
Returns a sort expression based on ascending order of the column, where null values appear before non-null values.
asc_nulls_last: Column
Returns a sort expression based on ascending order of the column, where null values appear after non-null values.
desc: Column
Returns a sort expression based on the descending order of the column.
desc_nulls_first: Column
Returns a sort expression based on the descending order of the column, where null values appear before non-null values.
desc_nulls_last: Column
Returns a sort expression based on the descending order of the column, where null values appear after non-null values.
org.apache.spark.sql.Column
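A small sketch showing where nulls land with asc_nulls_last (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(Option(2), None, Option(1)).toDF("n")
// ascending order, with the null row pushed to the end
val ordered = df.sort($"n".asc_nulls_last).collect
  .map(r => if (r.isNullAt(0)) None else Some(r.getInt(0))).toSeq
```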

Working with Sorting - functions

asc(columnName: String): Column
Returns a sort expression based on ascending order of the column.

df.sort(asc("dept"), desc("age"))
asc_nulls_first(columnName: String): Column
Returns a sort expression based on ascending order of the column, where null values appear before non-null values.
asc_nulls_last(columnName: String): Column
Returns a sort expression based on ascending order of the column, where null values appear after non-null values.
desc(columnName: String): Column
Returns a sort expression based on the descending order of the column.
desc_nulls_first(columnName: String): Column
Returns a sort expression based on the descending order of the column, where null values appear before non-null values.
desc_nulls_last(columnName: String): Column
Returns a sort expression based on the descending order of the column, where null values appear after non-null values.
org.apache.spark.sql.functions

Working with Aggregate functions

count(columnName: String): TypedColumn[Any, Long]
Aggregate function: returns the number of items in a group.
count("*"): counts all rows, including those with null values.
count(<column_name>): does not count rows where the column value is null.
count(e: Column): Column
Aggregate function: returns the number of items in a group.
countDistinct(columnName: String, columnNames: String*): Column
Aggregate function: returns the number of distinct items in a group.
countDistinct(expr: Column, exprs: Column*): Column
Aggregate function: returns the number of distinct items in a group.
first(columnName: String): Column
Aggregate function: returns the first value of a column in a group. The function by default returns the first value it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
first(e: Column): Column
Aggregate function: returns the first value in a group.
first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
first(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the first value in a group.
last(columnName: String): Column
Aggregate function: returns the last value of the column in a group. The function by default returns the last value it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
last(e: Column): Column
Aggregate function: returns the last value in a group.
last(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the last value of the column in a group.
last(e: Column, ignoreNulls: Boolean): Column
Aggregate function: returns the last value in a group.
min(columnName: String): Column
Aggregate function: returns the minimum value of the column in a group.
min(e: Column): Column
Aggregate function: returns the minimum value of the expression in a group.
max(columnName: String): Column
Aggregate function: returns the maximum value of the column in a group.
max(e: Column): Column
Aggregate function: returns the maximum value of the expression in a group.
sum(columnName: String): Column
Aggregate function: returns the sum of all values in the given column.
sum(e: Column): Column
Aggregate function: returns the sum of all values in the expression.
sumDistinct(columnName: String): Column
Aggregate function: returns the sum of distinct values in the expression.
sumDistinct(e: Column): Column
Aggregate function: returns the sum of distinct values in the expression.
avg(columnName: String): Column
Aggregate function: returns the average of the values in a group.
avg(e: Column): Column
Aggregate function: returns the average of the values in a group.
mean(columnName: String): Column
Aggregate function: returns the average of the values in a group. Alias for avg.
mean(e: Column): Column
Aggregate function: returns the average of the values in a group. Alias for avg.
variance(columnName: String): Column
Aggregate function: alias for var_samp.
variance(e: Column): Column
Aggregate function: alias for var_samp.
var_samp(columnName: String): Column
Aggregate function: returns the unbiased variance of the values in a group.
var_samp(e: Column): Column
Aggregate function: returns the unbiased variance of the values in a group.
var_pop(columnName: String): Column
Aggregate function: returns the population variance of the values in a group.
var_pop(e: Column): Column
Aggregate function: returns the population variance of the values in a group.
stddev(columnName: String): Column
Aggregate function: alias for stddev_samp.
stddev(e: Column): Column
Aggregate function: alias for stddev_samp.
stddev_samp(columnName: String): Column
Aggregate function: returns the sample standard deviation of the expression in a group.
stddev_samp(e: Column): Column
Aggregate function: returns the sample standard deviation of the expression in a group.
stddev_pop(columnName: String): Column
Aggregate function: returns the population standard deviation of the expression in a group.
stddev_pop(e: Column): Column
Aggregate function: returns the population standard deviation of the expression in a group.
skewness(columnName: String): Column
Aggregate function: returns the skewness of the values in a group.
skewness(e: Column): Column
Aggregate function: returns the skewness of the values in a group.
kurtosis(columnName: String): Column
Aggregate function: returns the kurtosis of the values in a group.
kurtosis(e: Column): Column
Aggregate function: returns the kurtosis of the values in a group.
corr(columnName1: String, columnName2: String): Column
Aggregate function: returns the Pearson correlation coefficient for two columns.
corr(column1: Column, column2: Column): Column
Aggregate function: returns the Pearson correlation coefficient for two columns.
covar_samp(columnName1: String, columnName2: String): Column
Aggregate function: returns the sample covariance for two columns.
covar_samp(column1: Column, column2: Column): Column
Aggregate function: returns the sample covariance for two columns.
covar_pop(columnName1: String, columnName2: String): Column
Aggregate function: returns the population covariance for two columns.
covar_pop(column1: Column, column2: Column): Column
Aggregate function: returns the population covariance for two columns.
collect_list(columnName: String): Column
Aggregate function: returns a list of objects with duplicates.
collect_list(e: Column): Column
Aggregate function: returns a list of objects with duplicates.
collect_set(columnName: String): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.
collect_set(e: Column): Column
Aggregate function: returns a set of objects with duplicate elements eliminated.
org.apache.spark.sql.functions
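A minimal sketch of a few aggregate functions over the whole DataFrame (hypothetical data, local Spark session assumed). Note that count("*") includes all rows, while countDistinct counts distinct values only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 3), ("b", 4)).toDF("k", "v")
// count/countDistinct/sum return Long columns; avg returns Double
val row = df.select(count("*").as("n"), countDistinct("k").as("nk"), sum("v").as("s"), avg("v").as("m")).head
```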

Working with Aggregate - Relati­ona­lGr­oup­edD­ataset

agg(expr: Column, exprs: Column*): DataFrame
Compute aggregates by specifying a series of aggregate columns. Note that this function by default retains the grouping columns in its output. To not retain grouping columns, set spark.sql.retainGroupColumns to false.

import org.apache.spark.sql.functions._
df.groupBy("department").agg(max("age"), sum("expense"))
agg(exprs: Map[String, String]): DataFrame
(Scala-specific) Compute aggregates by specifying a map from column name to aggregate methods. The resulting DataFrame will also contain the grouping columns.

df.groupBy("department").agg(Map("age" -> "max", "expense" -> "sum"))
count(): DataFrame
Count the number of rows for each group. The resulting DataFrame will also contain the grouping columns.
max(colNames: String*): DataFrame
Compute the max value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the max values for them.
min(colNames: String*): DataFrame
Compute the min value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the min values for them.
sum(colNames: String*): DataFrame
Compute the sum for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the sum for them.
avg(colNames: String*): DataFrame
Compute the mean value for each numeric column for each group.
mean(colNames: String*): DataFrame
Compute the average value for each numeric column for each group. This is an alias for avg. The resulting DataFrame will also contain the grouping columns. When specific columns are given, only compute the average values for them.
pivot(pivotColumn: String): RelationalGroupedDataset
Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of the pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset
Pivots a column of the current DataFrame and performs the specified aggregation, using only the given list of distinct values of the pivot column.
org.apache.spark.sql.RelationalGroupedDataset
Use df.groupBy("xxx") to get a RelationalGroupedDataset.
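A minimal sketch of the explicit-values pivot overload (hypothetical data, local Spark session assumed). Each value in the Seq becomes an output column:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(("2018", "US", 10), ("2018", "EU", 20), ("2019", "US", 30)).toDF("year", "region", "amount")
// One row per year; "US" and "EU" become columns holding sum(amount)
val pivoted = sales.groupBy("year").pivot("region", Seq("US", "EU")).sum("amount")
pivoted.show()
```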

Working with Collection - functions

size(e: Column): Column
Returns length of array or map.
array_contains(column: Column, value: Any): Column
Returns null if the array is null, true if the array contains value, and false otherwise.
sort_array(e: Column): Column
Sorts the input array for the given column in ascending order, according to the natural ordering of the array elements.
sort_array(e: Column, asc: Boolean): Column
Sorts the input array for the given column in ascending or descending order, according to the natural ordering of the array elements.
explode(e: Column): Column
Creates a new row for each element in the given array or map column.
explode_outer(e: Column): Column
Creates a new row for each element in the given array or map column. Unlike explode, if the array/map is null or empty then null is produced.
org.apache.spark.sql.functions
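A small sketch of the array helpers on a single-array DataFrame (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Tuple1 so the single array becomes one column
val df = Seq(Tuple1(Seq(3, 1, 2))).toDF("arr")
val row = df.select(size($"arr").as("n"), array_contains($"arr", 2).as("has2"), sort_array($"arr").as("sorted")).head
// explode turns the one array row into one row per element
val exploded = df.select(explode($"arr")).count
```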

Working with Window - functions

rank(): Column
Window function: returns the rank of rows within a window partition.
The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties. That is, if you were ranking a competition using dense_rank and had three people tie for second place, you would say that all three were in second place and that the next person came in third. rank would give sequential numbers, so the person coming after the ties would register as coming in fifth.
e.g. 1, 2, 2, 2, 5
dense_rank(): Column
Window function: returns the rank of rows within a window partition, without any gaps.
See rank above: dense_rank leaves no gaps in the ranking sequence when there are ties, so the person coming after a three-way tie for second place registers as third.
e.g. 1, 2, 2, 2, 3
percent_rank(): Column
Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
This is computed by:
(rank of row in its partition - 1) / (number of rows in the partition - 1)
row_number(): Column
Window function: returns a sequential number starting at 1 within a window partition.
cume_dist(): Column
Window function: returns the cumulative distribution of values within a window partition, i.e.
N = total number of rows in the partition
cume_dist(x) = number of values before (and including) x / N
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val windowSpec = Window.partitionBy("CustomerId", "date").orderBy(col("Quantity").desc).rowsBetween(Window.unboundedPreceding, Window.currentRow)

val purchaseRank = rank().over(windowSpec)

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank")).show()

org.apache.spark.sql.expressions.WindowSpec

rowsBetween(start: Long, end: Long): WindowSpec
Defines the frame boundaries by row position relative to the current row, from start to end (both inclusive), e.g. rowsBetween(Window.unboundedPreceding, Window.currentRow) for a running total.
rangeBetween(start: Long, end: Long): WindowSpec
Defines the frame boundaries by the value of the ordering expression relative to the current row's value, from start to end (both inclusive).
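A minimal running-sum sketch using rowsBetween (hypothetical data, local Spark session assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Frame: from the start of the partition up to the current row
val w = Window.partitionBy("k").orderBy("v").rowsBetween(Window.unboundedPreceding, Window.currentRow)

val df = Seq(("a", 1), ("a", 2), ("a", 3)).toDF("k", "v")
val sums = df.withColumn("running", sum($"v").over(w))
  .orderBy("v").collect.map(_.getAs[Long]("running")).toSeq
```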
       
