Apache Spark: Explode Function
explode is an Apache Spark built-in function that takes a column object of array or map type as input and returns a new row for each element in that array or map. Refer to the official Spark documentation for more details.
Explode
Here is an example of the explode function.
Input Dataframe: df
+---+----+---------------------------+
|id |name|phone_details |
+---+----+---------------------------+
|1 |A |[1654123, 2654122, 3654121]|
|2 |B |[1254123, 2354122, 3454121]|
|3 |C |[] |
|4 |D |null |
+---+----+---------------------------+
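The exploded Dataframe below can be produced with a call like the following (a sketch, assuming the input Dataframe is named df as above):

```scala
import org.apache.spark.sql.functions.explode
import spark.implicits._

// explode creates one output row per element of the array column;
// rows with an empty array or a null array are dropped entirely
val df_exploded =
  df.withColumn("phone_details_exploded", explode($"phone_details"))
df_exploded.show(false)
```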
Output Dataframe: df_exploded
+---+----+---------------------------+----------------------+
|id |name|phone_details |phone_details_exploded|
+---+----+---------------------------+----------------------+
|1 |A |[1654123, 2654122, 3654121]|1654123 |
|1 |A |[1654123, 2654122, 3654121]|2654122 |
|1 |A |[1654123, 2654122, 3654121]|3654121 |
|2 |B |[1254123, 2354122, 3454121]|1254123 |
|2 |B |[1254123, 2354122, 3454121]|2354122 |
|2 |B |[1254123, 2354122, 3454121]|3454121 |
+---+----+---------------------------+----------------------+
If you noticed, explode doesn't emit rows for empty arrays or null values (ids 3 and 4 are missing from the output). To keep those rows, we have to use the explode_outer function.
val df_exploded =
df.withColumn("phone_details_exploded", explode_outer($"phone_details"))
df_exploded.show(false)
Output Dataframe: df_exploded using explode_outer
+---+----+---------------------------+----------------------+
|id |name|phone_details |phone_details_exploded|
+---+----+---------------------------+----------------------+
|1 |A |[1654123, 2654122, 3654121]|1654123 |
|1 |A |[1654123, 2654122, 3654121]|2654122 |
|1 |A |[1654123, 2654122, 3654121]|3654121 |
|2 |B |[1254123, 2354122, 3454121]|1254123 |
|2 |B |[1254123, 2354122, 3454121]|2354122 |
|2 |B |[1254123, 2354122, 3454121]|3454121 |
|3 |C |[] |null |
|4 |D |null |null |
+---+----+---------------------------+----------------------+
Let's see what happens when we explode an array of objects instead of an array of strings/integers.
Explode — Array of Struct
In this example we will explode an array of struct.
Input Dataframe: df
+----+---+--------------------------------------------------+
|name|id |phone_details |
+----+---+--------------------------------------------------+
|1 |A |[[home, +1 1254, true], [office, +1 12345, false]]|
|2 |B |[[home, +1 1254, false], [office, +1 12345, true]]|
|3 |C |[[home, +1 1254, true], [office, +1 12345, false]]|
|4 |D |[] |
|5 |D |null |
+----+---+--------------------------------------------------+
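A Dataframe with this shape can be built as in the following sketch (the case class name PhoneDetail is an assumption for illustration, not from the original):

```scala
import spark.implicits._

// Hypothetical case class matching the phone_details element schema
case class PhoneDetail(phone_type: String, number: String, primary: Boolean)

val df = List(
  ("1", "A", List(PhoneDetail("home", "+1 1254", true), PhoneDetail("office", "+1 12345", false))),
  ("2", "B", List(PhoneDetail("home", "+1 1254", false), PhoneDetail("office", "+1 12345", true))),
  ("3", "C", List(PhoneDetail("home", "+1 1254", true), PhoneDetail("office", "+1 12345", false))),
  ("4", "D", List.empty[PhoneDetail]),
  ("5", "D", null)
).toDF("name", "id", "phone_details")
```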
Output Dataframe: df_exploded
+----+---+--------------------------------------------------+-------------------------+
|name|id |phone_details |phone_details_exploded |
+----+---+--------------------------------------------------+-------------------------+
|1 |A |[[home, +1 1254, true], [office, +1 12345, false]]|[home, +1 1254, true] |
|1 |A |[[home, +1 1254, true], [office, +1 12345, false]]|[office, +1 12345, false]|
|2 |B |[[home, +1 1254, false], [office, +1 12345, true]]|[home, +1 1254, false] |
|2 |B |[[home, +1 1254, false], [office, +1 12345, true]]|[office, +1 12345, true] |
|3 |C |[[home, +1 1254, true], [office, +1 12345, false]]|[home, +1 1254, true] |
|3 |C |[[home, +1 1254, true], [office, +1 12345, false]]|[office, +1 12345, false]|
+----+---+--------------------------------------------------+-------------------------+
Input Dataframe Schema:
root
|-- name: string (nullable = true)
|-- id: string (nullable = true)
|-- phone_details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- phone_type: string (nullable = true)
| | |-- number: string (nullable = true)
| | |-- primary: boolean (nullable = true)
Output Dataframe Schema:
root
|-- name: string (nullable = true)
|-- id: string (nullable = true)
|-- phone_details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- phone_type: string (nullable = true)
| | |-- number: string (nullable = true)
| | |-- primary: boolean (nullable = true)
|-- phone_details_exploded: struct (nullable = true)
| |-- phone_type: string (nullable = true)
| |-- number: string (nullable = true)
| |-- primary: boolean (nullable = true)
Notice in the schema comparison above how the array of struct was converted to a plain struct after the explode.
An elegant way to select all columns from the struct is the .* syntax; see the example below.
val df_exploded =
df.withColumn("phone_details_exploded", explode($"phone_details"))
df_exploded
.select("id", "name", "phone_details_exploded.*")
.show(false)
+---+----+----------+--------+-------+
|id |name|phone_type|number |primary|
+---+----+----------+--------+-------+
|A |1 |home |+1 1254 |true |
|A |1 |office |+1 12345|false |
|B |2 |home |+1 1254 |false |
|B |2 |office |+1 12345|true |
|C |3 |home |+1 1254 |true |
|C |3 |office |+1 12345|false |
+---+----+----------+--------+-------+
Another way is to select columns explicitly by giving the column names from the struct.
df_exploded
.select(
"id",
"phone_details_exploded.phone_type",
"phone_details_exploded.number"
)
.show(false)
Explode Multiple Columns
Suppose we want to explode multiple columns. If we explode them one by one, each explode multiplies the rows produced by the previous one, which creates a bunch of redundant data. Instead, we can first zip the columns together and then explode once. See the example below.
Input Dataframe:
+----+---+--------------------------------------------------------------------------+---------------------------------------------------+
|name|id |phone_details |address_details |
+----+---+--------------------------------------------------------------------------+---------------------------------------------------+
|1 |A |[[home, +1 1254, true], [home, +1 1233, false], [office, +1 12345, false]]|[[home, XYZ Lane, true], [office, YZ CITY, false]] |
|2 |B |[[home, +1 1254, false], [office, +1 12345, true]] |[[home, ADC Lane, true], [office, AYZ CITY, false]]|
|3 |C |[[home, +1 1254, true], [office, +1 12345, false]] |[[home, XSD Lane, true], [office, XS CITY, false]] |
|4 |D |[] |null |
|5 |D |null |[] |
+----+---+--------------------------------------------------------------------------+---------------------------------------------------+
root
|-- name: string (nullable = true)
|-- id: string (nullable = true)
|-- phone_details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- phone_type: string (nullable = true)
| | |-- number: string (nullable = true)
| | |-- primary: boolean (nullable = true)
|-- address_details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address_type: string (nullable = true)
| | |-- address: string (nullable = true)
| | |-- primary: boolean (nullable = true)
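One way to get the output below is to zip the two array columns with arrays_zip (available since Spark 2.4), explode the zipped column once, and then unpack the struct fields. This is a sketch, assuming the input Dataframe is named df:

```scala
import org.apache.spark.sql.functions.{arrays_zip, explode}
import spark.implicits._

// arrays_zip pairs elements by position; when one array is shorter,
// its missing positions are filled with null (see id A below)
val df_zipped = df
  .withColumn("zipped", explode(arrays_zip($"phone_details", $"address_details")))
  .select($"id", $"name", $"zipped.phone_details", $"zipped.address_details")

df_zipped.show(false)
```

Note that explode still drops rows whose zipped array is empty or null, which is why ids 4 and 5 are absent from the output.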
Output Dataframe:
+---+----+-------------------------+-------------------------+
|id |name|phone_details |address_details |
+---+----+-------------------------+-------------------------+
|A |1 |[home, +1 1254, true] |[home, XYZ Lane, true] |
|A |1 |[home, +1 1233, false] |[office, YZ CITY, false] |
|A |1 |[office, +1 12345, false]|null |
|B |2 |[home, +1 1254, false] |[home, ADC Lane, true] |
|B |2 |[office, +1 12345, true] |[office, AYZ CITY, false]|
|C |3 |[home, +1 1254, true] |[home, XSD Lane, true] |
|C |3 |[office, +1 12345, false]|[office, XS CITY, false] |
+---+----+-------------------------+-------------------------+
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- phone_details: struct (nullable = true)
| |-- phone_type: string (nullable = true)
| |-- number: string (nullable = true)
| |-- primary: boolean (nullable = true)
|-- address_details: struct (nullable = true)
| |-- address_type: string (nullable = true)
| |-- address: string (nullable = true)
| |-- primary: boolean (nullable = true)
posexplode
Creates a new row for each element, together with its position, in the given array or map column. It uses the default column names pos for the position and col for array elements (or key and value for map elements) unless specified otherwise.
Input Dataframe:
+---+----+---------------------------+
|id |name|phone_details |
+---+----+---------------------------+
|1 |A |[1654123, 2654122, 3654121]|
|2 |B |[1254123, 2354122, 3454121]|
|3 |C |[] |
|4 |D |null |
+---+----+---------------------------+
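The output below can be produced with posexplode inside a select (a sketch, assuming the input Dataframe is named df):

```scala
import org.apache.spark.sql.functions.posexplode
import spark.implicits._

// posexplode yields two columns: pos (element index, starting at 0)
// and col (element value)
df.select($"id", $"name", posexplode($"phone_details"))
  .show(false)
```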
Output Dataframe:
+---+----+---+-------+
|id |name|pos|col |
+---+----+---+-------+
|1 |A |0 |1654123|
|1 |A |1 |2654122|
|1 |A |2 |3654121|
|2 |B |0 |1254123|
|2 |B |1 |2354122|
|2 |B |2 |3454121|
+---+----+---+-------+
If you use posexplode in withColumn, it might fail with the exception below, because posexplode outputs two columns while withColumn supplies only one alias. Check out SPARK-20174 for more details:
val df_exploded =
df.withColumn("phone", posexplode($"phone_details"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got phone ;
So it is better to use posexplode with select or selectExpr.
import spark.implicits._
val df = List(
("1", "A", List("1654123", "2654122", "3654121")),
("2", "B", List("1254123", "2354122", "3454121")),
("3", "C", List()),
("4", "D", null)
).toDF("id", "name", "phone_details")
df.selectExpr("*", "posexplode(phone_details) as (p,c)")
.drop("phone_details")
.show(false)
Output:
+---+----+---+-------+
|id |name|p |c |
+---+----+---+-------+
|1 |A |0 |1654123|
|1 |A |1 |2654122|
|1 |A |2 |3654121|
|2 |B |0 |1254123|
|2 |B |1 |2354122|
|2 |B |2 |3454121|
+---+----+---+-------+