Apache Spark — RDD zipWithIndex
Suppose you have a file with unwanted lines in its header, which you obviously don't want to process.
See the sample data file below: it has extra banner lines before the actual data starts. The table's column header is on line 7, and the data rows follow from line 8.
-------------------------------
Dataset: XYZ
Created On:1/1/2010
Created By: user@xyz.com
--------------------------------

ID|NAME|ADDRESS|PHONE
1|A|1 XYZ Lane| +1 1234
2|B|2 XYZ Lane| +1 1235
3|C|3 XYZ Lane| +1 1236
4|D|4 XYZ Lane| +1 1237
5|E|5 XYZ Lane| +1 1238
We cannot read this file directly with the spark.read APIs, because Spark would parse the banner lines as records and give us an incorrect DataFrame. So let's explore how to skip these lines using RDD zipWithIndex.
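For instance, a straightforward spark.read call like the sketch below (the header and delimiter options are just illustrative) treats the banner lines as records:

// Naive read: the first banner line becomes the "header" and the
// remaining banner rows are parsed as data, so the resulting
// DataFrame is wrong.
val naive = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("data.csv")
naive.show()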
zipWithIndex
Using RDD zipWithIndex we can assign an index to each line, and then filter out every line whose index is not greater than the index of the column-header line. In this example the column header sits at index 6, so the data rows start at index 7 and we keep lines with index > 6.
import spark.implicits._  // needed for toDF() on an RDD

// Read the raw file and pair every line with its 0-based index
val rdd = spark.sparkContext.textFile("data.csv")
val rdd_withIndex = rdd.zipWithIndex()
rdd_withIndex.toDF().show()
+--------------------+---+
| _1| _2|
+--------------------+---+
|-----------------...| 0|
| Dataset: XYZ| 1|
| Created On:1/1/2010| 2|
|Created By: user@...| 3|
|-----------------...| 4|
| | 5|
|ID|NAME|ADDRESS|P...| 6|
|1|A|1 XYZ Lane| +...| 7|
|2|B|2 XYZ Lane| +...| 8|
|3|C|3 XYZ Lane| +...| 9|
|4|D|4 XYZ Lane| +...| 10|
|5|E|5 XYZ Lane| +...| 11|
+--------------------+---+
The Spark Scala code below reads the file as an RDD, filters out the header lines, and splits the data into columns. This is a minimal sketch; the column names are taken from the output DataFrame that follows.
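import spark.implicits._

val df = spark.sparkContext.textFile("data.csv")
  .zipWithIndex()                          // pair each line with its index
  .filter { case (_, idx) => idx > 6 }     // drop banner + column-header lines
  .map { case (line, _) =>
    val cols = line.split("\\|")           // split() takes a regex, so escape "|"
    (cols(0), cols(1), cols(2), cols(3))
  }
  .toDF("cust_id", "cust_name", "cust_address", "cust_phone")

df.show(false)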
Output DataFrame:
+-------+---------+------------+----------+
|cust_id|cust_name|cust_address|cust_phone|
+-------+---------+------------+----------+
|1 |A |1 XYZ Lane | +1 1234 |
|2 |B |2 XYZ Lane | +1 1235 |
|3 |C |3 XYZ Lane | +1 1236 |
|4 |D |4 XYZ Lane | +1 1237 |
|5 |E |5 XYZ Lane | +1 1238 |
+-------+---------+------------+----------+
Below is the equivalent PySpark code.
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder \
    .appName("ShobhitApp") \
    .getOrCreate()

# Pair every line with its 0-based index
rdd = spark.sparkContext.textFile("/user/data/cust.csv")
rdd_withIndex = rdd.zipWithIndex()

# Keep only the data rows (index > 6) and split each line on "|"
final_data = rdd_withIndex.filter(lambda x: x[1] > 6).map(lambda x: x[0].split("|"))

# Build Rows using the column names from the output DataFrame above
row = Row("cust_id", "cust_name", "cust_address", "cust_phone")
df = final_data.map(lambda x: row(x[0], x[1], x[2], x[3])).toDF()
df.show()
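As with the Scala version, the resulting DataFrame contains the five customer rows under the cust_* column names. One thing worth knowing: zipWithIndex triggers a separate Spark job when the RDD has more than one partition, because it must first count the records in each partition before it can assign global indices.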