
Posts Tagged ‘Databricks’

Spark – Cannot perform Merge as multiple source rows matched…

June 18, 2021

 

In SQL, when you are syncing a table (Target) from another table (Source), you need to make sure there are no duplicate or repeated rows in the Source that match the same Target row, otherwise you get the following error:

UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge

The above error says that while doing a MERGE operation on the Target table there shouldn’t be any duplicates in the Source table that match the same Target row. This check is applied implicitly by the SQL engine to avoid unnecessary updates and inconsistent data.

So, to avoid this issue, make sure you have de-duplication logic in place before the MERGE operation.

 

Below is a small demo to reproduce this error.

Let’s create two sample tables (Source & Target) for this demo:

val df1 = Seq((1, "Brock", 30), 
              (2, "John",  31), 
              (2, "Andy",  35), //duplicate ID = 2
              (3, "Jane",  25), 
              (4, "Maria", 30)).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPerson")
df1.write.format("delta").saveAsTable("tblPerson")


val df2 = Seq((1, "Jane", 30),
              (2, "John", 31)).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPersonTarget")
df2.write.format("delta").saveAsTable("tblPersonTarget")

 

Next we will try to MERGE the tables; running this query results in the error shown above:

val mergeQuery =
s"""MERGE INTO tblPersonTarget As tgt
Using tblPerson as src      
  ON src.Id = tgt.ID
WHEN MATCHED 
  THEN UPDATE 
  SET
    tgt.name = src.name,
    tgt.age = src.age
WHEN NOT MATCHED
  THEN INSERT (
    ID,
    name,
    age
  )
  VALUES (
    src.ID,
    src.name,
    src.age
  )"""

spark.sql(mergeQuery)

 

To remove duplicates from the Source you can use window functions, or whatever logic fits your business requirement. For example, keep only the first row per Id:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// keep only one row per Id (here the first one when ordered by name)
val dfDedup = df1.withColumn("rn", row_number().over(Window.partitionBy("Id").orderBy("name")))
val dfClean = dfDedup.filter("rn = 1").drop("rn")

display(dfClean)
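
With the duplicates gone you can overwrite the Source table and re-run the same MERGE. A minimal sketch, assuming the dfClean DataFrame and the mergeQuery defined above:

// overwrite the de-duplicated Source table and re-run the merge
dfClean.write.format("delta").mode("overwrite").saveAsTable("tblPerson")

spark.sql(mergeQuery) // succeeds now that each Target row matches at most one Source row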

Python error: while converting Pandas Dataframe or Python List to Spark Dataframe (Can not merge type)

April 8, 2021

 

Datatype casting errors are common when you work with DataFrames across different languages. Here, for example, I got a datatype mixing error while converting a Pandas DataFrame to a Spark DataFrame:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

df = spark.createDataFrame(pd_df)
display(df)
TypeError:
field col1: Can not merge type <class 'pyspark.sql.types.LongType'> and 
<class 'pyspark.sql.types.StringType'>

 

While converting the Pandas DataFrame to a Spark DataFrame it throws this error because Spark is not able to infer the correct data type for the columns, due to the mixed types of data they contain.

In this case you just need to explicitly tell Spark which datatype to use, by creating a new schema and passing it to the createDataFrame() call as shown below:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

from pyspark.sql.types import *
df_schema = StructType([StructField("col1", StringType(), True),
                        StructField("col2", StringType(), True)])

df = spark.createDataFrame(pd_df, schema=df_schema)
display(df)

Connect to Cosmos DB from Databricks and read data by using Apache Spark to Azure Cosmos DB connector

April 7, 2021

 

In this post we will use the Databricks compute environment to connect to Cosmos DB and read data using the Apache Spark to Azure Cosmos DB connector.

 

First go to your Azure Databricks cluster and import the Azure Cosmos DB connector library: download the library JAR (from the [Maven links] or the [Uber JAR]) to your local PC and install it on the cluster as a new library.

Databricks CosmosDB Library

 

Now open a new Notebook with Scala as the language and use the code provided below.

To get the Cosmos DB instance URI and key, go to the Azure portal -> your Cosmos DB instance, open the Keys tab and copy the “URI” and “PRIMARY READ-ONLY KEY” values into the code below.

import org.joda.time._  
import org.joda.time.format._  

import com.microsoft.azure.cosmosdb.spark.schema._  
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark  
import com.microsoft.azure.cosmosdb.spark.config.Config  

import org.apache.spark.sql.functions._

val readerConfig = Config(Map( 
  "Endpoint" -> "https://YourCosmosDBname.documents.azure.com:443/", 
  "Masterkey" -> "YourPrimaryKey==", 
  "Database" -> "DatabaseName", 
  "Collection" -> "CollectionName"
  "query_custom" -> "select * from c" //optional
))

val df = spark.sqlContext.read.cosmosDB(readerConfig)
display(df)
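
Once the data is loaded, df behaves like any other Spark DataFrame. Below is a minimal sketch that registers it as a temporary view and queries it with Spark SQL (the view name cosmos_data is just an illustrative placeholder):

// register the Cosmos DB data as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("cosmos_data")

val topRows = spark.sql("select * from cosmos_data limit 10")
display(topRows)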

SQL Error – “SELECT TOP 100” throws error in SparkSQL – what’s the correct syntax?

January 23, 2020

 
In SQL Server, to get the top-n rows from a table or dataset you just use the “SELECT TOP” clause, specifying the number of rows you want to return, like in the query below.

But when I tried to use the same query in Spark SQL I got a syntax error, which means the TOP clause is not supported with the SELECT statement in Spark SQL.

%sql
Select TOP 100 * from SalesOrder

Error in SQL statement: ParseException:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '100' expecting (line 1, pos 11)

== SQL ==
Select top 100 * from SalesOrder
-----------^^^

 

As Spark SQL does not support the TOP clause, I tried the MySQL syntax instead, which is the “LIMIT” clause.

So I removed “TOP 100” from the SELECT query and added a “LIMIT 100” clause at the end; it worked and gave the expected results!

%sql
Select * from SalesOrder LIMIT 100
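
If you prefer the DataFrame API, the same result can be had with limit(). A minimal sketch in Scala, assuming the same SalesOrder table:

// DataFrame API equivalent of "LIMIT 100"
val top100 = spark.table("SalesOrder").limit(100)
display(top100)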

Spark/Scala: Convert or flatten a JSON having Nested data with Struct/Array to columns (Question)

January 9, 2019

 
The following JSON contains some attributes at root level, like ProductNum and unitCount.
It also contains a nested attribute named “Properties”, which holds an array of key-value pairs.

Now, what I want is to flatten this JSON and have all the attributes as columns, with an additional column for each key in the nested array, like in the “Expected Output” section below:

{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "unitCount":"3"
}

 

Expected output, as described above:

+-------------------------------------------------------+   
| ProductNum | invoice_id | job_id | sku_id | unitCount |  
+-------------------------------------------------------+   
| 6000078    | 923659     | 296160 | 312002 | 3         |  
+-------------------------------------------------------+

 

Solution:

// needed for the $-column syntax, explode() and first()
import spark.implicits._
import org.apache.spark.sql.functions.{explode, first}

val DS_Products = spark.createDataset("""{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "UnitCount":"3"
}""" :: Nil)

val DF_Products = spark.read.json(DS_Products)

val df_flatten = DF_Products
  .select($"*", explode($"Properties") as "SubContent")
  .drop($"Properties")

df_flatten.show()

val df_flatten_pivot = df_flatten
  .groupBy($"ProductNum",$"UnitCount")
  .pivot("SubContent.key")
  .agg(first("SubContent.value"))

df_flatten_pivot.show()

Output:

+----------+---------+--------------------+
|ProductNum|UnitCount|          SubContent|
+----------+---------+--------------------+
|   6000078|        3|[invoice_id, 923659]|
|   6000078|        3|    [job_id, 296160]|
|   6000078|        3|    [sku_id, 312002]|
+----------+---------+--------------------+

+----------+---------+----------+------+------+
|ProductNum|UnitCount|invoice_id|job_id|sku_id|
+----------+---------+----------+------+------+
|   6000078|        3|    923659|296160|312002|
+----------+---------+----------+------+------+