Archive
Spark – Cannot perform Merge as multiple source rows matched…
In SQL, when you are syncing a table (the target) from another table (the source), you need to make sure the source has no duplicate rows on the join key; otherwise the MERGE fails with the following error:
UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge
The above error says that when you run a MERGE operation against the target table, no two source rows may match the same target row on the merge condition. The SQL engine applies this check implicitly to avoid ambiguous updates and inconsistent data.
So, to avoid this issue, make sure you have de-duplication logic in place before the MERGE operation.
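A quick way to spot offending keys before you merge is to count rows per join key in the source. Below is a minimal sketch, assuming a source DataFrame named src with join key Id (both are placeholders for your own data):

// Sketch: list join keys that appear more than once in the source.
val duplicateKeys = src.groupBy("Id").count().filter("count > 1")
duplicateKeys.show() // any row here means the MERGE will fail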
Below is a small demo to reproduce this error.
Let’s create two sample tables (Source & Target) for our demo purpose:
val df1 = Seq(
  (1, "Brock", 30),
  (2, "John", 31),
  (2, "Andy", 35), // duplicate Id = 2
  (3, "Jane", 25),
  (4, "Maria", 30)
).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPerson")
df1.write.format("delta").saveAsTable("tblPerson")

val df2 = Seq(
  (1, "Jane", 30),
  (2, "John", 31)
).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPersonTarget")
df2.write.format("delta").saveAsTable("tblPersonTarget")
Next we will try to MERGE the two tables; running the query results in the error shown above:
val mergeQuery = s"""MERGE INTO tblPersonTarget AS tgt
  USING tblPerson AS src
  ON src.Id = tgt.Id
  WHEN MATCHED THEN
    UPDATE SET tgt.name = src.name, tgt.age = src.age
  WHEN NOT MATCHED THEN
    INSERT (Id, name, age)
    VALUES (src.Id, src.name, src.age)"""

spark.sql(mergeQuery)
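If you want to see exactly which target rows are hit by more than one source row, you can join on the merge condition and count the matches. A sketch against the demo tables above (the variable name conflicts is my own):

import spark.implicits._

// Sketch: find target Ids matched by more than one source row.
val conflicts = spark.table("tblPerson").as("src")
  .join(spark.table("tblPersonTarget").as("tgt"), $"src.Id" === $"tgt.Id")
  .groupBy($"src.Id")
  .count()
  .filter("count > 1")
conflicts.show() // here it reports Id = 2, which appears twice in the source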
To remove the duplicates, you can use a window function, or whatever logic fits your business requirement, to keep a single row per key:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Number the rows within each Id and keep only the first one
val df2 = df1.withColumn("rn", row_number().over(Window.partitionBy("Id").orderBy("name")))
val df3 = df2.filter("rn = 1")
display(df3)
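For completeness, here is a sketch of re-running the MERGE against the de-duplicated rows; tblPersonDedup and mergeQueryDedup are names made up for this example:

// Sketch: drop the helper column, expose the de-duplicated rows as a
// temp view, and point the same MERGE at it.
df3.drop("rn").createOrReplaceTempView("tblPersonDedup")

val mergeQueryDedup = s"""MERGE INTO tblPersonTarget AS tgt
  USING tblPersonDedup AS src
  ON src.Id = tgt.Id
  WHEN MATCHED THEN
    UPDATE SET tgt.name = src.name, tgt.age = src.age
  WHEN NOT MATCHED THEN
    INSERT (Id, name, age)
    VALUES (src.Id, src.name, src.age)"""

spark.sql(mergeQueryDedup) // succeeds: each target row now has at most one match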
Spark/Scala: Convert or flatten a JSON with nested Struct/Array data into columns (Question)
The following JSON contains some attributes at root level, like ProductNum and unitCount.
It also contains a nested attribute named "Properties", which holds an array of key-value pairs.
Now, what I want is to expand this JSON and have all the attributes as columns, with an additional column for each key in the nested array, as in the "Expected output" section below:
{ "ProductNum":"6000078", "Properties":[ { "key":"invoice_id", "value":"923659" }, { "key":"job_id", "value":"296160" }, { "key":"sku_id", "value":"312002" } ], "unitCount":"3" }
Expected output, as described above:
+------------+------------+--------+--------+-----------+
| ProductNum | invoice_id | job_id | sku_id | unitCount |
+------------+------------+--------+--------+-----------+
| 6000078    | 923659     | 296160 | 312002 | 3         |
+------------+------------+--------+--------+-----------+
Solution:
import org.apache.spark.sql.functions._
import spark.implicits._

val DS_Products = spark.createDataset("""{
  "ProductNum":"6000078",
  "Properties":[
    { "key":"invoice_id", "value":"923659" },
    { "key":"job_id", "value":"296160" },
    { "key":"sku_id", "value":"312002" }
  ],
  "unitCount":"3"
}""" :: Nil)

val DF_Products = spark.read.json(DS_Products)

// Explode the Properties array: one row per key-value struct
val df_flatten = DF_Products
  .select($"*", explode($"Properties") as "SubContent")
  .drop($"Properties")
df_flatten.show()

// Pivot the keys into columns, taking the first value for each key
val df_flatten_pivot = df_flatten
  .groupBy($"ProductNum", $"unitCount")
  .pivot("SubContent.key")
  .agg(first("SubContent.value"))
df_flatten_pivot.show()
Output:
+----------+---------+--------------------+
|ProductNum|unitCount|          SubContent|
+----------+---------+--------------------+
|   6000078|        3|[invoice_id, 923659]|
|   6000078|        3|    [job_id, 296160]|
|   6000078|        3|    [sku_id, 312002]|
+----------+---------+--------------------+

+----------+---------+----------+------+------+
|ProductNum|unitCount|invoice_id|job_id|sku_id|
+----------+---------+----------+------+------+
|   6000078|        3|    923659|296160|312002|
+----------+---------+----------+------+------+
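One optional tweak: pivot triggers an extra pass over the data to collect the distinct keys, so if the key set is known up front you can pass it explicitly. A sketch, reusing df_flatten from above (df_pivot_fixed is a name made up here):

// Sketch: listing the pivot values lets Spark skip the extra job
// that otherwise computes the distinct keys.
val df_pivot_fixed = df_flatten
  .groupBy($"ProductNum", $"unitCount")
  .pivot("SubContent.key", Seq("invoice_id", "job_id", "sku_id"))
  .agg(first("SubContent.value"))
df_pivot_fixed.show()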