Data Sources - read
format |
"csv", "text", "json", "parquet" (default), "orc", "jdbc" |
option |
csv |
sep (default ,): sets a single character as a separator for each field and value.
quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, set an empty string rather than null. This behaviour is different from com.databricks.spark.csv.
escape (default \): sets a single character used for escaping quotes inside an already quoted value.
charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is the escape character when escape and quote characters are different, \0 otherwise.
comment (default empty string): sets a single character used for skipping lines beginning with this character. By default, it is disabled.
header (default false): uses the first line as names of columns.
inferSchema (default false): infers the input schema automatically from data. It requires one extra pass over the data.
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes. PERMISSIVE: sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have that field, corrupt records are dropped during parsing. When the length of parsed CSV tokens is shorter than the expected length of the schema, it sets null for the extra fields. DROPMALFORMED: ignores whole corrupted records. FAILFAST: throws an exception when it meets corrupted records.
nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type.
nanValue (default NaN): sets the string representation of a non-number value.
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
maxColumns (default 20480): defines a hard limit of how many columns a record can have.
maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. By default, it is -1, meaning unlimited length.
multiLine (default false): parse one record, which may span multiple lines.
encoding (default UTF-8): decodes the CSV files by the given encoding type.
ignoreLeadingWhiteSpace (default false): a flag indicating whether or not leading whitespaces from values being read should be skipped.
ignoreTrailingWhiteSpace (default false): a flag indicating whether or not trailing whitespaces from values being read should be skipped.
positiveInf (default Inf): sets the string representation of a positive infinity value.
negativeInf (default -Inf): sets the string representation of a negative infinity value.
columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field holding the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord. |
|
text |
wholetext (default false): if true, reads each input file as a single row instead of splitting it on line breaks. |
|
json |
mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. PERMISSIVE: sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have that field, corrupt records are dropped during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field to the output schema. DROPMALFORMED: ignores whole corrupted records. FAILFAST: throws an exception when it meets corrupted records.
columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field holding the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
multiLine (default false): parse one record, which may span multiple lines, per file.
primitivesAsString (default false): infers all primitive values as a string type.
prefersDecimal (default false): infers all floating-point values as a decimal type. If the values do not fit in decimal, then it infers them as doubles.
allowComments (default false): ignores Java/C++ style comments in JSON records.
allowUnquotedFieldNames (default false): allows unquoted JSON field names.
allowSingleQuotes (default true): allows single quotes in addition to double quotes.
allowNumericLeadingZeros (default false): allows leading zeros in numbers (e.g. 00012).
allowBackslashEscapingAnyCharacter (default false): allows accepting quoting of all characters using the backslash quoting mechanism.
allowUnquotedControlChars (default false): allows JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. |
|
parquet |
mergeSchema (default is the value specified in spark.sql.parquet.mergeSchema): sets whether we should merge schemas collected from all Parquet part-files. This will override spark.sql.parquet.mergeSchema. |
|
orc |
|
jdbc |
url: the JDBC URL for Spark to connect to. At a minimum, it should contain the host, port, and database name. For MySQL, it may look something like this: jdbc:mysql://localhost:3306/sakila.
dbtable: the name of a database table for Spark to read data from or write data to.
user, password: credentials for connecting to the database.
driver: the class name of the JDBC driver that Spark will instantiate to connect to the above URL. Consult the documentation of the JDBC driver that you are using. For the MySQL Connector/J driver, the class name is com.mysql.jdbc.Driver. |
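For example, a minimal JDBC read sketch (the URL, table name, and credentials are hypothetical placeholders):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("jdbc-read").getOrCreate()
val paymentsDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/sakila")  // host, port and database name
  .option("dbtable", "payment")                         // table to read from
  .option("user", "spark_user")
  .option("password", "secret")
  .option("driver", "com.mysql.jdbc.Driver")            // driver class on the classpath
  .load()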
schema |
You can also define the schema with a DDL string ("""..."""), using Spark SQL data types, e.g. schema("""stockticker STRING, tradedate INT, openprice FLOAT""")
// Mode 1: programmatic schema
val stockSchema = StructType(Array(StructField("stockticker", StringType, true), StructField("tradedate", IntegerType, true), StructField("openprice", FloatType, true)))
// Mode 2: equivalent DDL string
val stockSchema = """stockticker STRING, tradedate INT, openprice FLOAT"""
|
DataFrameReader.format(...).option("key", "value").schema(...).load(paths: String*)
You can pass multiple paths, a directory path (to read all files in that directory), or a path containing the wildcard "*".
To get a DataFrameReader, use spark.read
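For example, a minimal read sketch (the file path and options are hypothetical):
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("read-example").getOrCreate()
val stocksDF = spark.read
  .format("csv")
  .option("header", "true")       // use the first line as column names
  .option("inferSchema", "true")  // one extra pass over the data to infer types
  .load("data/stocks.csv")        // could also be a directory or a wildcard such as data/*.csv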
Two ways to define a schema
Define a schema programmatically: |
val schema = StructType(Array(StructField("author", StringType, false), StructField("title", StringType, false), StructField("pages", IntegerType, false)))
|
Define a schema with a DDL String |
val schema = "author STRING, title STRING, pages INT"
|
Data Sources - write
format |
"csv", "text", "json", "parquet" (default), "orc", "jdbc" |
mode |
"overwrite", "append", "ignore", "error/errorIfExists" (default) |
option |
csv |
sep (default ,): sets a single character as a separator for each field and value.
quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. If an empty string is set, it uses u0000 (null character).
escape (default \): sets a single character used for escaping quotes inside an already quoted value.
charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default value is the escape character when escape and quote characters are different, \0 otherwise.
escapeQuotes (default true): a flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character.
quoteAll (default false): a flag indicating whether all values should always be enclosed in quotes. Default is to only escape values containing a quote character.
header (default false): writes the names of columns as the first line.
nullValue (default empty string): sets the string representation of a null value.
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate).
dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type.
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
ignoreLeadingWhiteSpace (default true): a flag indicating whether or not leading whitespaces from values being written should be skipped.
ignoreTrailingWhiteSpace (default true): a flag indicating whether or not trailing whitespaces from values being written should be skipped. |
|
text |
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). |
|
json |
compression (default null): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and deflate). dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type. timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type. |
|
parquet |
compression (default is the value specified in spark.sql.parquet.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names(none, snappy, gzip, and lzo). This will override spark.sql.parquet.compression.codec. |
|
orc |
compression (default is the value specified in spark.sql.orc.compression.codec): compression codec to use when saving to file. This can be one of the known case-insensitive shorten names(none, snappy, zlib, and lzo). This will override orc.compress and spark.sql.orc.compression.codec. If orc.compress is given, it overrides spark.sql.orc.compression.codec. |
|
jdbc |
truncate (default false): use TRUNCATE TABLE instead of DROP TABLE. In case of failures, users should turn off the truncate option to use DROP TABLE again. Also, due to the different behavior of TRUNCATE TABLE among DBMSs, it's not always safe to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this, while PostgresDialect and the default JDBCDialect don't. For unknown and unsupported JDBCDialects, the user option truncate is ignored. |
saveAsTable(tableName: String): Unit |
Saves the content of the DataFrame as the specified table. If the table already exists, the behavior of this function depends on the save mode, specified by the mode function (default is to throw an exception). When mode is Overwrite, the schema of the DataFrame does not need to be the same as that of the existing table. When mode is Append, if there is an existing table, we will use the format and options of the existing table. The column order in the schema of the DataFrame doesn't need to be the same as that of the existing table. Unlike insertInto, saveAsTable will use the column names to find the correct column positions. For example:
scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1")
scala> sql("select * from t1").show
+---+---+
|  i|  j|
+---+---+
|  1|  2|
|  4|  3|
+---+---+
In this method, save mode is used to determine the behavior if the data source table exists in the Spark catalog. We will always overwrite the underlying data of the data source (e.g. a table in a JDBC data source) if the table doesn't exist in the Spark catalog, and will always append to the underlying data of the data source if the table already exists. When the DataFrame is created from a non-partitioned HadoopFsRelation with a single input path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC and Parquet), the table is persisted in a Hive compatible format, which means other systems like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL specific format. |
insertInto(tableName: String): Unit |
Inserts the content of the DataFrame into the specified table. It requires that the schema of the DataFrame is the same as the schema of the table. Unlike saveAsTable, insertInto ignores the column names and just uses position-based resolution. For example:
scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
scala> sql("select * from t1").show
+---+---+
|  i|  j|
+---+---+
|  5|  6|
|  3|  4|
|  1|  2|
+---+---+
Because it inserts data into an existing table, format or options will be ignored. |
DataFrameWriter.format(...).mode(...).option(...).partitionBy(colNames: String*).bucketBy(numBuckets: Int, colName: String, colNames: String*).sortBy(colName: String, colNames: String*).save(path: String)
DataFrameWriter.format(...).mode(...).option(...).partitionBy(colNames: String*).bucketBy(numBuckets: Int, colName: String, colNames: String*).sortBy(colName: String, colNames: String*).saveAsTable/insertInto(tableName: String)
To get DataFrameWriter, use dataFrame.write
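For example, a minimal write sketch (df, the partition column and the output path are hypothetical):
df.write
  .format("parquet")          // the default format
  .mode("overwrite")          // replace any existing data at the path
  .partitionBy("tradedate")   // one sub-directory per distinct tradedate value
  .save("output/stocks_parquet")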
Data Type
Spark |
Scala |
Java |
ByteType |
Byte |
byte or Byte |
ShortType |
Short |
short or Short |
IntegerType |
Int |
int or Integer |
LongType |
Long |
long or Long |
FloatType |
Float |
float or Float |
DoubleType |
Double |
double or Double |
DecimalType |
java.math.BigDecimal |
java.math.BigDecimal |
StringType |
String |
String |
BinaryType |
Array[Byte] |
byte[] |
BooleanType |
Boolean |
boolean or Boolean |
DateType |
java.sql.Date |
java.sql.Date |
TimestampType |
java.sql.Timestamp |
java.sql.Timestamp |
ArrayType |
scala.collection.Seq |
java.util.List |
MapType |
scala.collection.Map |
java.util.Map |
StructType |
org.apache.spark.sql.Row |
org.apache.spark.sql.Row |
StructField |
The value type in Scala of the data type of this field (e.g. Int for a StructField with the data type IntegerType) |
The value type in Java of the data type of this field (e.g. int for a StructField with the data type IntegerType) |
Expressions
Computational expressions |
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
|
Relational expressions |
expr("(((someCol + 5) * 200) - 6) < otherCol")
|
An expression is a set of transformations on one or more values in a record in a DataFrame. Think of it like a function that takes as input one or more column names, resolves them, and then potentially applies more expressions to create a single value for each record in the dataset. Importantly, this "single value" can actually be a complex type like a Map or Array.
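A small sketch showing that both styles build the same expression (df and the column names are hypothetical):
import org.apache.spark.sql.functions.{col, expr}
df.select((((col("someCol") + 5) * 200) - 6) < col("otherCol"))
df.select(expr("(((someCol + 5) * 200) - 6) < otherCol"))
// expr() simply parses the SQL string into a Column; both selects produce the same plan.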
Converting to Spark Types - functions
lit(literal: Any): Column |
Creates a Column of literal value. The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value. |
org.apache.spark.sql.functions
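A small sketch of lit (df is hypothetical):
import org.apache.spark.sql.functions.lit
df.select(lit(5).as("five"), lit("five").as("label"), lit(5.0).as("five_double"))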
Change the Column Data Type - Column
cast(to: String): Column |
Casts the column to a different data type, using the canonical string representation of the type. The supported types are: string, boolean, byte, short, int, long, float, double, decimal, date, timestamp. // Casts colA to integer. df.select(col("colA").cast("int"))
|
cast(to: DataType): Column |
Casts the column to a different data type. // Casts colA to IntegerType. import org.apache.spark.sql.types.IntegerType df.select(col("colA").cast(IntegerType)) // equivalent to df.select(df("colA").cast("int"))
|
e.g. df.withColumn("id", col("id").cast("string"))
org.apache.spark.sql.Dataset - others
first(): T |
Returns the first row. Alias for head(). |
head(): T |
Returns the first row. |
head(n: Int): Array[T] |
Returns the first n rows. |
take(n: Int): Array[T] |
Returns the first n rows in the Dataset. |
takeAsList(n: Int): List[T] |
Returns the first n rows in the Dataset as a list. |
collect(): Array[T] |
Returns an array that contains all rows in this Dataset. |
collectAsList(): List[T] |
Returns a Java list that contains all rows in this Dataset. |
count(): Long |
Returns the number of rows in the Dataset. |
show(): Unit |
Displays the top 20 rows of the Dataset in a tabular form. Strings longer than 20 characters will be truncated, and all cells will be aligned right. |
show(numRows: Int): Unit |
Displays the top numRows rows of the Dataset in a tabular form. Strings longer than 20 characters will be truncated, and all cells will be aligned right. |
show(truncate: Boolean): Unit |
Displays the top 20 rows of the Dataset in a tabular form; truncate controls whether strings longer than 20 characters are truncated. |
show(numRows: Int, truncate: Boolean): Unit |
Displays the top numRows rows of the Dataset in a tabular form; truncate controls whether strings longer than 20 characters are truncated. |
printSchema(): Unit |
Prints the schema to the console in a nice tree format. |
explain(): Unit |
Prints the physical plan to the console for debugging purposes. |
explain(extended: Boolean): Unit |
Prints the plans (logical and physical) to the console for debugging purposes. |
schema: StructType |
Returns the schema of this Dataset. |
columns: Array[String] |
Returns all column names as an array. |
describe(cols: String*): DataFrame |
Computes basic statistics for numeric and string columns, including count, mean, stddev, min, and max. ds.describe("age", "height").show()
// output:
// summary age height
// count 10.0 10.0
// mean 53.3 178.05
// stddev 11.6 15.7
// min 18.0 163.0
// max 92.0 192.0
|
summary(statistics: String*): DataFrame |
Computes specified statistics for numeric and string columns. Available statistics are: count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g. 75%).
ds.summary().show()
// output:
// summary age height
// count 10.0 10.0
// mean 53.3 178.05
// stddev 11.6 15.7
// min 18.0 163.0
// 25% 24.0 176.0
// 50% 24.0 176.0
// 75% 32.0 180.0
// max 92.0 192.0
ds.summary("count", "min", "25%", "75%", "max").show()
// output:
// summary age height
// count 10.0 10.0
// min 18.0 163.0
// 25% 24.0 176.0
// 75% 32.0 180.0
// max 92.0 192.0
To do a summary for specific columns, first select them: ds.select("age", "height").summary().show()
|
cache(): Dataset.this.type |
Persist this Dataset with the default storage level (MEMORY_AND_DISK). |
persist(): Dataset.this.type |
Persist this Dataset with the default storage level (MEMORY_AND_DISK). |
persist(newLevel: StorageLevel): Dataset.this.type |
Persist this Dataset with the given storage level. newLevel One of: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. |
unpersist(): Dataset.this.type |
Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. |
unpersist(blocking: Boolean): Dataset.this.type |
Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. blocking: Whether to block until all blocks are deleted. |
storageLevel: StorageLevel |
Get the Dataset's current storage level, or StorageLevel.NONE if not persisted. |
rdd: RDD[T] |
Represents the content of the Dataset as an RDD of T. |
toDF(): DataFrame |
Converts this strongly typed collection of data to a generic DataFrame. |
toDF(colNames: String*): DataFrame |
Converts this strongly typed collection of data to a generic DataFrame with columns renamed.
val rdd: RDD[(Int, String)] = ...
rdd.toDF() // this implicit conversion creates a DataFrame with column names `_1` and `_2`
rdd.toDF("id", "name") // this creates a DataFrame with column names "id" and "name"
|
coalesce(numPartitions: Int): Dataset[T] |
Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested. If a larger number of partitions is requested, it will stay at the current number of partitions. |
repartition(numPartitions: Int): Dataset[T] |
Returns a new Dataset that has exactly numPartitions partitions. |
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] |
Returns a new Dataset partitioned by the given partitioning expressions into numPartitions. The resulting Dataset is hash partitioned. |
repartition(partitionExprs: Column*): Dataset[T] |
Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as number of partitions. |
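A small sketch of widening and then shrinking the partition count (df and CustomerId are hypothetical):
import org.apache.spark.sql.functions.col
val wide = df.repartition(200, col("CustomerId"))  // hash-partitioned shuffle into 200 partitions
val narrow = wide.coalesce(10)                     // merge down to 10 partitions without a full shuffle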
org.apache.spark.sql - Transformations
select(col: String, cols: String*): DataFrame |
Selects a set of columns. ds.select("colA", "colB")
|
select(cols: Column*): DataFrame |
Selects a set of column based expressions. ds.select($"colA", $"colB")
|
selectExpr(exprs: String*): DataFrame |
Selects a set of SQL expressions. // The following are equivalent:
ds.selectExpr("colA", "colB as newName", "abs(colC)")
ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
df.selectExpr("*", "(produced_year - (produced_year % 10)) as decade")
|
where(conditionExpr: String): Dataset[T] |
Filters rows using the given SQL expression. To filter a DataFrame, you can also just specify a Boolean column: df.where("isExpensive")
peopleDs.where("age > 15")
|
where(condition: Column): Dataset[T] |
Filters rows using the given condition. peopleDs.where($"age" > 15)
|
filter(conditionExpr: String): Dataset[T] |
Filters rows using the given SQL expression. peopleDs.filter("age > 15")
|
filter(condition: Column): Dataset[T] |
Filters rows using the given condition. // The following are equivalent: peopleDs.filter($"age" > 15)
peopleDs.where($"age" > 15)
|
filter(func: (T) ⇒ Boolean): Dataset[T] |
Returns a new Dataset that only contains elements where func returns true. |
orderBy(sortExprs: Column*): Dataset[T] |
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function. movieTitles.orderBy('title_length.desc, 'produced_year)
|
orderBy(sortCol: String, sortCols: String*): Dataset[T] |
Returns a new Dataset sorted by the given expressions. This is an alias of the sort function. |
sort(sortExprs: Column*): Dataset[T] |
Returns a new Dataset sorted by the given expressions. e.g. ds.sort($"col1", $"col2".desc)
|
sort(sortCol: String, sortCols: String*): Dataset[T] |
Returns a new Dataset sorted by the specified column, all in ascending order. // The following 3 are equivalent
ds.sort("sortcol")
ds.sort($"sortcol")
ds.sort($"sortcol".asc)
|
distinct(): Dataset[T] |
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for dropDuplicates. |
dropDuplicates(): Dataset[T] |
Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for distinct. |
dropDuplicates(col1: String, cols: String*): Dataset[T] |
Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
// The following are equivalent
movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show
movies.dropDuplicates("movie_title").selectExpr("count(movie_title) as movies").show
|
dropDuplicates(colNames: Seq[String]): Dataset[T] |
Returns a new Dataset with duplicate rows removed, considering only the subset of columns. |
dropDuplicates(colNames: Array[String]): Dataset[T] |
Returns a new Dataset with duplicate rows removed, considering only the subset of columns. |
limit(n: Int): Dataset[T] |
Returns a new Dataset by taking the first n rows. The difference between this function and head is that head is an action and returns an array (by triggering query execution) while limit returns a new Dataset. |
withColumn(colName: String, col: Column): DataFrame |
Returns a new Dataset by adding a column or, if the given column name matches an existing one, replacing that column with the given column expression.
// adding a new column based on a certain column expression
movies.withColumn("decade", ('produced_year - 'produced_year % 10))
|
withColumnRenamed(existingName: String, newName: String): DataFrame |
Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain existingName: Spark doesn't throw an error, it silently does nothing.
movies.withColumnRenamed("actor_name", "actor")
|
drop(colName: String): DataFrame |
Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column name(s). movies.drop("actor_name")
|
drop(colNames: String*): DataFrame |
Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column name(s). You can specify one or more column names to drop, but only the ones that exist in the schema will be dropped and the ones that don’t will be silently ignored. movies.drop("actor_name", "me")
|
drop(col: Column): DataFrame |
Returns a new Dataset with a column dropped. This version of drop accepts a Column rather than a name. This is a no-op if the Dataset doesn't have a column with an equivalent expression. movies.drop($"actor_name")
|
union(other: Dataset[T]): Dataset[T] |
Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct. Notice that the column positions in the schema aren't necessarily matched with the fields in the strongly typed objects in a Dataset. This function resolves columns by their positions in the schema, not the fields in the strongly typed objects. Use unionByName to resolve columns by field name in the typed objects.
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 4| 5| 6|
// +----+----+----+
|
unionByName(other: Dataset[T]): Dataset[T] |
Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct. The difference between this function and union is that this function resolves columns by name (not by position).
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 6| 4| 5|
// +----+----+----+
|
intersect(other: Dataset[T]): Dataset[T] |
Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is equivalent to INTERSECT in SQL. |
Working with Booleans - Column
===(other: Any): Column |
Equality test. df.where(col("InvoiceNo") === 536365)
|
equalTo(other: Any): Column |
Equality test. df.where(col("InvoiceNo").equalTo(536365))
|
<=>(other: Any): Column |
Equality test that is safe for null values. |
=!=(other: Any): Column |
Inequality test. df.where(col("InvoiceNo") =!= 536365)
|
<(other: Any): Column |
Less than. |
<=(other: Any): Column |
Less than or equal to. |
>(other: Any): Column |
Greater than. |
>=(other: Any): Column |
Greater than or equal to an expression. |
&&(other: Any): Column |
Boolean AND. |
||(other: Any): Column |
Boolean OR. |
isNaN: Column |
True if the current expression is NaN. |
isNotNull: Column |
True if the current expression is NOT null. |
isNull: Column |
True if the current expression is null. |
isin(list: Any*): Column |
A boolean expression that is evaluated to true if the value of this expression is contained in the evaluated values of the arguments. Note that isin takes a vararg, not a List; to pass a list, expand it:
val items = List("a", "b", "c")
df.filter($"c1".isin(items:_*))
or df.filter($"c1".isin("a", "b", "c"))
|
like(literal: String): Column |
SQL like expression. Returns a boolean column based on a SQL LIKE match. Spark SQL's LIKE supports the % and _ wildcards; the bracket forms below are wildcards of some other SQL dialects (e.g. T-SQL):
%: represents zero or more characters, e.g. bl% matches bl, black, blue, and blob
_: represents a single character, e.g. h_t matches hot, hat, and hit
[]: represents any single character within the brackets, e.g. h[oa]t matches hot and hat, but not hit
[^]: represents any character not in the brackets, e.g. h[^oa]t matches hit, but not hot and hat
[-]: represents a range of characters, e.g. c[a-b]t matches cat and cbt |
rlike(literal: String): Column |
SQL RLIKE expression (LIKE with Regex). |
startsWith(literal: String): Column |
String starts with another string literal. Returns a boolean column based on a string match. |
startsWith(other: Column): Column |
String starts with. |
endsWith(literal: String): Column |
String ends with another string literal. Returns a boolean column based on a string match. |
endsWith(other: Column): Column |
String ends with. Returns a boolean column based on a string match. |
contains(other: Any): Column |
Contains the other element. Returns a boolean column based on a string match. |
org.apache.spark.sql.Column
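A small sketch combining the operators above (df and the column names are hypothetical):
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") === 536365 && col("UnitPrice") > 0.5)
df.where(col("Country").isin("Germany", "France") || col("Description").isNull)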
Working with Booleans - functions
not(e: Column): Column |
Inversion of boolean expression, i.e. NOT. |
isnan(e: Column): Column |
Return true iff the column is NaN. |
isnull(e: Column): Column |
Return true iff the column is null. |
org.apache.spark.sql.functions
Working with Numbers - Column
+(other: Any): Column |
Sum of this expression and another expression. |
-(other: Any): Column |
Subtraction. Subtract the other expression from this expression. |
*(other: Any): Column |
Multiplication of this expression and another expression. |
/(other: Any): Column |
Division of this expression by another expression. |
%(other: Any): Column |
Modulo (remainder), e.g. 11 mod 4 = 3
|
org.apache.spark.sql.Column
Working with Numbers - functions
abs(e: Column): Column |
Computes the absolute value. |
round(e: Column): Column |
Returns the value of the column e rounded to 0 decimal places with HALF_UP round mode. |
round(e: Column, scale: Int): Column |
Round the value of e to scale decimal places with HALF_UP round mode if scale is greater than or equal to 0 or at integral part when scale is less than 0. |
bround(e: Column): Column |
Returns the value of the column e rounded to 0 decimal places with HALF_EVEN round mode. HALF_EVEN round towards the "nearest neighbor" unless both neighbors are equidistant, in which case, round towards the even neighbor. |
bround(e: Column, scale: Int): Column |
Round the value of e to scale decimal places with HALF_EVEN round mode if scale is greater than or equal to 0 or at integral part when scale is less than 0. |
pow(l: Double, rightName: String): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(l: Double, r: Column): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(leftName: String, r: Double): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(l: Column, r: Double): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(leftName: String, rightName: String): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(leftName: String, r: Column): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(l: Column, rightName: String): Column |
Returns the value of the first argument raised to the power of the second argument. |
pow(l: Column, r: Column): Column |
Returns the value of the first argument raised to the power of the second argument. |
org.apache.spark.sql.functions
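A small sketch of the rounding and power helpers (df and the price column are hypothetical):
import org.apache.spark.sql.functions.{col, round, bround, pow}
df.select(
  round(col("price"), 2).as("half_up"),     // round half up to 2 decimal places
  bround(col("price"), 2).as("half_even"),  // banker's rounding to 2 decimal places
  pow(col("price"), 2).as("squared"))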
Working with Strings - Column
contains(other: Any): Column |
Contains the other element. Returns a boolean column based on a string match. |
startsWith(literal: String): Column |
String starts with another string literal. Returns a boolean column based on a string match. |
startsWith(other: Column): Column |
String starts with. Returns a boolean column based on a string match. |
endsWith(literal: String): Column |
String ends with another string literal. Returns a boolean column based on a string match. |
endsWith(other: Column): Column |
String ends with. Returns a boolean column based on a string match. |
substr(startPos: Int, len: Int): Column |
An expression that returns a substring. startPos begins at 1. (In Scala, String also has a substring(beginIndex, endIndex) method, where beginIndex starts at 0.) |
substr(startPos: Column, len: Column): Column |
An expression that returns a substring. |
like(literal: String): Column |
SQL like expression. Returns a boolean column based on a SQL LIKE match. Spark SQL's LIKE supports the % and _ wildcards; the bracket forms below are wildcards of some other SQL dialects (e.g. T-SQL):
%: represents zero or more characters, e.g. bl% matches bl, black, blue, and blob
_: represents a single character, e.g. h_t matches hot, hat, and hit
[]: represents any single character within the brackets, e.g. h[oa]t matches hot and hat, but not hit
[^]: represents any character not in the brackets, e.g. h[^oa]t matches hit, but not hot and hat
[-]: represents a range of characters, e.g. c[a-b]t matches cat and cbt |
rlike(literal: String): Column |
SQL RLIKE expression (LIKE with Regex). |
org.apache.spark.sql.Column
Working with Strings - functions
initcap(e: Column): Column |
Returns a new string column by converting the first letter of each word to uppercase. |
lower(e: Column): Column |
Converts a string column to lower case. |
upper(e: Column): Column |
Converts a string column to upper case. |
trim(e: Column): Column |
Trim the spaces from both ends for the specified string column. |
trim(e: Column, trimString: String): Column |
Trim the specified character from both ends for the specified string column. |
ltrim(e: Column): Column |
Trim the spaces from left end for the specified string value. |
ltrim(e: Column, trimString: String): Column |
Trim the specified character string from left end for the specified string column. |
rtrim(e: Column): Column |
Trim the spaces from right end for the specified string value. |
rtrim(e: Column, trimString: String): Column |
Trim the specified character string from right end for the specified string column. |
lpad(str: Column, len: Int, pad: String): Column |
Left-pad the string column with pad to a length of len. If the string column is longer than len, the return value is shortened to len characters. |
rpad(str: Column, len: Int, pad: String): Column |
Right-pad the string column with pad to a length of len. If the string column is longer than len, the return value is shortened to len characters. |
substring(str: Column, pos: Int, len: Int): Column |
Returns the substring that starts at pos and is of length len when str is String type, or the slice of the byte array that starts at pos (in bytes) and is of length len when str is Binary type. Note: the position is not zero-based, but a 1-based index. (In Scala, String also has a substring(beginIndex, endIndex) method, where beginIndex starts at 0.) |
substring_index(str: Column, delim: String, count: Int): Column |
Returns the substring from string str before count occurrences of the delimiter delim. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned. substring_index performs a case-sensitive match when searching for delim. |
regexp_extract(e: Column, exp: String, groupIdx: Int): Column |
Extract a specific group matched by a Java regex, from the specified string column. If the regex did not match, or the specified group did not match, an empty string is returned. val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme")
rhymeDF.select(regexp_extract('rhyme, "[a-z]*o[xw]",0).as("substring")).show
The group index (starting at 0, where 0 is the entire match) selects which capture group of the regex to return. |
regexp_replace(e: Column, pattern: Column, replacement: Column): Column |
Replace all substrings of the specified string value that match regexp with rep. |
regexp_replace(e: Column, pattern: String, replacement: String): Column |
Replace all substrings of the specified string value that match regexp with rep. val rhymeDF = Seq(("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"")).toDF("rhyme")
rhymeDF.select(regexp_replace('rhyme, "fox|crow", "animal").as("new_rhyme")).show(false)
rhymeDF.select(regexp_replace('rhyme, "[a-z]*o[xw]", "animal").as("new_rhyme")).show(false)
|
repeat(str: Column, n: Int): Column |
Repeats a string column n times, and returns it as a new string column. |
reverse(str: Column): Column |
Reverses the string column and returns it as a new string column. |
split(str: Column, pattern: String): Column |
Splits str around pattern (pattern is a regular expression). |
length(e: Column): Column |
Computes the character length of a given string or number of bytes of a binary string. The length of character strings include the trailing spaces. The length of binary strings includes binary zeros. |
translate(src: Column, matchingString: String, replaceString: String): Column |
Translate any character in the src by a character in replaceString. The characters in replaceString correspond to the characters in matchingString. The translate will happen when any character in the string matches the character in the matchingString. |
concat(exprs: Column*): Column |
Concatenates multiple input columns together into a single column. If all inputs are binary, concat returns an output as binary. Otherwise, it returns as string. |
concat_ws(sep: String, exprs: Column*): Column |
Concatenates multiple input string columns together into a single string column, using the given separator. |
instr(str: Column, substring: String): Column |
Locate the position of the first occurrence of the given substring in the str column. Returns null if either of the arguments is null. Note: the position is not zero-based, but a 1-based index. Returns 0 if substr could not be found in str. |
locate(substr: String, str: Column, pos: Int): Column |
Locate the position of the first occurrence of substr in a string column, after position pos. Note: The position is not zero based, but 1 based index. Returns 0 if substr could not be found in str. |
locate(substr: String, str: Column): Column |
Locate the position of the first occurrence of substr in a string column, starting from position 1. Note: the position is not zero-based, but a 1-based index. Returns 0 if substr could not be found in str. |
org.apache.spark.sql.functions
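A small sketch of a few string helpers (df and the name columns are hypothetical):
import org.apache.spark.sql.functions.{col, concat_ws, initcap, lpad, trim}
df.select(
  initcap(trim(col("name"))).as("clean_name"),                          // trim spaces, capitalize each word
  lpad(col("name"), 10, "*").as("padded"),                              // left-pad to length 10 with '*'
  concat_ws(", ", col("last_name"), col("first_name")).as("full_name"))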
Working with Date/Time - functions
current_date(): Column |
Returns the current date as a date column. |
current_timestamp(): Column |
Returns the current timestamp as a timestamp column. |
date_add(start: Column, days: Int): Column |
Returns the date that is days days after start |
date_sub(start: Column, days: Int): Column |
Returns the date that is days days before start |
datediff(end: Column, start: Column): Column |
Returns the number of days from start to end. |
add_months(startDate: Column, numMonths: Int): Column |
Returns the date that is numMonths after startDate. |
months_between(date1: Column, date2: Column): Column |
Returns number of months between dates date1 and date2. |
year(e: Column): Column |
Extracts the year as an integer from a given date/timestamp/string. |
quarter(e: Column): Column |
Extracts the quarter as an integer from a given date/timestamp/string. |
month(e: Column): Column |
Extracts the month as an integer from a given date/timestamp/string. |
weekofyear(e: Column): Column |
Extracts the week number as an integer from a given date/timestamp/string. |
dayofyear(e: Column): Column |
Extracts the day of the year as an integer from a given date/timestamp/string. |
dayofmonth(e: Column): Column |
Extracts the day of the month as an integer from a given date/timestamp/string. |
dayofweek(e: Column): Column |
Extracts the day of the week as an integer from a given date/timestamp/string. |
hour(e: Column): Column |
Extracts the hours as an integer from a given date/timestamp/string. |
minute(e: Column): Column |
Extracts the minutes as an integer from a given date/timestamp/string. |
second(e: Column): Column |
Extracts the seconds as an integer from a given date/timestamp/string. |
to_date(e: Column): Column |
Converts the column into DateType by casting rules to DateType. |
to_date(e: Column, fmt: String): Column |
Converts the column into DateType with the specified format (see java.text.SimpleDateFormat); returns null on failure. fmt describes the format of the input string; once parsed, the value is stored as DateType and df.show() displays it in the default format yyyy-MM-dd. |
to_timestamp(s: Column): Column |
Convert time string to a Unix timestamp (in seconds) by casting rules to TimestampType. |
to_timestamp(s: Column, fmt: String): Column |
Converts a time string with the specified format (see java.text.SimpleDateFormat) into TimestampType; returns null on failure. fmt describes the format of the input string; once parsed, the value is stored as TimestampType and df.show() displays it in the default format yyyy-MM-dd HH:mm:ss |
date_format(dateExpr: Column, format: String): Column |
Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument. A pattern dd.MM.yyyy would return a string like 18.03.1993. All pattern letters of java.text.SimpleDateFormat can be used. |
unix_timestamp(): Column |
Returns the current Unix timestamp (in seconds). |
unix_timestamp(s: Column): Column |
Converts a time string in format yyyy-MM-dd HH:mm:ss to a Unix timestamp (in seconds), using the default timezone and the default locale. Returns null if it fails. |
unix_timestamp(s: Column, p: String): Column |
Converts a time string with the given pattern to a Unix timestamp (in seconds). Returns null if it fails. |
from_unixtime(ut: Column, f: String): Column |
Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format. |
from_unixtime(ut: Column): Column |
Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format. |
last_day(e: Column): Column |
Given a date column, returns the last day of the month which the given date belongs to. For example, input "2015-07-27" returns "2015-07-31" since July 31 is the last day of the month in July 2015. |
next_day(date: Column, dayOfWeek: String): Column |
Given a date column, returns the first date which is later than the value of the date column that is on the specified day of the week. For example, next_day('2015-07-27', "Sunday") returns 2015-08-02 because that is the first Sunday after 2015-07-27. Day of the week parameter is case insensitive, and accepts: "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun". |
org.apache.spark.sql.functions
The default date format these functions use is yyyy-MM-dd HH:mm:ss.
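A small sketch of parsing, reformatting, and differencing dates (df and the order_date column in dd/MM/yyyy format are hypothetical):
import org.apache.spark.sql.functions.{col, to_date, date_format, datediff, current_date}
df.select(to_date(col("order_date"), "dd/MM/yyyy").as("order_dt"))   // parse the string into DateType
  .select(
    date_format(col("order_dt"), "yyyy-MM").as("order_month"),       // back to a formatted string
    datediff(current_date(), col("order_dt")).as("age_in_days"))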
Working with Null/NaN - Column
isNull: Column |
True if the current expression is null. |
isNotNull: Column |
True if the current expression is NOT null. |
isNaN: Column |
True if the current expression is NaN. |
org.apache.spark.sql.Column
Working with Null/NaN - functions
isnull(e: Column): Column |
Return true iff the column is null. |
isnan(e: Column): Column |
Return true iff the column is NaN. |
nanvl(col1: Column, col2: Column): Column |
Returns col1 if it is not NaN, or col2 if col1 is NaN. Both inputs should be floating point columns (DoubleType or FloatType). |
coalesce(e: Column*): Column |
Returns the first column that is not null, or null if all inputs are null. For example, coalesce(a, b, c) will return a if a is not null, or b if a is null and b is not null, or c if both a and b are null but c is not null.
// create a movie with null title
case class Movie(actor_name:String, movie_title:String, produced_year:Long)
val badMoviesDF = Seq( Movie(null, null, 2018L), Movie("John Doe", "Awesome Movie", 2018L)).toDF
// use coalesce to substitute a default for the null value
badMoviesDF.select(coalesce('actor_name, lit("no_name")).as("new_title")).show
+----------+
| new_title|
+----------+
|   no_name|
|  John Doe|
+----------+ |
org.apache.spark.sql.functions
Working with Null/NaN - DataFrameNaFunctions
drop(): DataFrame |
Returns a new DataFrame that drops rows containing any null or NaN values. |
drop(how: String): DataFrame |
Returns a new DataFrame that drops rows containing null or NaN values. If how is "any", then drop rows containing any null or NaN values. If how is "all", then drop rows only if every column is null or NaN for that row. |
drop(cols: Seq[String]): DataFrame |
(Scala-specific) Returns a new DataFrame that drops rows containing any null or NaN values in the specified columns. |
drop(how: String, cols: Seq[String]): DataFrame |
(Scala-specific) Returns a new DataFrame that drops rows containing null or NaN values in the specified columns. If how is "any", then drop rows containing any null or NaN values in the specified columns. If how is "all", then drop rows only if every specified column is null or NaN for that row. |
drop(minNonNulls: Int): DataFrame |
Returns a new DataFrame that drops rows containing less than minNonNulls non-null and non-NaN values. |
drop(minNonNulls: Int, cols: Seq[String]): DataFrame |
(Scala-specific) Returns a new DataFrame that drops rows containing less than minNonNulls non-null and non-NaN values in the specified columns. |
fill(value: String/Boolean/Double/Long): DataFrame |
Returns a new DataFrame that replaces null values in string/boolean columns (or null or NaN values in numeric columns) with value. |
fill(value: String/Boolean/Double/Long, cols: Seq[String]): DataFrame |
(Scala-specific) Returns a new DataFrame that replaces null values in specified string/boolean/double/long columns. |
fill(valueMap: Map[String, Any]): DataFrame |
(Scala-specific) Returns a new DataFrame that replaces null values. The key of the map is the column name, and the value of the map is the replacement value. The value must be of the following type: Int, Long, Float, Double, String, Boolean. Replacement values are cast to the column data type. e.g. df.na.fill(Map( "A" -> "unknown", "B" -> 1.0 ))
|
replace[T](col: String, replacement: Map[T, T]): DataFrame |
(Scala-specific) Replaces values matching keys in the replacement map. col: the name of the column to apply the value replacement to; if col is "*", the replacement is applied to all string, numeric or boolean columns. replacement: the value replacement map. Keys and values of the replacement map must have the same type, and can only be doubles, strings or booleans. The map value can contain nulls.
// Replaces all occurrences of 1.0 with 2.0 in column "height".
df.na.replace("height", Map(1.0 -> 2.0))
// Replaces all occurrences of "UNKNOWN" with "unnamed" in column "name".
df.na.replace("name", Map("UNKNOWN" -> "unnamed"))
// Replaces all occurrences of "UNKNOWN" with "unnamed" in all string columns.
df.na.replace("*", Map("UNKNOWN" -> "unnamed")) |
replace[T](cols: Seq[String], replacement: Map[T, T]): DataFrame |
(Scala-specific) Replaces values matching keys in the replacement map, in the specified columns.
// Replaces all occurrences of 1.0 with 2.0 in columns "height" and "weight".
df.na.replace("height" :: "weight" :: Nil, Map(1.0 -> 2.0))
// Replaces all occurrences of "UNKNOWN" with "unnamed" in columns "firstname" and "lastname".
df.na.replace("firstname" :: "lastname" :: Nil, Map("UNKNOWN" -> "unnamed")) |
org.apache.spark.sql.DataFrameNaFunctions
use df.na
to get DataFrameNaFunctions
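A small sketch of the drop/fill variants (the movies DataFrame and its columns are hypothetical):
movies.na.drop()                                          // drop rows containing any null or NaN value
movies.na.drop("all", Seq("actor_name", "movie_title"))   // drop a row only if both columns are null
movies.na.fill("unknown", Seq("actor_name"))              // replace nulls in one string column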
Working with Sorting - Column
asc: Column |
Returns a sort expression based on ascending order of the column. // Scala: sort a DataFrame by age column in ascending order. df.sort(df("age").asc)
|
asc_nulls_first: Column |
Returns a sort expression based on ascending order of the column, and null values return before non-null values. |
asc_nulls_last: Column |
Returns a sort expression based on ascending order of the column, and null values appear after non-null values. |
desc: Column |
Returns a sort expression based on the descending order of the column. |
desc_nulls_first: Column |
Returns a sort expression based on the descending order of the column, and null values appear before non-null values. |
desc_nulls_last: Column |
Returns a sort expression based on the descending order of the column, and null values appear after non-null values. |
org.apache.spark.sql.Column
Working with Sorting - functions
asc(columnName: String): Column |
Returns a sort expression based on ascending order of the column. df.sort(asc("dept"), desc("age"))
|
asc_nulls_first(columnName: String): Column |
Returns a sort expression based on ascending order of the column, and null values return before non-null values. |
asc_nulls_last(columnName: String): Column |
Returns a sort expression based on ascending order of the column, and null values appear after non-null values. |
desc(columnName: String): Column |
Returns a sort expression based on the descending order of the column. |
desc_nulls_first(columnName: String): Column |
Returns a sort expression based on the descending order of the column, and null values appear before non-null values. |
desc_nulls_last(columnName: String): Column |
Returns a sort expression based on the descending order of the column, and null values appear after non-null values. |
org.apache.spark.sql.functions
Working with Aggregate functions
count(columnName: String): TypedColumn[Any, Long] |
Aggregate function: returns the number of items in a group. count("*") also counts rows containing null values; count(<column_name>) does not count null values in that column. |
count(e: Column): Column |
Aggregate function: returns the number of items in a group. |
countDistinct(columnName: String, columnNames: String*): Column |
Aggregate function: returns the number of distinct items in a group. |
countDistinct(expr: Column, exprs: Column*): Column |
Aggregate function: returns the number of distinct items in a group. |
first(columnName: String): Column |
Aggregate function: returns the first value of a column in a group. The function by default returns the first value it sees. It will return the first non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. |
first(e: Column): Column |
Aggregate function: returns the first value in a group. |
first(columnName: String, ignoreNulls: Boolean): Column |
Aggregate function: returns the first value of a column in a group. |
first(e: Column, ignoreNulls: Boolean): Column |
Aggregate function: returns the first value in a group. |
last(columnName: String): Column |
Aggregate function: returns the last value of the column in a group. The function by default returns the last value it sees. It will return the last non-null value it sees when ignoreNulls is set to true. If all values are null, then null is returned. |
last(e: Column): Column |
Aggregate function: returns the last value in a group. |
last(columnName: String, ignoreNulls: Boolean): Column |
Aggregate function: returns the last value of the column in a group. |
last(e: Column, ignoreNulls: Boolean): Column |
Aggregate function: returns the last value in a group. |
min(columnName: String): Column |
Aggregate function: returns the minimum value of the column in a group. |
min(e: Column): Column |
Aggregate function: returns the minimum value of the expression in a group. |
max(columnName: String): Column |
Aggregate function: returns the maximum value of the column in a group. |
max(e: Column): Column |
Aggregate function: returns the maximum value of the expression in a group. |
sum(columnName: String): Column |
Aggregate function: returns the sum of all values in the given column. |
sum(e: Column): Column |
Aggregate function: returns the sum of all values in the expression. |
sumDistinct(columnName: String): Column |
Aggregate function: returns the sum of distinct values in the expression. |
sumDistinct(e: Column): Column |
Aggregate function: returns the sum of distinct values in the expression. |
avg(columnName: String): Column |
Aggregate function: returns the average of the values in a group. |
avg(e: Column): Column |
Aggregate function: returns the average of the values in a group. |
mean(columnName: String): Column |
Aggregate function: returns the average of the values in a group. Alias for avg. |
mean(e: Column): Column |
Aggregate function: returns the average of the values in a group. Alias for avg. |
variance(columnName: String): Column |
Aggregate function: alias for var_samp. |
variance(e: Column): Column |
Aggregate function: alias for var_samp |
var_samp(columnName: String): Column |
Aggregate function: returns the unbiased variance of the values in a group. |
var_samp(e: Column): Column |
Aggregate function: returns the unbiased variance of the values in a group. |
var_pop(columnName: String): Column |
Aggregate function: returns the population variance of the values in a group. |
var_pop(e: Column): Column |
Aggregate function: returns the population variance of the values in a group. |
stddev(columnName: String): Column |
Aggregate function: alias for stddev_samp. |
stddev(e: Column): Column |
Aggregate function: alias for stddev_samp. |
stddev_samp(columnName: String): Column |
Aggregate function: returns the sample standard deviation of the expression in a group. |
stddev_samp(e: Column): Column |
Aggregate function: returns the sample standard deviation of the expression in a group. |
stddev_pop(columnName: String): Column |
Aggregate function: returns the population standard deviation of the expression in a group. |
stddev_pop(e: Column): Column |
Aggregate function: returns the population standard deviation of the expression in a group. |
skewness(columnName: String): Column |
Aggregate function: returns the skewness of the values in a group. |
skewness(e: Column): Column |
Aggregate function: returns the skewness of the values in a group. |
kurtosis(columnName: String): Column |
Aggregate function: returns the kurtosis of the values in a group. |
kurtosis(e: Column): Column |
Aggregate function: returns the kurtosis of the values in a group. |
corr(columnName1: String, columnName2: String): Column |
Aggregate function: returns the Pearson Correlation Coefficient for two columns. |
corr(column1: Column, column2: Column): Column |
Aggregate function: returns the Pearson Correlation Coefficient for two columns. |
covar_samp(columnName1: String, columnName2: String): Column |
Aggregate function: returns the sample covariance for two columns. |
covar_samp(column1: Column, column2: Column): Column |
Aggregate function: returns the sample covariance for two columns. |
covar_pop(columnName1: String, columnName2: String): Column |
Aggregate function: returns the population covariance for two columns. |
covar_pop(column1: Column, column2: Column): Column |
Aggregate function: returns the population covariance for two columns. |
collect_list(columnName: String): Column |
Aggregate function: returns a list of objects with duplicates. |
collect_list(e: Column): Column |
Aggregate function: returns a list of objects with duplicates. |
collect_set(columnName: String): Column |
Aggregate function: returns a set of objects with duplicate elements eliminated. |
collect_set(e: Column): Column |
Aggregate function: returns a set of objects with duplicate elements eliminated. |
org.apache.spark.sql.functions
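A small sketch combining several aggregate functions (the movies DataFrame is hypothetical):
import org.apache.spark.sql.functions.{count, countDistinct, min, max, avg}
movies.select(
  count("movie_title").as("row_count"),            // nulls in movie_title are not counted
  countDistinct("movie_title").as("movie_count"),
  min("produced_year").as("earliest"),
  max("produced_year").as("latest"),
  avg("produced_year").as("mean_year")).show()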
Working with Aggregate - RelationalGroupedDataset
agg(expr: Column, exprs: Column*): DataFrame |
Compute aggregates by specifying a series of aggregate columns. Note that this function by default retains the grouping columns in its output. To not retain grouping columns, set spark.sql.retainGroupColumns to false.
import org.apache.spark.sql.functions._
df.groupBy("department").agg(max("age"), sum("expense"))
|
agg(exprs: Map[String, String]): DataFrame |
(Scala-specific) Compute aggregates by specifying a map from column name to aggregate methods. The resulting DataFrame will also contain the grouping columns. df.groupBy("department").agg(Map( "age" -> "max", "expense" -> "sum" ))
|
count(): DataFrame |
Count the number of rows for each group. The resulting DataFrame will also contain the grouping columns. |
max(colNames: String*): DataFrame |
Compute the max value for each numeric columns for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the max values for them. |
min(colNames: String*): DataFrame |
Compute the min value for each numeric column for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the min values for them. |
sum(colNames: String*): DataFrame |
Compute the sum for each numeric columns for each group. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the sum for them. |
avg(colNames: String*): DataFrame |
Compute the mean value for each numeric columns for each group. |
mean(colNames: String*): DataFrame |
Compute the average value for each numeric columns for each group. This is an alias for avg. The resulting DataFrame will also contain the grouping columns. When specified columns are given, only compute the average values for them. |
pivot(pivotColumn: String): RelationalGroupedDataset |
Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally. |
pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset |
Pivots a column of the current DataFrame and performs the specified aggregation. There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not. The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally. |
org.apache.spark.sql.RelationalGroupedDataset
Use df.groupBy("xxx")
to get RelationalGroupedDataset
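A small sketch of groupBy with pivot (the sales DataFrame with country, year and amount columns is hypothetical); passing the value list explicitly avoids the extra pass that computes the distinct pivot values:
import org.apache.spark.sql.functions.sum
sales.groupBy("country")
  .pivot("year", Seq(2021, 2022, 2023))
  .agg(sum("amount"))
  .show()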
Working with Collection - functions
size(e: Column): Column |
Returns length of array or map. |
array_contains(column: Column, value: Any): Column |
Returns null if the array is null, true if the array contains value, and false otherwise. |
sort_array(e: Column): Column |
Sorts the input array for the given column in ascending order, according to the natural ordering of the array elements. |
sort_array(e: Column, asc: Boolean): Column |
Sorts the input array for the given column in ascending or descending order, according to the natural ordering of the array elements. |
explode(e: Column): Column |
Creates a new row for each element in the given array or map column. |
explode_outer(e: Column): Column |
Creates a new row for each element in the given array or map column. Unlike explode, if the array/map is null or empty then null is produced. |
org.apache.spark.sql.functions
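A small sketch over a hypothetical array column genres:
import org.apache.spark.sql.functions.{col, size, array_contains, sort_array, explode}
movies.select(
  size(col("genres")).as("num_genres"),
  array_contains(col("genres"), "Comedy").as("is_comedy"),
  sort_array(col("genres")).as("sorted_genres"))
movies.select(col("movie_title"), explode(col("genres")).as("genre"))  // one output row per array element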
Working with Window - functions
rank(): Column |
Window function: returns the rank of rows within a window partition. The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties. That is, if you were ranking a competition using dense_rank and had three people tie for second place, you would say that all three were in second place and that the next person came in third. rank instead gives sequential numbers, so the person who came after the ties would register as coming in fifth. e.g. 1, 2, 2, 2, 5 |
dense_rank(): Column |
Window function: returns the rank of rows within a window partition, without any gaps. The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties. That is, if you were ranking a competition using dense_rank and had three people tie for second place, you would say that all three were in second place and that the next person came in third. rank instead gives sequential numbers, so the person who came after the ties would register as coming in fifth. e.g. 1, 2, 2, 2, 3 |
percent_rank(): Column |
Window function: returns the relative rank (i.e. percentile) of rows within a window partition. This is computed by: (rank of row in its partition - 1) / (number of rows in the partition - 1) |
row_number(): Column |
Window function: returns a sequential number starting at 1 within a window partition. |
cume_dist(): Column |
Window function: returns the cumulative distribution of values within a window partition, i.e.
N = total number of rows in the partition
cumeDist(x) = (number of values before and including x) / N
|
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}
val windowSpec = Window.partitionBy("CustomerId", "date").orderBy(col("Quantity").desc).rowsBetween(Window.unboundedPreceding, Window.currentRow)
val purchaseRank = rank().over(windowSpec)
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank")).show()
org.apache.spark.sql.expressions.WindowSpec
rowsBetween(start: Long, end: Long): WindowSpec |
Defines the frame boundaries as row offsets relative to the current row, from start (inclusive) to end (inclusive), e.g. rowsBetween(Window.unboundedPreceding, Window.currentRow). |
rangeBetween(start: Long, end: Long): WindowSpec |
Defines the frame boundaries as a range of ordering values relative to the current row's value, from start (inclusive) to end (inclusive), e.g. rangeBetween(-10, 10). |
|
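A small sketch of the two frame specifications (dfWithDate and its columns follow the example above):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}
// running total over the rows seen so far within each customer
val runningTotal = Window.partitionBy("CustomerId").orderBy(col("date"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
// sum over all rows whose Quantity lies within 10 of the current row's Quantity
val valueRange = Window.partitionBy("CustomerId").orderBy(col("Quantity"))
  .rangeBetween(-10, 10)
dfWithDate.select(
  col("CustomerId"),
  sum("Quantity").over(runningTotal).as("running_qty"),
  sum("Quantity").over(valueRange).as("qty_in_range"))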