Apache Iceberg — Insert Overwrite
INSERT OVERWRITE can replace or overwrite the data in an Iceberg table, depending on the configuration set and how we use it.
The partitions that will be replaced by INSERT OVERWRITE depend on two factors:
- Spark’s partition overwrite mode (spark.sql.sources.partitionOverwriteMode)
- Partitioning of the table
Let’s explore the behavior of INSERT OVERWRITE for a partitioned table.
Spark’s default overwrite mode is static, but we can switch to dynamic mode by setting this Spark configuration:
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
static
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
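The mode can also be fixed when the session is created instead of toggled afterwards. A minimal sketch, assuming a local SparkSession (the app name and any Iceberg catalog settings are illustrative, not from the original):

```python
from pyspark.sql import SparkSession

# Sketch only: sets the partition overwrite mode at session build time.
# The app name is an assumption; Iceberg catalog configs would go here too.
spark = (
    SparkSession.builder
    .appName("iceberg-insert-overwrite-demo")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .getOrCreate()
)
```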
Static Behavior
I have created a partitioned table and inserted some rows. The overwrite mode is left at its default, i.e. static.
ddl = """create table iceberg.customer (country string, customerid bigint, customername string)
USING iceberg
PARTITIONED BY (country)"""
spark.sql(ddl)
spark.sql("INSERT INTO iceberg.customer VALUES ('US',1001,'A'), ('INDIA',1002,'B'), ('INDIA',1003,'C') ")
spark.sql("select * from iceberg.customer").show()
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
Output: we have two partitions and three records in the Iceberg table.
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+
'static'
Overwrite mode: static. If we perform INSERT OVERWRITE without a PARTITION clause, all partitions will be replaced. It’s like truncate and load.
spark.sql(""" with data as
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
)
INSERT OVERWRITE iceberg.customer SELECT * from data """)
spark.sql("select * from iceberg.customer").show()
-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| INDIA| 1004| D|
| INDIA| 1005| E|
+-------+----------+------------+
As the output above shows, because there is no PARTITION clause in the query, static mode dropped all existing rows in the table; the records in the US partition were also dropped.
To overwrite just the partitions that were loaded, we can add a PARTITION clause that aligns with the SELECT query filter. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter, but the PARTITION clause can only reference table columns.
Let’s set up the data again and try INSERT OVERWRITE with a PARTITION clause.
spark.sql("select * from iceberg.customer").show()
-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+
So we have two partitions and three records in the Iceberg table. Let’s try to overwrite only the INDIA partition’s data.
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
spark.sql(""" with data as
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
union all
select 'INDIA' country, 1006 customerid, 'F' customername
)
INSERT OVERWRITE iceberg.customer
PARTITION (country = 'INDIA')
SELECT customerid, customername from data """)
spark.sql("select * from iceberg.customer").show()
-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1004| D|
| INDIA| 1005| E|
| INDIA| 1006| F|
+-------+----------+------------+
So, using the PARTITION clause, we can control the behavior of INSERT OVERWRITE.
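The static behavior above can be modelled without Spark at all. A toy sketch in plain Python, where the "table" is just a dict mapping a partition value to its rows (all names here are illustrative):

```python
# Toy model of static overwrite semantics (plain Python, no Spark needed).
# table: dict of partition value -> list of (country, customerid, customername) rows.

def insert_overwrite_static(table, new_rows, partition_filter=None):
    """Static mode: the PARTITION clause is modelled as a filter on the
    partition column. With no clause, the whole table is replaced
    (truncate & load); with a clause, only that partition is dropped."""
    if partition_filter is None:
        table.clear()                      # no PARTITION clause: drop everything
    else:
        table.pop(partition_filter, None)  # drop only the named partition
    for country, cid, name in new_rows:
        table.setdefault(country, []).append((country, cid, name))
    return table

table = {
    "US":    [("US", 1001, "A")],
    "INDIA": [("INDIA", 1002, "B"), ("INDIA", 1003, "C")],
}

# With PARTITION (country = 'INDIA'): only INDIA is replaced, US survives.
insert_overwrite_static(
    table,
    [("INDIA", 1004, "D"), ("INDIA", 1005, "E")],
    partition_filter="INDIA",
)
print(sorted(table))  # ['INDIA', 'US']
```

Without `partition_filter`, the same call clears every partition first, which mirrors the truncate-and-load result we saw above.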
Dynamic Behavior
When Spark’s overwrite mode is dynamic, only the partitions that have rows produced by the SELECT query will be replaced.
Let’s set up the data again and try INSERT OVERWRITE with the overwrite mode set to dynamic.
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
ddl = """create table iceberg.customer (country string, customerid bigint, customername string)
USING iceberg
PARTITIONED BY (country)"""
spark.sql(ddl)
spark.sql("INSERT INTO iceberg.customer VALUES ('US',1001,'A'), ('INDIA',1002,'B'), ('INDIA',1003,'C') ")
spark.sql("select * from iceberg.customer").show()
-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1002| B|
| INDIA| 1003| C|
+-------+----------+------------+
Let’s overwrite the table.
spark.sql(""" with data as
(select 'INDIA' country, 1004 customerid, 'D' customername
union all
select 'INDIA' country, 1005 customerid, 'E' customername
union all
select 'INDIA' country, 1006 customerid, 'F' customername
)
INSERT OVERWRITE iceberg.customer
SELECT country, customerid, customername from data """)
spark.sql("select * from iceberg.customer").show()
-- output --
+-------+----------+------------+
|country|customerid|customername|
+-------+----------+------------+
| US| 1001| A|
| INDIA| 1004| D|
| INDIA| 1005| E|
| INDIA| 1006| F|
+-------+----------+------------+
So only the INDIA partition is replaced here; the US partition is untouched.
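Dynamic mode can be sketched with the same toy model used for static mode: the set of partitions to replace is derived from the incoming rows themselves, not from a PARTITION clause (again plain Python, names illustrative):

```python
# Toy model of dynamic overwrite semantics (plain Python, no Spark needed).
# Only partitions that appear in the incoming rows are replaced; the rest
# of the table is kept as-is.

def insert_overwrite_dynamic(table, new_rows):
    touched = {row[0] for row in new_rows}  # partition values in the SELECT output
    for part in touched:
        table.pop(part, None)               # drop only partitions being rewritten
    for country, cid, name in new_rows:
        table.setdefault(country, []).append((country, cid, name))
    return table

table = {
    "US":    [("US", 1001, "A")],
    "INDIA": [("INDIA", 1002, "B"), ("INDIA", 1003, "C")],
}

# All new rows land in INDIA, so only INDIA is replaced; US is untouched.
insert_overwrite_dynamic(
    table,
    [("INDIA", 1004, "D"), ("INDIA", 1005, "E"), ("INDIA", 1006, "F")],
)
print(sorted(table))  # ['INDIA', 'US']
```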