Row-level transactions on an S3 data lake
Row-level operations have always been tricky on data lakes built on immutable object storage. To work around this, we have written ETLs that overwrite an entire partition, or even the entire table, every time an update or delete is detected. Row-level operations are now much easier with Iceberg tables, because Athena's Iceberg support has grown a lot. We will walk through them with examples, along with some common errors you may run into.
Iceberg: a table format for data lakes
In an RDBMS like MySQL, all interaction with the underlying data, such as writing and reading it, is handled by the database's storage engine. Similarly, the primary goal of a table format like Iceberg is to provide the abstraction of a table to people and tools, and to let them interact efficiently with that table's underlying data. Iceberg does this with an additional metadata layer on top of the actual data files on S3. You can read more about Iceberg's architecture in this detailed but simple article.
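To make the idea of a metadata layer more concrete, here is a toy Python model written for this post. It is not Iceberg's real metadata layout or API, and the class and method names are hypothetical. Data files are written once and never modified; every change publishes a new snapshot that lists which files make up the table. The delete below rewrites only the affected file (a copy-on-write style; Iceberg format v2 can also record deletes in separate delete files instead).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple

# Toy model only: NOT Iceberg's real metadata layout or API.
Row = Tuple[str, int]


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    data_files: Tuple[str, ...]  # immutable files visible in this snapshot


@dataclass
class ToyTable:
    files: Dict[str, Tuple[Row, ...]] = field(default_factory=dict)  # stands in for S3
    snapshots: List[Snapshot] = field(default_factory=list)

    def _current_files(self) -> Tuple[str, ...]:
        return self.snapshots[-1].data_files if self.snapshots else ()

    def _write_file(self, rows: Tuple[Row, ...]) -> str:
        path = f"data/file-{len(self.files)}.parquet"
        self.files[path] = rows  # written once, never modified afterwards
        return path

    def _commit(self, data_files: Tuple[str, ...]) -> Snapshot:
        snap = Snapshot(len(self.snapshots) + 1, data_files)
        self.snapshots.append(snap)
        return snap

    def append(self, rows: List[Row]) -> Snapshot:
        return self._commit(self._current_files() + (self._write_file(tuple(rows)),))

    def delete_where(self, predicate: Callable[[Row], bool]) -> Snapshot:
        # Copy-on-write: rewrite only the files that contain rows to delete.
        new_files: List[str] = []
        for path in self._current_files():
            rows = self.files[path]
            kept = tuple(r for r in rows if not predicate(r))
            if len(kept) == len(rows):
                new_files.append(path)  # untouched file is reused as-is
            elif kept:
                new_files.append(self._write_file(kept))
            # a file whose rows were all deleted just drops out of the new snapshot
        return self._commit(tuple(new_files))

    def scan(self, snapshot_id: Optional[int] = None) -> List[Row]:
        # Readers see whichever snapshot they ask for; the latest by default.
        snap = self.snapshots[-1] if snapshot_id is None else self.snapshots[snapshot_id - 1]
        return [row for path in snap.data_files for row in self.files[path]]


table = ToyTable()
table.append([("alice", 10), ("bob", 20)])
table.append([("carol", 30)])
table.delete_where(lambda row: row[0] == "bob")
print(table.scan())   # [('alice', 10), ('carol', 30)]
print(table.scan(2))  # [('alice', 10), ('bob', 20), ('carol', 30)]
```

Note that the original file still sits in `files` after the delete, because the older snapshot still references it. That leftover data is what snapshot cleanup (VACUUM) is for.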

Athena and Iceberg
Athena has supported Iceberg tables for some time now, but the support was very limited. With Athena engine version 3, released recently, Iceberg support has been strengthened:
- ORC and Avro support: Create Iceberg tables using the Apache Avro and Apache ORC row and column-based file formats. Support for these formats is in addition to the existing support for Parquet.
- MERGE INTO: Use the MERGE INTO command to merge data at scale efficiently. MERGE INTO combines the INSERT, UPDATE, and DELETE operations into one transaction. This reduces the processing overhead in your data pipeline and takes less SQL to write. For more information, see Updating Iceberg table data and MERGE INTO.
- CTAS and VIEW support: Use the CREATE TABLE AS SELECT (CTAS) and CREATE VIEW statements with Iceberg tables. For more information, see CREATE TABLE AS and CREATE VIEW.
- VACUUM support: You can use the VACUUM statement to optimize your data lake by deleting snapshots and data that are no longer required. You can use this feature to improve read performance and meet regulatory requirements like GDPR. For more information, see Optimizing Iceberg tables and VACUUM.
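VACUUM is easiest to reason about as a retention rule. The statement itself is just `VACUUM icb_test.icb_bucket_initial`; what it removes depends on table properties such as `vacuum_min_snapshots_to_keep` (used in the CTAS examples below) and Athena's companion `vacuum_max_snapshot_age_seconds`. The Python sketch below assumes a simplified rule: a snapshot expires only if it is older than the maximum age and is not one of the N most recent snapshots. The function name and exact precedence are assumptions for illustration, not Athena's implementation.

```python
from typing import List, NamedTuple


class SnapshotInfo(NamedTuple):
    snapshot_id: int
    committed_at: int  # commit time, epoch seconds


def snapshots_to_expire(
    snapshots: List[SnapshotInfo],
    now: int,
    min_snapshots_to_keep: int,
    max_snapshot_age_seconds: int,
) -> List[int]:
    """Return the ids of snapshots a VACUUM-style cleanup could expire.

    Simplified rule (an assumption for illustration): a snapshot expires only if
    it is older than max_snapshot_age_seconds AND it is not one of the
    min_snapshots_to_keep most recent snapshots, so the minimum always wins.
    """
    newest_first = sorted(snapshots, key=lambda s: s.committed_at, reverse=True)
    protected = {s.snapshot_id for s in newest_first[:min_snapshots_to_keep]}
    return sorted(
        s.snapshot_id
        for s in snapshots
        if s.snapshot_id not in protected
        and now - s.committed_at > max_snapshot_age_seconds
    )


history = [SnapshotInfo(i, i * 100) for i in range(1, 6)]  # committed at 100..500
# Keep at least 2 snapshots and expire anything older than 500 seconds.
print(snapshots_to_expire(history, now=1000, min_snapshots_to_keep=2, max_snapshot_age_seconds=500))  # [1, 2, 3]
```

Under this rule, a table created with `vacuum_min_snapshots_to_keep = 10` that has committed fewer than 10 snapshots would have nothing expired, regardless of snapshot age.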
Create Iceberg Tables using CTAS
CTAS can be used to create an Iceberg table from a normal Athena table:
CREATE TABLE icb_test.icb_bucket_initial
WITH (table_type = 'ICEBERG',
format = 'PARQUET',
location = 's3://athena-icb-bucket-test/icb_test/icb_bucket_1/',
is_external = false,
vacuum_min_snapshots_to_keep = 10
)
AS SELECT device_id,
truck_id,
order_uuid,
ticket_number
from athena_normal_tables.ticket
where year = '2022' and month='11' and day='10' and hour = '11';
If the source table has a timestamp column, you might get the error NOT_SUPPORTED: Timestamp precision (3) not supported for Iceberg. Use “timestamp(6)” instead. To fix this, cast the timestamp columns as shown below:
CREATE TABLE icb_test.icb_bucket_timestamp
WITH (table_type = 'ICEBERG',
format = 'PARQUET',
location = 's3://athena-icb-bucket-test/icb_test/icb_bucket_2/', -- each table needs its own empty location
is_external = false,
vacuum_min_snapshots_to_keep = 10
)
AS SELECT device_id,
truck_id,
order_uuid,
ticket_number,
cast(ingestion_timestamp_utc as timestamp(6)) as ingestion_timestamp_utc -- Typecasting
from athena_normal_tables.ticket
where year = '2022' and month='11' and day='10' and hour = '11';
To create the table without any data, use WITH NO DATA:
CREATE TABLE icb_test.icb_bucket_with_no_data
WITH (table_type = 'ICEBERG',
format = 'PARQUET',
location = 's3://athena-icb-bucket-test/icb_test/icb_bucket_3/',
is_external = false,
vacuum_min_snapshots_to_keep = 10
)
AS SELECT device_id,
truck_id,
order_uuid,
ticket_number,
cast(ingestion_timestamp_utc as timestamp(6)) as ingestion_timestamp_utc -- Typecasting
from athena_normal_tables.ticket
WITH NO DATA
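When a source table has many columns, writing these casts by hand gets tedious. The hypothetical Python helper below (not part of any AWS SDK) assembles the same kind of CTAS statement from a column list, casting columns typed `timestamp` or `timestamp(3)` to `timestamp(6)` and optionally appending WITH NO DATA. It only builds text: the column list, its types, and the S3 location are placeholders you would supply yourself, and submitting the query is left out.

```python
from typing import List, Optional, Tuple

# Source types assumed to trigger "Timestamp precision (3) not supported for Iceberg".
_MILLIS_TIMESTAMP_TYPES = {"timestamp", "timestamp(3)"}


def build_iceberg_ctas(
    target: str,
    location: str,
    source: str,
    columns: List[Tuple[str, str]],
    where: Optional[str] = None,
    with_no_data: bool = False,
) -> str:
    """Assemble an Athena CTAS statement for an Iceberg table (hypothetical helper)."""
    select_items = []
    for name, col_type in columns:
        if col_type.strip().lower() in _MILLIS_TIMESTAMP_TYPES:
            # Same fix as above: widen millisecond timestamps to microseconds.
            select_items.append(f"cast({name} as timestamp(6)) as {name}")
        else:
            select_items.append(name)

    lines = [
        f"CREATE TABLE {target}",
        "WITH (table_type = 'ICEBERG',",
        "      format = 'PARQUET',",
        f"      location = '{location}',",
        "      is_external = false)",
        "AS SELECT " + ",\n       ".join(select_items),
        f"FROM {source}",
    ]
    if where:
        lines.append(f"WHERE {where}")
    if with_no_data:
        lines.append("WITH NO DATA")
    return "\n".join(lines)


sql = build_iceberg_ctas(
    target="icb_test.icb_bucket_with_no_data",
    location="s3://example-bucket/icb_test/ticket_no_data/",  # placeholder path
    source="athena_normal_tables.ticket",
    columns=[
        ("device_id", "string"),                 # placeholder types
        ("truck_id", "string"),
        ("ingestion_timestamp_utc", "timestamp"),
    ],
    with_no_data=True,
)
print(sql)
```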
Row-level operations on Iceberg tables
Now let's look at how to insert, update, and delete rows in Iceberg tables.
Insert
Once the Iceberg table is created, you can insert data into it from normal Athena tables using INSERT INTO or MERGE statements:
INSERT INTO iceberg_table (col1, col2, ...) VALUES (val1, val2, ...)
INSERT INTO iceberg_table SELECT * FROM another_normal_table
-- Eg:
INSERT INTO icb_test.icb_bucket_with_no_data
SELECT device_id,
truck_id,
order_uuid,
ticket_number,
ingestion_timestamp_utc
from athena_normal_tables.ticket
-- Note that no typecasting of the timestamp column is
-- needed while inserting the data
We can also use the MERGE statement to insert data. We will discuss the full MERGE command in detail later.
-- Insert the records that are present in monthly_accounts_update
-- but not in accounts.
-- monthly_accounts_update can be a normal Athena table or an Iceberg table, but accounts must be an Iceberg table
MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
WHEN NOT MATCHED
THEN INSERT (customer, purchases, address)
VALUES(s.customer, s.purchases, s.address)
When using MERGE, note that an error is raised when a single target table row matches more than one source row, so make sure each target row matches at most one source row on the ON condition. If the source table has duplicates, you can deduplicate it first, as shown below:
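MERGE INTO accounts t USING
( WITH rw AS (
    -- partition by the ON key (customer) so each customer is kept only once;
    -- add an ORDER BY inside OVER () to control which duplicate survives
    SELECT *, row_number() OVER (PARTITION BY customer) AS rn
    FROM monthly_accounts_update)
  SELECT customer, purchases, address FROM rw WHERE rn = 1) AS s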
ON (t.customer = s.customer)
WHEN NOT MATCHED
THEN INSERT (customer, purchases, address)
VALUES(s.customer, s.purchases, s.address)
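A local way to see both the problem and the fix is Python's built-in `sqlite3`, used here only as a stand-in for Athena: SQLite has no MERGE and its dialect differs, but the join and ROW_NUMBER() parts behave the same way. The sample rows are made up. The first query counts how many source rows match each target row on the ON key; any count above 1 is exactly the case MERGE rejects. The second applies ROW_NUMBER() partitioned by the join key to keep one row per customer.

```python
import sqlite3

# Local stand-in for the article's tables; the rows are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE accounts (customer TEXT, purchases INTEGER, address TEXT);
CREATE TABLE monthly_accounts_update (customer TEXT, purchases INTEGER, address TEXT);
INSERT INTO accounts VALUES ('alice', 10, 'Arlington'), ('bob', 20, 'Bethesda');
INSERT INTO monthly_accounts_update VALUES
    ('alice', 5, 'Arlington'),
    ('alice', 5, 'Arlington'),  -- exact duplicate
    ('carol', 7, 'Centreville');
""")

# 1) How many source rows does each target row match on the ON key?
#    Any count above 1 is the situation MERGE raises an error for.
fan_out = conn.execute("""
    SELECT t.customer, count(*) AS matches
    FROM accounts t
    JOIN monthly_accounts_update s ON t.customer = s.customer
    GROUP BY t.customer
    HAVING count(*) > 1
""").fetchall()
print(fan_out)  # [('alice', 2)]

# 2) Keep one source row per join key with ROW_NUMBER().
deduped = conn.execute("""
    SELECT customer, purchases, address
    FROM (
        SELECT *, row_number() OVER (PARTITION BY customer) AS rn
        FROM monthly_accounts_update
    )
    WHERE rn = 1
    ORDER BY customer
""").fetchall()
print(deduped)  # [('alice', 5, 'Arlington'), ('carol', 7, 'Centreville')]
conn.close()
```

Without an ORDER BY inside OVER (), which of several rows for the same key survives is arbitrary. That is harmless for exact duplicates but worth pinning down when the duplicates differ.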
Delete
There is a DELETE command, but it is very limited: it can't be used with joins or nested queries on other tables. It can only be used with static values, as below:
DELETE FROM iceberg_table WHERE category='c3'
To delete rows based on a lookup in other tables, MERGE can be used:
-- Deletes all customers from the table accounts that are present in the source table monthly_accounts_update.
-- monthly_accounts_update can be a normal Athena table or an Iceberg table, but accounts must be an Iceberg table
MERGE INTO accounts t USING monthly_accounts_update s
ON t.customer = s.customer
WHEN MATCHED
THEN DELETE
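Because a MERGE with THEN DELETE is destructive, it can be worth previewing which target rows it will touch. With no extra condition on WHEN MATCHED, those are exactly the target rows that satisfy the ON condition, i.e. a semi-join, so an EXISTS query lists them. The sketch below runs that query with Python's `sqlite3` as a local stand-in with made-up rows; the SELECT itself is plain standard SQL.

```python
import sqlite3

# Local stand-in with made-up rows; SQLite is used only to run the preview queries.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE accounts (customer TEXT, purchases INTEGER, address TEXT);
CREATE TABLE monthly_accounts_update (customer TEXT, purchases INTEGER, address TEXT);
INSERT INTO accounts VALUES
    ('alice', 10, 'Arlington'), ('bob', 20, 'Bethesda'), ('carol', 30, 'Chantilly');
INSERT INTO monthly_accounts_update VALUES
    ('bob', 1, 'Bethesda'), ('dave', 2, 'Dover');
""")

# Target rows that WHEN MATCHED THEN DELETE would remove: a semi-join on the ON condition.
would_delete = conn.execute("""
    SELECT t.customer
    FROM accounts t
    WHERE EXISTS (
        SELECT 1 FROM monthly_accounts_update s WHERE s.customer = t.customer
    )
    ORDER BY t.customer
""").fetchall()

# Rows that would survive: the anti-join (NOT EXISTS) on the same condition.
would_keep = conn.execute("""
    SELECT t.customer
    FROM accounts t
    WHERE NOT EXISTS (
        SELECT 1 FROM monthly_accounts_update s WHERE s.customer = t.customer
    )
    ORDER BY t.customer
""").fetchall()
print(would_delete, would_keep)  # [('bob',)] [('alice',), ('carol',)]
conn.close()
```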
Update
Just like DELETE, there is a very limited UPDATE statement, which can only be used with static values:
UPDATE iceberg_table SET category='c4' WHERE category='c1'
Again, MERGE covers the other use cases. All the conditions we discussed above for insert and delete apply here as well.
MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
WHEN MATCHED
THEN UPDATE
SET purchases = s.purchases + t.purchases, address = s.address
Merge Statement
As we saw above, MERGE is a powerful command to conditionally update, delete, or insert rows in an Iceberg table.
The command structure is as follows:
MERGE INTO target_table [AS] target_alias
USING { source_table | query } [AS] source_alias
ON { join condition between target_alias and source_alias }
WHEN MATCHED [ AND condition ]
    THEN DELETE | UPDATE SET column = expression [, ...]
WHEN NOT MATCHED [ AND condition ]
    THEN INSERT [ (column, ...) ] VALUES (expression, ...)
MERGE supports an arbitrary number of WHEN clauses with different MATCHED conditions. The condition clauses execute the DELETE, UPDATE, or INSERT operation in the first WHEN clause selected by the MATCHED state and the match condition.
- If a source row is not matched by any WHEN clause and there is no WHEN NOT MATCHED clause, the source row is ignored.
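To pin these rules down, here is a small Python simulation of the MERGE semantics described above: each target row may match at most one source row, the first WHEN clause whose MATCHED state and condition both hold decides the action, matched target rows that no clause selects stay unchanged, and unmatched source rows with no applicable WHEN NOT MATCHED clause are ignored. It illustrates the rules and is not how Athena executes MERGE. The ON clause is simplified to equality on a single key column, and the `When`/`merge` names and sample rows are hypothetical. The usage at the bottom mirrors the full example that follows.

```python
from typing import Callable, Dict, List, NamedTuple, Optional

Row = Dict[str, object]


class When(NamedTuple):
    matched: bool                         # True: WHEN MATCHED, False: WHEN NOT MATCHED
    action: str                           # "DELETE", "UPDATE" or "INSERT"
    condition: Optional[Callable] = None  # the optional "AND condition"
    values: Optional[Callable] = None     # builds the SET columns or the inserted row


def merge(target: List[Row], source: List[Row], key: str, clauses: List[When]) -> List[Row]:
    """Simulate MERGE INTO target t USING source s ON (t.key = s.key)."""
    by_key: Dict[object, List[Row]] = {}
    for s in source:
        by_key.setdefault(s[key], []).append(s)

    result: List[Row] = []
    for t in target:
        matches = by_key.get(t[key], [])
        if len(matches) > 1:
            raise ValueError(f"target row {t} matched more than one source row")
        if not matches:
            result.append(t)  # no source row: the target row is untouched
            continue
        s = matches[0]
        # The first WHEN MATCHED clause whose condition holds decides the action.
        clause = next(
            (c for c in clauses if c.matched and (c.condition is None or c.condition(t, s))),
            None,
        )
        if clause is None:
            result.append(t)  # matched, but no WHEN MATCHED clause applies
        elif clause.action == "UPDATE":
            result.append({**t, **clause.values(t, s)})
        # action "DELETE": the row is simply not carried over

    target_keys = {t[key] for t in target}
    for s in source:
        if s[key] in target_keys:
            continue
        clause = next(
            (c for c in clauses if not c.matched and (c.condition is None or c.condition(s))),
            None,
        )
        if clause is not None:  # otherwise the unmatched source row is ignored
            result.append(clause.values(s))
    return result


accounts = [
    {"customer": "alice", "purchases": 10, "address": "Arlington"},
    {"customer": "bob", "purchases": 20, "address": "Bethesda"},
    {"customer": "carol", "purchases": 30, "address": "Chantilly"},
]
monthly_accounts_update = [
    {"customer": "alice", "purchases": 5, "address": "Centreville"},
    {"customer": "bob", "purchases": 7, "address": "Dover"},
    {"customer": "dave", "purchases": 1, "address": "Easton"},
]
full_example = [
    When(True, "DELETE", condition=lambda t, s: s["address"] == "Centreville"),
    When(True, "UPDATE", values=lambda t, s: {"purchases": s["purchases"] + t["purchases"],
                                              "address": s["address"]}),
    When(False, "INSERT", values=lambda s: dict(s)),
]
for row in merge(accounts, monthly_accounts_update, "customer", full_example):
    print(row)
```

Here alice is deleted (her source address is Centreville), bob is updated to 27 purchases at Dover, carol has no source row and is left alone, and dave is inserted. Clause order matters: swapping the first two clauses would update alice instead of deleting her.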
Here is a full example:
MERGE INTO accounts t USING monthly_accounts_update s
ON (t.customer = s.customer)
-- deletes any matching target row for which the source address is Centreville.
WHEN MATCHED AND s.address = 'Centreville'
THEN DELETE
WHEN MATCHED
THEN UPDATE
SET purchases = s.purchases + t.purchases, address = s.address
WHEN NOT MATCHED
THEN INSERT (customer, purchases, address)
VALUES(s.customer, s.purchases, s.address)
Conclusion
One of the main reasons data teams keep an expensive warehouse solution is to have row-level transactions. With the emergence of table formats like Iceberg, open data lakes are much more capable, and we can now rethink the data stack to make it more cost-effective.