Apache Spark — RDD zipWithIndex

BigDataEnthusiast
2 min read · Aug 17, 2023


Suppose you have a file with unwanted lines in its header, which you obviously don't want to process.

See the sample data file below; it has extra header lines before the actual data starts. The actual data begins at line no. 7.

-------------------------------
Dataset: XYZ
Created On:1/1/2010
Created By: user@xyz.com
--------------------------------

ID|NAME|ADDRESS|PHONE
1|A|1 XYZ Lane| +1 1234
2|B|2 XYZ Lane| +1 1235
3|C|3 XYZ Lane| +1 1236
4|D|4 XYZ Lane| +1 1237
5|E|5 XYZ Lane| +1 1238

We cannot read this file directly using the spark.read APIs, as that would give us an incorrect DataFrame. So let's explore how we can skip these lines using RDD zipWithIndex.
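For illustration, here is a minimal sketch of such a naive read (assuming the same spark session and data.csv path used later in this post); the banner and blank lines are parsed as ordinary records, so the inferred columns do not match the real ID/NAME/ADDRESS/PHONE layout.

// Hypothetical naive read: the banner and blank header lines become
// ordinary records, so the resulting DataFrame does not have the
// clean four-column structure we actually want.
val naiveDf = spark.read.option("sep", "|").csv("data.csv")
naiveDf.show(false)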

zipWithIndex

Using RDD zipWithIndex we can assign an index to each line; after that we can filter out the lines that come before the actual data. In this example the column-header line ID|NAME|ADDRESS|PHONE lands at index 6, so we keep only the lines with index greater than 6.

import spark.implicits._   // needed for toDF()

val rdd = spark.sparkContext.textFile("data.csv")
val rdd_withIndex = rdd.zipWithIndex()
rdd_withIndex.toDF().show()

+--------------------+---+
|                  _1| _2|
+--------------------+---+
|-----------------...|  0|
|        Dataset: XYZ|  1|
| Created On:1/1/2010|  2|
|Created By: user@...|  3|
|-----------------...|  4|
|                    |  5|
|ID|NAME|ADDRESS|P...|  6|
|1|A|1 XYZ Lane| +...|  7|
|2|B|2 XYZ Lane| +...|  8|
|3|C|3 XYZ Lane| +...|  9|
|4|D|4 XYZ Lane| +...| 10|
|5|E|5 XYZ Lane| +...| 11|
+--------------------+---+

Refer to the Spark Scala code below to read the file as an RDD, filter the lines, and then split the data accordingly.
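A minimal sketch of that code, assuming the same spark session and data.csv path as above (the finalData name is just a placeholder; the column names are the ones shown in the output DataFrame that follows):

import spark.implicits._   // for toDF()

val rdd = spark.sparkContext.textFile("data.csv")

// Pair each line with its index, keep only the data rows (index > 6),
// then split each remaining line on the pipe delimiter
val finalData = rdd.zipWithIndex()
  .filter { case (_, index) => index > 6 }
  .map { case (line, _) => line.split("\\|") }
  .map(cols => (cols(0), cols(1), cols(2), cols(3)))

val df = finalData.toDF("cust_id", "cust_name", "cust_address", "cust_phone")
df.show(false)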

Output DataFrame:

+-------+---------+------------+----------+
|cust_id|cust_name|cust_address|cust_phone|
+-------+---------+------------+----------+
|1      |A        |1 XYZ Lane  | +1 1234  |
|2      |B        |2 XYZ Lane  | +1 1235  |
|3      |C        |3 XYZ Lane  | +1 1236  |
|4      |D        |4 XYZ Lane  | +1 1237  |
|5      |E        |5 XYZ Lane  | +1 1238  |
+-------+---------+------------+----------+

Refer to the sample PySpark code below.

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder \
    .appName("ShobhitApp") \
    .getOrCreate()

rdd = spark.sparkContext.textFile("/user/data/cust.csv")
rdd_withIndex = rdd.zipWithIndex()

# Keep only the data rows (index > 6), then split each line on the pipe delimiter
final_data = rdd_withIndex.filter(lambda x: x[1] > 6) \
    .map(lambda x: x[0].split("|"))

# Build one Row per line with all four fields and convert to a DataFrame
row = Row("COL1", "COL2", "COL3", "COL4")
df = final_data.map(lambda x: row(x[0], x[1], x[2], x[3])).toDF()
df.show()
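With all four fields mapped into the Row, this produces the same five customer records as the Scala version above, just under the generic COL1..COL4 column names.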
