Archive

Posts Tagged ‘Databricks’

Python error: while converting Pandas Dataframe or Python List to Spark Dataframe (Can not merge type)

April 8, 2021

 

Data type casting errors are common when you move data between different DataFrame APIs. In this case I got a data type mixing error while converting a Pandas DataFrame to a Spark DataFrame:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

df = spark.createDataFrame(pd_df)
display(df)
TypeError:
field col1: Can not merge type <class 'pyspark.sql.types.LongType'> and
<class 'pyspark.sql.types.StringType'>

 

While converting the Pandas DataFrame to a Spark DataFrame, Spark throws this error because it cannot infer a single data type for columns that contain mixed types of data.

In this case you just need to tell Spark explicitly which data type to use, by creating a schema and passing it to createDataFrame(), as shown below:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

from pyspark.sql.types import StructType, StructField, StringType
df_schema = StructType([StructField("col1", StringType(), True),
                        StructField("col2", StringType(), True)])

df = spark.createDataFrame(pd_df, schema=df_schema)
display(df)
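
Alternatively, if treating every value as a string is acceptable, you can make the column types consistent on the pandas side before handing the DataFrame to Spark. A minimal sketch of that approach (not from the original post), using the same sample data:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'),
                      ('def', 201),
                      ('xyz', 'pqr')],
                     columns=['col1', 'col2'])

# Cast all columns to str so Spark can infer one consistent type per column
df = spark.createDataFrame(pd_df.astype(str))
display(df)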

Connect to Cosmos DB from Databricks and read data by using Apache Spark to Azure Cosmos DB connector

April 7, 2021

 

In this post we will use the Databricks compute environment to connect to Cosmos DB and read data by using the Apache Spark to Azure Cosmos DB connector.

 

First, go to your Azure Databricks cluster and import the Azure Cosmos DB connector library: download the library JAR (via the [Maven links] or the [Uber JAR]) to your local PC, then install it as a new library on the cluster.

Databricks CosmosDB Library

 

Now open a new notebook with Scala as the language and use the code provided below.

To get the Cosmos DB instance URI and key, go to the Azure portal -> your Cosmos DB instance, open the Keys tab (under the Overview section) and copy the "URI" and "PRIMARY READ-ONLY KEY" values into the code below.

import org.joda.time._  
import org.joda.time.format._  

import com.microsoft.azure.cosmosdb.spark.schema._  
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark  
import com.microsoft.azure.cosmosdb.spark.config.Config  

import org.apache.spark.sql.functions._

val readerConfig = Config(Map( 
  "Endpoint" -> "https://YourCosmosDBname.documents.azure.com:443/", 
  "Masterkey" -> "YourPrimaryKey==", 
  "Database" -> "DatabaseName", 
  "Collection" -> "CollectionName"
  "query_custom" -> "select * from c" //optional
))

val df = spark.sqlContext.read.cosmosDB(readerConfig)
display(df)
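
The same read can also be done from a Python notebook once the connector library is attached to the cluster; a minimal PySpark sketch, reusing the placeholder endpoint, key, database and collection names from above:

readConfig = {
  "Endpoint": "https://YourCosmosDBname.documents.azure.com:443/",
  "Masterkey": "YourPrimaryKey==",
  "Database": "DatabaseName",
  "Collection": "CollectionName",
  "query_custom": "select * from c"  # optional
}

# Read from Cosmos DB through the azure-cosmosdb-spark connector
df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readConfig).load()
display(df)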

SQL Error – “SELECT TOP 100” throws error in SparkSQL – what’s the correct syntax?

January 23, 2020

 
In SQL Server, to get the top-n rows from a table or dataset you just use the "SELECT TOP" clause and specify the number of rows you want to return.

But when I tried the same query in Spark SQL I got a syntax error, which means that the TOP clause is not supported with the SELECT statement:

%sql
Select TOP 100 * from SalesOrder

Error in SQL statement: ParseException:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '100' expecting (line 1, pos 11)

== SQL ==
Select top 100 * from SalesOrder
-----------^^^

 

As Spark SQL does not support the TOP clause, I tried the MySQL syntax instead, which is the "LIMIT" clause.

So I just removed "TOP 100" from the SELECT query and added a "LIMIT 100" clause at the end; it worked and gave the expected results!

%sql
Select * from SalesOrder LIMIT 100
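
If you are working with the DataFrame API rather than a %sql cell, the same cap can be applied with limit(); a minimal PySpark sketch, assuming the SalesOrder table from the example above is available in the metastore:

# Equivalent of "SELECT * FROM SalesOrder LIMIT 100" using the DataFrame API
df_top100 = spark.table("SalesOrder").limit(100)
display(df_top100)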

Spark/Scala: Convert or flatten a JSON having Nested data with Struct/Array to columns (Question)

January 9, 2019

 
The following JSON contains some attributes at root level, like ProductNum and unitCount.
It also contains a nested attribute named "Properties", which holds an array of key-value pairs.

Now, what I want is to expand this JSON and have all the attributes as columns, with additional columns for each key in the nested array, as shown in the "Expected output" section below:

{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "unitCount":"3"
}

 

Expected output, as described above:

+-------------------------------------------------------+   
| ProductNum | invoice_id | job_id | sku_id | unitCount |  
+-------------------------------------------------------+   
| 6000078    | 923659     | 296160 | 312002 | 3         |  
+-------------------------------------------------------+

 

Solution:

import org.apache.spark.sql.functions.{explode, first} // for explode() and first() below
import spark.implicits._                                // encoders and the $"" column syntax

val DS_Products = spark.createDataset("""{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "UnitCount":"3"
}""" :: Nil)

val DF_Products = spark.read.json(DS_Products)

val df_flatten = DF_Products
  .select($"*", explode($"Properties") as "SubContent")
  .drop($"Properties")

df_flatten.show()

val df_flatten_pivot = df_flatten
  .groupBy($"ProductNum",$"UnitCount")
  .pivot("SubContent.key")
  .agg(first("SubContent.value"))

df_flatten_pivot.show()

Output:

+----------+---------+--------------------+
|ProductNum|UnitCount|          SubContent|
+----------+---------+--------------------+
|   6000078|        3|[invoice_id, 923659]|
|   6000078|        3|    [job_id, 296160]|
|   6000078|        3|    [sku_id, 312002]|
+----------+---------+--------------------+

+----------+---------+----------+------+------+
|ProductNum|UnitCount|invoice_id|job_id|sku_id|
+----------+---------+----------+------+------+
|   6000078|        3|    923659|296160|312002|
+----------+---------+----------+------+------+
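
For readers working in Python rather than Scala, the same explode-then-pivot approach can be written in PySpark; a rough sketch (not part of the original post) using the same sample JSON:

from pyspark.sql.functions import explode, first

sample_json = ('{"ProductNum":"6000078","Properties":[{"key":"invoice_id","value":"923659"},'
               '{"key":"job_id","value":"296160"},{"key":"sku_id","value":"312002"}],"UnitCount":"3"}')

# Parse the JSON string into a DataFrame
df_products = spark.read.json(spark.sparkContext.parallelize([sample_json]))

# Explode the Properties array into one row per key/value pair
df_flat = (df_products
           .select("*", explode("Properties").alias("SubContent"))
           .drop("Properties"))

# Pivot the keys into columns, keeping the first value seen for each key
df_pivot = (df_flat
            .groupBy("ProductNum", "UnitCount")
            .pivot("SubContent.key")
            .agg(first("SubContent.value")))

df_pivot.show()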

 

Azure Databricks learning resources (documentation and videos)

August 7, 2018

 

Databricks Introduction

What is Azure Databricks [Video]

Create Databricks workspace with Apache Spark cluster

Extract, Transform & Load (ETL) with Databricks

– Documentation:
   – Azure
   – Databricks
 

From Channel 9

1. Data Science using Azure Databricks and Apache Spark [Video]

2. Data ingestion, stream processing and sentiment analysis using Twitter [Video]

3. ETL with Azure Databricks using ADF [Video]

4. ADF new features & integration with Azure Databricks [Video]

5. Azure Databricks introduces R Studio Integration [Video]

6. Run Jars and Python scripts on Azure Databricks using ADF [Video]
 

From Microsoft Build Conf