Apache Iceberg — Insert Overwrite

BigDataEnthusiast
3 min read · Jul 9, 2023


INSERT OVERWRITE can replace or overwrite the data in an Iceberg table; exactly which data gets replaced depends on the configuration and on how the statement is written.

The partitions replaced by INSERT OVERWRITE depend on two factors:

  • Spark’s partition overwrite mode (spark.sql.sources.partitionOverwriteMode)
  • The partitioning of the table.

Let’s explore the behavior of INSERT OVERWRITE for a partitioned table.

Spark’s default overwrite mode is static, but we can switch to dynamic mode with this Spark configuration:

spark.conf.get("spark.sql.sources.partitionOverwriteMode")
static

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Static Behavior

I have created a partitioned table and inserted some rows. The overwrite mode is left at its default, i.e. static.

ddl = """create table iceberg.customer (country string, customerid bigint, customername string) 
USING iceberg
PARTITIONED BY (country)"""

spark.sql(ddl)

spark.sql("INSERT INTO iceberg.customer VALUES ('US',1001,'A'), ('INDIA',1002,'B'), ('INDIA',1003,'C') ")

spark.sql("select * from iceberg.customer").show()

spark.conf.get("spark.sql.sources.partitionOverwriteMode")

Output: we have two partitions and three records in the Iceberg table.

+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+

'static'

Overwrite mode: static. If we perform INSERT OVERWRITE without a PARTITION clause, all partitions are replaced. It is effectively a truncate and load.

spark.sql(""" with data as 
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
)

INSERT OVERWRITE iceberg.customer SELECT * from data """)

spark.sql("select * from iceberg.customer").show()

-- output --

+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| INDIA| 1004| D|
| INDIA| 1005| E|
+-------+----------+------------+

As the output above shows, because no PARTITION clause was provided, the query running in static mode dropped all existing rows in the table. The records in the US partition were dropped as well.

To overwrite just the partitions that were loaded, we can add a PARTITION clause that aligns with the SELECT query filter.

Static overwrite mode determines which partitions to overwrite by converting the PARTITION clause to a filter; note that the PARTITION clause can only reference table columns.
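Conceptually, a static-mode INSERT OVERWRITE with a PARTITION clause behaves like a delete on the partition filter followed by an append. A rough SQL equivalent, purely as an illustration (this is not what Iceberg executes internally):

```sql
-- Roughly what INSERT OVERWRITE ... PARTITION (country = 'INDIA')
-- does in static mode (assuming "data" is the CTE from the examples):
DELETE FROM iceberg.customer WHERE country = 'INDIA';
INSERT INTO iceberg.customer SELECT 'INDIA', customerid, customername FROM data;
```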

Let’s set up the data again and try INSERT OVERWRITE with a PARTITION clause.

spark.sql("select * from iceberg.customer").show()

-- output --

+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+

So we again have two partitions and three records in the Iceberg table. Let’s overwrite only the INDIA partition.

spark.conf.get("spark.sql.sources.partitionOverwriteMode")

spark.sql(""" with data as
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
union all
select 'INDIA' country, 1006 customerid, 'F' customername
)

INSERT OVERWRITE iceberg.customer
PARTITION (country = 'INDIA')
SELECT customerid, customername from data """)

spark.sql("select * from iceberg.customer").show()

-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1004| D|
| INDIA| 1005| E|
| INDIA| 1006| F|
+-------+----------+------------+

So with the PARTITION clause we can control which partitions INSERT OVERWRITE replaces.
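Every INSERT OVERWRITE commits a new snapshot, so we can also verify what an overwrite actually did by querying Iceberg’s metadata tables (exposed through Iceberg’s Spark integration):

```sql
-- Overwrites appear with operation = 'overwrite'; the summary column
-- shows how many data files each commit added and deleted.
SELECT committed_at, snapshot_id, operation, summary
FROM iceberg.customer.snapshots;
```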

Dynamic Behavior

When Spark’s overwrite mode is dynamic, only the partitions that have rows produced by the SELECT query are replaced.

Let’s set up the data again and try INSERT OVERWRITE with the overwrite mode set to dynamic.

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

# recreate the table for a clean run
spark.sql("drop table if exists iceberg.customer")

ddl = """create table iceberg.customer (country string, customerid bigint, customername string)
USING iceberg
PARTITIONED BY (country)"""

spark.sql(ddl)

spark.sql("INSERT INTO iceberg.customer VALUES ('US',1001,'A'), ('INDIA',1002,'B'), ('INDIA',1003,'C') ")

spark.sql("select * from iceberg.customer").show()

-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+

Let’s overwrite the table.

spark.sql(""" with data as 
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
union all
select 'INDIA' country, 1006 customerid, 'F' customername
)

INSERT OVERWRITE iceberg.customer
SELECT country, customerid, customername from data """)

spark.sql("select * from iceberg.customer").show()

-- output --

+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1004| D|
| INDIA| 1005| E|
| INDIA| 1006| F|
+-------+----------+------------+

So only the INDIA partition is replaced here; the US partition is untouched.
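As a side note, if you want dynamic-overwrite behavior without depending on the session config, Spark 3’s DataFrameWriterV2 API offers overwritePartitions(), which dynamically replaces only the partitions present in the incoming DataFrame. A minimal sketch, assuming the same customer table and an active Spark session with the Iceberg catalog configured (the inserted row is hypothetical):

```python
# Dynamic partition overwrite via DataFrameWriterV2 (Spark 3+).
# Only the partitions present in df are replaced, regardless of
# spark.sql.sources.partitionOverwriteMode.
df = spark.createDataFrame(
    [("INDIA", 1007, "G")],  # hypothetical new row
    ["country", "customerid", "customername"],
)
df.writeTo("iceberg.customer").overwritePartitions()
```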
