Apache Iceberg Table — v2 Format — merge-on-read
In a previous blog we have already seen the different Iceberg table formats and the write modes they support. Please refer to the link below for the v1 format with copy-on-write.
In this blog we will explore the behavior of update/delete on the v2 table format, i.e. merge-on-read.
I have created a v2 format table (without partitioning), with the write mode set to merge-on-read, and appended some data from a Spark application.
import pyspark
from pyspark.sql import SparkSession
conf = (
pyspark.SparkConf()
.setAppName('iceberg')
.set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.iceberg', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.iceberg.type', 'hadoop')
.set('spark.sql.catalog.iceberg.warehouse', 'iceberg-warehouse')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
ddl = "create table iceberg.nyc_yellowtaxi_tripdata_v2 (vendorid bigint, tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count double, trip_distance double, ratecodeid double, store_and_fwd_flag string, pulocationid bigint, dolocationid bigint, payment_type bigint, fare_amount double, extra double, mta_tax double, tip_amount double, tolls_amount double, improvement_surcharge double, total_amount double, congestion_surcharge double, airport_fee double) USING iceberg TBLPROPERTIES ('format-version' = '2','write.update.mode'='merge-on-read','write.delete.mode'='merge-on-read','write.merge.mode'='merge-on-read')"
spark.sql(ddl)
df = spark.read.parquet("/home/docker/data/*")
df.writeTo("iceberg.nyc_yellowtaxi_tripdata_v2").append()
The table iceberg.nyc_yellowtaxi_tripdata_v2 has been created; let's explore its metadata files.
Metadata Files
As we performed an append operation, the 1st snapshot (5294571235127407036) was created. See the excerpt from the metadata.json file below.
Two data files were added via this append. The metadata file also provides all the details, such as the number of data files added, the added record count, etc.
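If you prefer not to open the JSON files by hand, the same information can be queried through Iceberg's snapshots metadata table in Spark. A minimal sketch, reusing the catalog and table names from the example above:
# Show the append snapshot, its operation and its summary counters
spark.sql(
    "SELECT committed_at, snapshot_id, operation, summary "
    "FROM iceberg.nyc_yellowtaxi_tripdata_v2.snapshots"
).show(truncate=False)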
Manifest List & File:
The manifest list gives us the path to the manifest file.
The manifest file gives us the details of the actual data files. It clearly shows two entries, i.e. the two data files added via the first snapshot.
So, as of now, we have two data files for the Iceberg table iceberg.nyc_yellowtaxi_tripdata_v2 in the data directory.
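The same view of the data files is available through the files metadata table, which should list exactly these two files:
# List the current data files of the table with their record counts and sizes
spark.sql(
    "SELECT file_path, record_count, file_size_in_bytes "
    "FROM iceberg.nyc_yellowtaxi_tripdata_v2.files"
).show(truncate=False)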
Step 2: Update/Delete
Let's update/delete some records to see the behavior of a v2 table with merge-on-read.
spark.sql("update iceberg.nyc_yellowtaxi_tripdata_v2 set airport_fee = 10.0 where vendorid=1 and tpep_pickup_datetime='2022-01-01 00:44:50' ").show()
As we performed an update operation, a 2nd snapshot (6174909669162806704) was created. See the excerpt from the latest metadata.json file below. Notice that the operation is recorded as overwrite; it also shows the count of files added/removed, the positional delete file details, and the record counts added/removed.
It is evident from the above metadata that Iceberg has not rewritten the whole affected data file; instead, only the changes are written to a separate data file. It has also created a positional delete file to hold the positions of the updated records.
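To double-check this from Python, the latest metadata.json can be loaded directly. This is only a sketch: the file path is an assumption based on the hadoop-catalog warehouse configured earlier, and the vN prefix depends on how many commits the table has.
import json
# Print every snapshot's id and summary counters (added/removed files, delete files, records)
with open("iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/metadata/v3.metadata.json") as f:
    table_metadata = json.load(f)
for snapshot in table_metadata["snapshots"]:
    print(snapshot["snapshot-id"], snapshot["summary"])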
Let's explore the manifest list. It has 3 entries, i.e. 3 manifest files.
- 1st manifest file: shows that one data file was added (added rows = 1).
- 2nd manifest file: shows the 2 data files added by the previous snapshot. The old manifest file is reused as-is, because merge-on-read leaves the original files untouched.
- 3rd manifest file: shows that one file was added (added rows = 1).
Let's look at the manifest list itself; a sketch of reading it follows.
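The manifest list is an Avro file, so it can be read with spark-avro if that package is on the classpath. This is only a sketch: the snap-*.avro file name below is a placeholder, to be taken from the manifest-list field of the latest snapshot in metadata.json.
# Read the manifest list of the 2nd snapshot; it should contain 3 manifest entries
manifest_list_df = spark.read.format("avro").load(
    "iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/metadata/snap-6174909669162806704-1-<uuid>.avro"
)
manifest_list_df.select("manifest_path", "added_snapshot_id").show(truncate=False)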
To read the manifest files, we need to understand the status of each data file entry. Refer to the table below.
+--------+---------------------------+
| status | meaning                   |
+--------+---------------------------+
| 0      | EXISTING (a.k.a. rewrite) |
| 1      | ADDED                     |
| 2      | DELETED                   |
+--------+---------------------------+
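Under the same assumption (spark-avro available, manifest file name taken from the manifest list above), a manifest file can be read directly to see these status values per entry:
# Each manifest entry carries a status column (0/1/2 as above) and a data_file struct
manifest_df = spark.read.format("avro").load(
    "iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/metadata/<manifest-file>.avro"
)
manifest_df.select("status", "data_file.file_path", "data_file.record_count").show(truncate=False)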
- 1st manifest file: shows that one data file was added (added rows = 1).
-- content of 00191-845-a0875aac-5120-4f7b-934e-1184839b710f-00001.parquet
[{"vendorid":"1","tpep_pickup_datetime":"1640997890000000","tpep_dropoff_datetime":"1640998278000000","passenger_count":"1","trip_distance":"1.8","ratecodeid":"1","store_and_fwd_flag":"N","pulocationid":"229","dolocationid":"263","payment_type":"1","fare_amount":"7.5","extra":"3","mta_tax":"0.5","tip_amount":"2","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"13.3","congestion_surcharge":"2.5","airport_fee":"10"}]
So this data file holds the updated record, which will get merged with the original data at read time.
- 2nd manifest file: shows the 2 data files added by the previous snapshot. The old manifest file is reused as-is, because merge-on-read leaves the original files untouched.
- 3rd manifest file: shows that one file was added (added rows = 1).
-- content of 00191-845-14f1ef28-c5a4-4ff4-a084-5eb6ec9626df-00001.parquet
[{"file_path":"iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/data/00000-838-237563b7-0c55-4eb2-ad65-d508a2197cda-00001.parquet","pos":"660"}]
This is a positional delete file; it holds the position of the updated record within the original data file.
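Both files written by the update are plain Parquet, so the excerpts above can be reproduced by reading them directly. The paths below assume the files sit in the table's data directory under the configured warehouse.
# The new data file holds the updated row ...
spark.read.parquet(
    "iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/data/00191-845-a0875aac-5120-4f7b-934e-1184839b710f-00001.parquet"
).show(truncate=False)
# ... and the positional delete file holds the (file_path, pos) of the superseded row
spark.read.parquet(
    "iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/data/00191-845-14f1ef28-c5a4-4ff4-a084-5eb6ec9626df-00001.parquet"
).show(truncate=False)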
So now, in the data directory of the Iceberg table iceberg.nyc_yellowtaxi_tripdata_v2, we have the two original data files plus the newly written data file and positional delete file. Notice that the original files are intact.
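A simple way to verify this is to list the table's data directory from Python (path assumed from the warehouse location configured earlier):
import os
# Expect the two original data files plus the new data file and the positional delete file
for name in sorted(os.listdir("iceberg-warehouse/nyc_yellowtaxi_tripdata_v2/data")):
    print(name)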
So, in merge-on-read, if we update or delete rows, Iceberg does not rewrite the entire data file. Instead, the changes are written to new files (a data file and a positional delete file), and at read time those changes are merged with the original files.
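As a final check, reading the updated row back through Spark should return the merged result, i.e. the new airport_fee value, even though the original data file was never rewritten:
# The positional delete masks the old row and the new data file supplies the updated one
spark.sql(
    "SELECT vendorid, tpep_pickup_datetime, airport_fee "
    "FROM iceberg.nyc_yellowtaxi_tripdata_v2 "
    "WHERE vendorid = 1 AND tpep_pickup_datetime = '2022-01-01 00:44:50'"
).show(truncate=False)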