Apache Iceberg — Hidden Partitioning

BigDataEnthusiast
5 min read · Mar 30, 2024

In this post we will explore the concept of “hidden partitioning” in Apache Iceberg.

Let’s first look at partitioning in Apache Hive. Suppose we have the data below and want to store it in Hive with daily partitions.

+----------------+---------------------+--------+---------+
| transaction_id | transaction_dt      | amount | cust_id |
+----------------+---------------------+--------+---------+
| 1              | 2023-04-25 11:12:11 | 100.00 | c1      |
| 2              | 2023-04-25 12:12:12 | 200.00 | c1      |
| 3              | 2023-04-26 11:11:11 | 300.50 | c2      |
| 4              | 2023-04-26 10:12:10 | 500.00 | c3      |
+----------------+---------------------+--------+---------+

We would create the table like this, explicitly declaring a separate partition column.

CREATE TABLE sales_data (
transaction_id int,
transaction_dt timestamp,
amount double,
cust_id string
)
PARTITIONED by (day_part string)

Also, while ingesting data into the table, it is our responsibility to produce the partition value by transforming the existing transaction_dt timestamp column into the date format yyyy-MM-dd, as in the example below.

INSERT OVERWRITE TABLE sales_data
PARTITION (day_part)
SELECT
transaction_id,
transaction_dt,
amount,
cust_id,
substring(transaction_dt,1,10) as day_part -- creating partition value
FROM temp_view;

In the insert query above, we transformed the source timestamp column into the day format yyyy-MM-dd in order to generate the partition values.

Now, while querying the partitioned table, we have to explicitly filter on the partition column to benefit from partition pruning; a filter on transaction_dt alone would not prune anything. Every time we insert into or query the table, we have to remember its physical layout.

select * from sales_data where day_part = '2023-04-25' and <other-filters>

For a monthly partitioned table, we would instead use a transformation like this during ingestion to create the month partition value.

substring(transaction_dt, 1, 7) as part_month --partition column in "yyyy-MM"
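To make the manual step concrete, here is a minimal plain-Python sketch of what the two substring expressions compute. The helper name hive_partition_value is hypothetical and only mimics the SQL for illustration; it is not part of Hive.

```python
from datetime import datetime

def hive_partition_value(ts: datetime, granularity: str) -> str:
    """Mimic substring(transaction_dt, 1, N) on a 'yyyy-MM-dd HH:mm:ss' string."""
    text = ts.strftime("%Y-%m-%d %H:%M:%S")
    prefix_length = {"day": 10, "month": 7}  # characters kept per granularity
    return text[: prefix_length[granularity]]

ts = datetime(2023, 4, 25, 11, 12, 11)
day_value = hive_partition_value(ts, "day")      # value for a daily partition
month_value = hive_partition_value(ts, "month")  # value for a monthly partition
print(day_value, month_value)
```

Every writer has to apply exactly the same rule, and every reader has to know which of the two layouts the table uses.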

Let's explore how Apache Iceberg does this differently. As per the official Iceberg documentation:

1. Iceberg handles the tedious and error-prone task of producing partition values for rows in a table.

2. Consumers don’t need to know how the table is partitioned and add extra filters to their queries.

So we don't have to declare a separate partition column at table creation or compute a partition value at ingestion; Iceberg is responsible for producing the partition values and tracking their relationship to the source column. See the example below:

ddl = """
CREATE TABLE dev.sales_data
(trnsaction_id int,
transaction_dt timestamp,
amount double,
cust_id string)
USING iceberg
PARTITIONED BY (day(transaction_dt))
"""

spark.sql(ddl)

spark.sql("DESCRIBE EXTENDED dev.sales_data").show(20,False)

Here we have used the source column transaction_dt, of type timestamp, with the partition transform day to create the partition values.

Iceberg supports several partition transforms: identity, bucket[N], truncate[W], year, month, day, hour, and void. Refer to the Iceberg table specification for the complete list.
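The time-based transforms produce integers counted from the Unix epoch (years from 1970, months from 1970-01, days from 1970-01-01, hours from 1970-01-01 00:00). The sketch below reproduces that arithmetic in plain Python for a timestamp without time zone; the function names are hypothetical and this is an illustration of the idea, not Iceberg's implementation.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def year_transform(ts: datetime) -> int:
    return ts.year - 1970                           # whole years since 1970

def month_transform(ts: datetime) -> int:
    return (ts.year - 1970) * 12 + (ts.month - 1)   # whole months since 1970-01

def day_transform(ts: datetime) -> int:
    return (ts - EPOCH).days                        # whole days since 1970-01-01

def hour_transform(ts: datetime) -> int:
    return (ts - EPOCH) // timedelta(hours=1)       # whole hours since the epoch

ts = datetime(2023, 4, 25, 11, 12, 11)
print(year_transform(ts), month_transform(ts), day_transform(ts), hour_transform(ts))
```

Two rows from the same calendar day always get the same day value, which is why they land in the same partition without the writer computing anything.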

Let’s ingest some data into the Iceberg table dev.sales_data and explore.

from datetime import datetime
from pyspark.sql.types import (
    StructType, StructField, IntegerType, TimestampType, DoubleType, StringType
)

data = [
(1, datetime.strptime("2023-04-25 11:12:11", '%Y-%m-%d %H:%M:%S'), 100.00, "C1"),
(2, datetime.strptime("2023-04-25 12:12:12", '%Y-%m-%d %H:%M:%S'), 200.00, "C1"),
(3, datetime.strptime("2023-04-26 11:11:11", '%Y-%m-%d %H:%M:%S'), 200.00, "C2"),
(4, datetime.strptime("2023-04-26 10:12:10", '%Y-%m-%d %H:%M:%S'), 300.50, "C2"),
(5, datetime.strptime("2023-04-27 11:12:10", '%Y-%m-%d %H:%M:%S'), 300.00, "C3"),
]

schema = StructType([
    StructField("trnsaction_id", IntegerType(), True),
    StructField("transaction_dt", TimestampType(), True),
    StructField("amount", DoubleType(), True),
    StructField("cust_id", StringType(), True),
])

df = spark.createDataFrame(data=data,schema=schema)
df.writeTo("dev.sales_data").append()

Iceberg has created the partitions by day, because we used the day(transaction_dt) transform.
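One way to list the partitions yourself is Iceberg's partitions metadata table, which is addressed by appending .partitions to the table name. The helper below is a hypothetical convenience that only builds the query string; running it assumes a live SparkSession named spark with the Iceberg catalog dev configured, as in the rest of this post.

```python
def partition_summary_query(table_name: str) -> str:
    """Build a query against the Iceberg partitions metadata table."""
    return (
        "SELECT partition, record_count, file_count "
        f"FROM {table_name}.partitions"
    )

query = partition_summary_query("dev.sales_data")
print(query)

# With a live SparkSession (assumed to be named `spark`):
#   spark.sql(query).show(truncate=False)
```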

Now, let’s query the Iceberg table to fetch the records for the partition date 2023-04-25.

from pyspark.sql.functions import col
spark.table('dev.sales_data').filter((col('transaction_dt')>='2023-04-25 00:00:00') & (col('transaction_dt')<'2023-04-26 00:00:00')).show()

In the above query we haven't explicitly filtered on a partition value; we just provided two timestamps on the source column. This is “hidden partitioning”: the user doesn't have to worry about the physical layout.
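Conceptually, Iceberg can translate the timestamp range into a range of day values and keep only the partitions that can contain matching rows. The following plain-Python sketch models that idea under simplifying assumptions (day partitions only, a half-open timestamp range); the function names are hypothetical and this is not Iceberg's actual planning code.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def day_ordinal(ts: datetime) -> int:
    """Days since 1970-01-01, the value a day transform would produce."""
    return (ts - EPOCH).days

def prune_day_partitions(partition_days, lower_inclusive, upper_exclusive):
    """Keep only day partitions that can hold rows in [lower, upper)."""
    lo = day_ordinal(lower_inclusive)
    # The upper bound is exclusive, so the last matching instant is just before it.
    hi = day_ordinal(upper_exclusive - timedelta(microseconds=1))
    return [d for d in partition_days if lo <= d <= hi]

# Day partitions present in the table: 2023-04-25, 2023-04-26, 2023-04-27.
partitions = [day_ordinal(datetime(2023, 4, d)) for d in (25, 26, 27)]

kept = prune_day_partitions(
    partitions,
    datetime(2023, 4, 25, 0, 0, 0),
    datetime(2023, 4, 26, 0, 0, 0),
)
print(kept)  # only the partition for 2023-04-25 remains
```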

Let’s check the Spark Job UI for more details. According to the logs, Iceberg skipped the other 2 partition folders/files.

The physical plan of the query shows how Iceberg handles the timestamp in epoch format.
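Iceberg stores timestamps as microseconds since the Unix epoch, so the filter literals end up as large integers. A small sketch of that conversion, assuming the session time zone is UTC (with another time zone the numbers would shift); the helper name to_epoch_micros is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_micros(text: str) -> int:
    """Convert 'yyyy-MM-dd HH:mm:ss' (interpreted as UTC) to epoch microseconds."""
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return (ts - EPOCH_UTC) // timedelta(microseconds=1)

lower = to_epoch_micros("2023-04-25 00:00:00")
upper = to_epoch_micros("2023-04-26 00:00:00")
print(lower, upper)
```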

Let’s ingest some more data into the table dev.sales_data and fetch a whole month of data instead of a single day partition.

I have ingested data for several different dates and months, which created additional day partitions.

# querying data for April

from pyspark.sql.functions import col
spark.table('dev.sales_data').filter((col('transaction_dt')>='2023-04-01 00:00:00') & (col('transaction_dt')<'2023-05-01 00:00:00')).show()

Only 3 data files were scanned; the rest were skipped.

Benefits

  • Partition values are produced correctly every time, because Iceberg derives them automatically from the source column using the transform.
  • Queries no longer depend on a table’s physical layout, because we don’t have to add extra partition filters when querying data.
  • Partition evolution: the partitioning of an existing Iceberg table can be changed, because queries do not reference partition values directly.
  • Misconfigured tables can be fixed without an expensive migration.
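As an illustration of partition evolution, Iceberg's Spark SQL extensions provide ALTER TABLE ... ADD PARTITION FIELD. The sketch below only builds such a statement; the helper name is hypothetical, and bucket(8, cust_id) is an arbitrary example choice, not something from the table above. Running it assumes a SparkSession named spark with the Iceberg SQL extensions enabled.

```python
def add_partition_field_sql(table_name: str, transform_expr: str) -> str:
    """Build an Iceberg ALTER TABLE statement that adds a partition field."""
    return f"ALTER TABLE {table_name} ADD PARTITION FIELD {transform_expr}"

# Illustrative only: additionally bucket new data by customer.
statement = add_partition_field_sql("dev.sales_data", "bucket(8, cust_id)")
print(statement)

# With a live SparkSession (assumed to be named `spark`) and the
# Iceberg SQL extensions enabled:
#   spark.sql(statement)
```

Existing data files keep their old layout; only data written after the change uses the new partition spec, and queries that filter on transaction_dt stay unchanged.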
