Delta Lake

Mageswaran D
3 min read · Mar 17, 2020

Databricks Delta Tables — Upsert — Merge Error

Well, we are all aware of the Delta library from Databricks, right? Most of us have started to use it in our projects or as proof of concepts in our data pipelines. Don't you feel it is fun and cool to use Parquet files to track table modifications? That too with a bunch of JSON files as metadata (the _delta_log folder).

As a developer, this sounds great: it integrates seamlessly with Spark and lets us leverage some design here and there in our pipelines, making use of Delta upserts and time travel (simply put, reading previous snapshots of a table based on a version or timestamp), among other things.
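Time travel, for instance, is just a read option. A minimal sketch, where the table path, version number, and timestamp are only placeholders and a live spark session (as in a Databricks notebook) is assumed:

# Read an older snapshot of a Delta table by version number...
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/delta/events")

# ...or pin the snapshot to a point in time instead
old_df = spark.read.format("delta").option("timestampAsOf", "2020-03-16").load("/delta/events")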

In this post, let me walk through the upsert with a simple dataset in a notebook, and a common error that you might hit sooner or later!

  • Insert: Always insert the data into the table.
  • Update: Always update the records on the specified column(s) based on a condition.
  • Upsert: Dynamically choose Insert or Update for each row based on the merge condition.
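In Delta's Python API, the three roughly map to the calls below. This is only a sketch: the path /delta/demo_table, the id/value columns, and updates_df are made-up placeholders, and a live spark session is assumed, as in a Databricks notebook.

from delta.tables import DeltaTable

# Placeholder incoming data (illustrative only)
updates_df = spark.createDataFrame([(1, "x"), (9, "y")], ["id", "value"])

# Insert: plain append into the Delta table
updates_df.write.format("delta").mode("append").save("/delta/demo_table")

# Update: rewrite rows in place that satisfy a condition
DeltaTable.forPath(spark, "/delta/demo_table") \
    .update(condition="id = 1", set={"value": "'x'"})

# Upsert: merge decides per row between update and insert
DeltaTable.forPath(spark, "/delta/demo_table").alias("delta") \
    .merge(updates_df.alias("updates"), "delta.id = updates.id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()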

Well explained by Databricks:

All good, as long as you are blessed!

If you are unlucky like me, you will hit this error:

java.lang.UnsupportedOperationException: Cannot perform MERGE as multiple source rows matched and attempted to update the same

At first it may seem puzzling, but as you dig deeper the problem pops out, and it is the age-old problem: DUPLICATE RECORDS!

The next question is: where are the duplicates? Or, more precisely, which table can afford to have duplicates and which one cannot?

To find out the answer, I have put up a small notebook in Databricks community edition @

I have created a Python function to do the upsert operation as follows:

from delta.tables import DeltaTable
import os

def upsert(df, path=DELTA_STORE, is_delete=False):
    """
    Stores the DataFrame as a Delta table if the path is empty,
    or merges the incoming data into the existing table if one is found.
    df        : DataFrame to upsert
    path      : Delta table store path
    is_delete : Delete the path directory before writing
    """
    if is_delete:
        dbutils.fs.rm(path, True)
    if os.path.exists("/dbfs/" + path):
        print("Modifying existing table...")
        delta_table = DeltaTable.forPath(spark, "dbfs:" + path)
        # Match incoming rows against existing rows on the "id" column
        match_expr = "delta.{} = updates.{}".format("id", "id")
        delta_table.alias("delta").merge(
            df.alias("updates"), match_expr) \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
    else:
        print("Creating new Delta table")
        df.write.format("delta").save(path)

And then I tried a couple of experiments (sketched in code right after this list):

  • Create a dataframe called compacted_df from the raw data
  • Use compacted_df to create a Delta table
  • Create a fresh ingestion dataframe and add it to the Delta table
  • Update existing records and upsert them into the Delta table
  • Add two records with existing ids, without any modification and without duplicates
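Stripped of the display and validation cells, those experiments boil down to something like this (the column names and values are illustrative, not copied from the notebook):

# 1. Raw / compacted data
compacted_df = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

# 2. First call creates the Delta table at DELTA_STORE
upsert(compacted_df, is_delete=True)

# 3. Fresh ingestion data: new ids are inserted
upsert(spark.createDataFrame([(4, "d"), (5, "e")], ["id", "value"]))

# 4. Updated records: matching ids are overwritten
upsert(spark.createDataFrame([(1, "a-updated")], ["id", "value"]))

# 5. Existing ids re-sent unchanged, no duplicates within the batch: still fine
upsert(spark.createDataFrame([(2, "b"), (3, "c")], ["id", "value"]))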

All is well and good, so far…

What if I have duplicates in the original Delta table?

The answer is, it gets updated without hesitation!

Ok then, what about getting duplicates as part of the ingestion / new data?

Boom!
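Reproducing it with the sketch above is a one-liner: send a batch that carries the same id twice (values illustrative).

# Two source rows share id=1, so both try to update the same target row
dup_ingest_df = spark.createDataFrame(
    [(1, "first"), (1, "second")], ["id", "value"])

upsert(dup_ingest_df)  # raises the UnsupportedOperationException shown earlier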

Check out the notebook if you want to play around with the APIs and data!

Summary:

Don’t bring data / a dataframe with duplicate merge keys and try to merge it with the Delta table.

If we get an ignore clause for duplicates from the Delta team, then this might be mitigated without an error, should the developer want to do so.
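Until such an option shows up, the practical workaround is to deduplicate the incoming batch on the merge key before calling merge; a minimal sketch, assuming id is the merge key:

# Keep exactly one row per merge key before handing the batch to merge
deduped_df = dup_ingest_df.dropDuplicates(["id"])
upsert(deduped_df)
# Note: which duplicate survives is arbitrary; add your own ordering
# (e.g. a window over a timestamp column) if it matters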

I wish the Delta Lake could be this one… ;)
