Spark – Cannot perform Merge as multiple source rows matched…

June 18, 2021

In SQL, when you sync one table (the target) from another table (the source) with MERGE, you need to make sure the source contains no duplicate rows for the merge key. Otherwise you get the following error:

UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge

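The error says that during a MERGE operation no more than one source row may match a given target row, so the source must not contain duplicates on the merge key. The engine applies this check implicitly: with several matching source rows it cannot tell which one should update the target row, and picking one arbitrarily would leave the data inconsistent.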

So, to avoid this issue, de-duplicate the source on the merge key before running the MERGE operation.
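Before de-duplicating, it can help to confirm whether the source really has repeated keys. The following is a minimal sketch, not part of the original demo: `duplicateKeys` and `assertUniqueKeys` are hypothetical helper names, and the key column is passed in as a parameter.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: returns one row per key value that occurs more than once,
// together with its occurrence count (column "count").
def duplicateKeys(df: DataFrame, keyCol: String): DataFrame =
  df.groupBy(col(keyCol))
    .count()
    .filter(col("count") > 1)

// Hypothetical guard: fails fast with a readable message instead of letting
// the MERGE itself fail later.
def assertUniqueKeys(df: DataFrame, keyCol: String): Unit = {
  val dupCount = duplicateKeys(df, keyCol).count()
  require(dupCount == 0,
    s"$dupCount value(s) of '$keyCol' occur more than once; de-duplicate before MERGE")
}
```

Once the demo tables below exist, `duplicateKeys(spark.table("tblPerson"), "Id")` would list the offending key values.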

 

Below is a small demo to reproduce this error.

Let’s create two sample tables (Source and Target) for the demo:

// Source data: note that Id = 2 appears twice
val df1 = Seq((1, "Brock", 30), 
              (2, "John",  31), 
              (2, "Andy",  35), //duplicate ID = 2
              (3, "Jane",  25), 
              (4, "Maria", 30)).toDF("Id", "name", "age")

// Save the source as a Delta table
spark.sql("drop table if exists tblPerson")
df1.write.format("delta").saveAsTable("tblPerson")


// Target data: one row per Id
val df2 = Seq((1, "Jane", 30),
              (2, "John", 31)).toDF("Id", "name", "age")

// Save the target as a Delta table
spark.sql("drop table if exists tblPersonTarget")
df2.write.format("delta").saveAsTable("tblPersonTarget")

 

Next, we try to MERGE the source into the target. Running the query fails with the error above, because two source rows (Id = 2) match the same target row:

val mergeQuery =
"""MERGE INTO tblPersonTarget AS tgt
USING tblPerson AS src
  ON src.Id = tgt.Id
WHEN MATCHED
  THEN UPDATE
  SET
    tgt.name = src.name,
    tgt.age = src.age
WHEN NOT MATCHED
  THEN INSERT (
    Id,
    name,
    age
  )
  VALUES (
    src.Id,
    src.name,
    src.age
  )"""

// Fails with UnsupportedOperationException: Id = 2 occurs twice in the source
spark.sql(mergeQuery)

 

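To remove the duplicates you can use a window function, or any other logic that fits your business requirement. The example below numbers the rows within each Id ordered by name and keeps only the first one, so for Id = 2 the row whose name sorts first is retained: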

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

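// Number the rows within each Id, ordered by name (Window, with a capital W, is the Spark object)
val dfRanked = df1.withColumn("rn", row_number().over(Window.partitionBy("Id").orderBy("name")))
// Keep only the first row per Id
val df3 = dfRanked.filter("rn = 1")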

display(df3)
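To close the loop, the de-duplicated DataFrame can be used as the MERGE source in place of the original table. This is a sketch under assumptions rather than part of the original demo: `mergeDeduplicated` is a hypothetical helper, `tblPersonDedup` is a made-up temporary view name, and the target is assumed to be the `tblPersonTarget` Delta table created earlier.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: registers the de-duplicated rows as a temp view
// and runs the same MERGE against that view instead of the raw source table.
def mergeDeduplicated(spark: SparkSession, source: DataFrame): Unit = {
  // Drop the helper column "rn" (a no-op if it is absent) so the schema matches the target
  source.drop("rn").createOrReplaceTempView("tblPersonDedup")

  spark.sql(
    """MERGE INTO tblPersonTarget AS tgt
      |USING tblPersonDedup AS src
      |  ON src.Id = tgt.Id
      |WHEN MATCHED
      |  THEN UPDATE SET tgt.name = src.name, tgt.age = src.age
      |WHEN NOT MATCHED
      |  THEN INSERT (Id, name, age) VALUES (src.Id, src.name, src.age)
      |""".stripMargin)
}
```

In the notebook this would be called as `mergeDeduplicated(spark, df3)`. Since each Id now occurs only once in the source, the ambiguity that triggered the error no longer arises.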