Delta Lake — Upserting without Primary Key

Tekion · May 30, 2023

At Tekion, we have a centralized data platform that houses all data relating to Tekion’s applications. This platform is the source of data for many important consumers, such as Machine Learning, Advanced Analytics platforms, and reporting. Application data is ingested into the platform with minimal latency to keep it up to date.

The crux of the data platform is its central Data Lakehouse (Data Lake + Data Warehouse). We use open-source Delta Lake on S3 for the storage layer and an Athena/Presto combination for serving the data. In the Lakehouse, the data is modeled in a data warehouse format of facts and dimensions, but everything is stored as Delta tables. Spark jobs run every 15 minutes to continuously ingest application updates into these Delta tables. This ingestion requires Spark to upsert the application updates into the Delta tables on S3.

Delta Lake provides upsert functionality through its merge command, which expects a merge key (the join condition between the target table and the staged updates). However, if the staged updates contain more than one row for a merge-key value that also exists in the target table, multiple source rows attempt to modify the same target row, and the command throws an error. In other words, one cannot upsert into a Delta table if there is more than one row per value of the key on which the merge happens.

Not every data model has a natural primary key. In our data warehouse, many tables have no single column that can uniquely identify a row. However, we still want to upsert on a key column by deleting all existing rows for a given key value and inserting the new set of rows for that value.

Consider the following example:

The target table is a Delta table keyed on the id column, but id is not unique: more than one row can share the same id value. The updates dataframe is a Spark dataframe holding the latest changes that need to be upserted, and it too can contain more than one row with the same id value. Suppose both the target table and the updates dataframe contain two rows for id r2.

When we try to merge the updates dataframe into the target table on the id column, the command fails. When the merge joins the two datasets, id r2 produces four matched row pairs (two target rows × two source rows), making it ambiguous for the Delta library to decide which source row should modify which target row.
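To make the failure concrete, here is a minimal PySpark sketch; the table path, column names, and data are hypothetical stand-ins for the example above:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes a Delta-enabled Spark session

# Target Delta table: id "r2" appears twice, so id is not unique.
spark.createDataFrame(
    [("r1", "a"), ("r2", "b"), ("r2", "c")], ["id", "value"]
).write.format("delta").mode("overwrite").save("/tmp/target")

# Latest updates: id "r2" again appears twice.
updates = spark.createDataFrame(
    [("r2", "b2"), ("r2", "c2"), ("r3", "d")], ["id", "value"]
)

target = DeltaTable.forPath(spark, "/tmp/target")

# Naive merge on id: for "r2" the join produces 2 target rows x 2 source rows,
# so Delta cannot decide which source row should modify which target row and
# fails with "Cannot perform Merge as multiple source rows matched and
# attempted to modify the same target row in the Delta table ...".
target.alias("t").merge(
    updates.alias("u"), "t.id = u.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```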

The issue is not specific to Delta Lake; Hudi and Iceberg behave the same way. It is the ambiguity of the many-to-many join that creates a problem in merging the dataframes.

At Tekion, we devised a solution to address this issue:

The approach is to emulate a “delete-insert” inside the merge command. We take a copy of the updates dataframe, add a new column “mergeKey” that copies the id column, and deduplicate the copy on mergeKey. We then union this deduplicated copy back with the original updates dataframe, whose rows carry a NULL mergeKey. Since mergeKey now has at most one non-NULL row per id value, we can join it against the id column of the target table and perform the “delete-insert” in the merge command.

The entire process is illustrated below:

1. First, create a copy of the updates dataframe with a new column called mergeKey that copies the id column values. This copy is named stagedUpdates.

2. Remove duplicate rows from the stagedUpdates dataframe by deduplicating on the mergeKey column.

3. Add a mergeKey column with NULL values to the updates dataframe as well, then union the updates dataframe and the stagedUpdates dataframe by name.

4. Now that the mergeKey column in the unioned dataframe has at most one non-NULL row per id value, use this column to join with the id column of the target table. Then delete all matched rows and insert all unmatched rows (see the sketches below).
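Under the same assumptions as the sketch above, steps 1–3 might look like this:

```python
from pyspark.sql import functions as F

# Step 1: copy the updates dataframe, adding mergeKey as a copy of id.
stagedUpdates = updates.withColumn("mergeKey", F.col("id"))

# Step 2: keep one row per mergeKey. Which row survives does not matter:
# these rows exist only to match (and delete) target rows, never to be inserted.
stagedUpdates = stagedUpdates.dropDuplicates(["mergeKey"])

# Step 3: give every original update row a NULL mergeKey, then union by name.
# NULL never equals anything, so these rows can never match a target row.
stagedUpdates = updates.withColumn(
    "mergeKey", F.lit(None).cast("string")
).unionByName(stagedUpdates)
```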

The net effect of the merge is as follows:

  • Take the distinct id values from the updates dataframe.
  • Delete all rows in the target table whose id matches one of those values.
  • Insert all rows from the updates dataframe.

It is vital to add the condition “mergeKey IS NULL” to the insert clause (in Spark SQL, an equality check against NULL never evaluates to true, so IS NULL is the working form). Without it, a deduplicated row whose mergeKey does not match any id in the target table would be inserted alongside its NULL-mergeKey copy, producing duplicate records from the updates dataframe.
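Putting it together, step 4 might look like the following. This is again a sketch under the same assumptions; note that whenNotMatchedInsertAll carries the mergeKey column into the target, which requires the column to already exist there or Delta schema auto-merge to be enabled:

```python
# Step 4: merge on target.id = source.mergeKey. Each target row now matches
# at most one source row (the deduplicated one), so the merge is unambiguous.
# Matched target rows are deleted; only NULL-mergeKey source rows are inserted.
target.alias("t").merge(
    stagedUpdates.alias("s"), "t.id = s.mergeKey"
).whenMatchedDelete().whenNotMatchedInsertAll(
    condition="s.mergeKey IS NULL"
).execute()
```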

Additionally, the target table will always have an extra column mergeKey, which can be removed or ignored.
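Downstream consumers that do not want the helper column can simply drop it at read time, for example:

```python
# Read the table and drop the helper column (path as in the sketches above).
df = spark.read.format("delta").load("/tmp/target").drop("mergeKey")
```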

The same solution can be applied to Apache Iceberg as well; it is a generic way to emulate the delete-insert process in the merge command. We have packaged this functionality as a common “util function” that is used in most of our Spark job pipelines. This solution allowed us to use Delta Lake in our data platform even where data models have no primary key, and in the process saved us the infrastructure cost of hosting dedicated data warehouse solutions.

About the Author

This blog was written by Adarsh Mudukaplur Nagaraj, Engineering Manager at Tekion Corp.
