Archive

Archive for the ‘Databricks’ Category

Using Python in Azure Databricks with Cosmos DB – DDL & DML operations by using “Azure-Cosmos” library for Python

April 9, 2021

 

In a [previous post] we saw how to connect to Cosmos DB from Databricks by using the Apache Spark to Azure Cosmos DB connector. But that connector is limited to reading and writing data in Cosmos DB from Databricks compute using the Scala language.

Here in this post we will see how we can do more in terms of managing Cosmos DB databases, containers/collections and items (JSON documents) from Databricks by using the Azure Cosmos DB SQL API SDK for Python.

 

Here we will perform some DDL & DML operations on Cosmos DB such as:

– Creating a new Database
– Creating a new Container
– Inserting new items
– Reading items from the Container
– Upserting/Updating items in Container
– Deleting items from Container
– Finally deleting the Container and Database

 

So first go to your Azure Databricks cluster, open the Libraries tab, click Install New, select PyPI on the popup, type “azure-cosmos” in the Package text box, and finally click the Install button. This installs the Azure Cosmos DB SQL API library, and it will show up in the Libraries tab.

Databricks Cosmos Python
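
Alternatively, if you prefer a notebook-scoped install over a cluster library, you can also install the package from a notebook cell (assuming your Databricks Runtime supports the %pip magic command):

%pip install azure-cosmos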

 

Use the sample code below to import the required libraries and establish a connection with Cosmos DB. You need to get the Cosmos DB URI & Primary Key from the Keys tab of your Cosmos DB account in the Azure portal and plug them into the code below:

import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import CosmosClient, PartitionKey, exceptions

cosmosUri = 'https://YourCosmosDBName.documents.azure.com:443/'
pKey = 'MasterPrimaryKey'

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})


# 1. Create a new Database:
newDatabaseName = 'ManojDB'

newDatabase = client.create_database(newDatabaseName)
print('\n1. Database created with name: ', newDatabase.id)


# 2. Get Database properties
dbClient = client.get_database_client(newDatabaseName)

dbProperties = dbClient.read()
print('\n2. DB Properties: ', dbProperties)


# 3. Create a new Container:
newContainerName = 'ManojContainer'

newContainer = dbClient.create_container(id=newContainerName, 
                                         partition_key=PartitionKey(path="/id"))
print('\n3. Container created with name: ', newContainer.id)


# 4. Create items in the Container:
containerClient = dbClient.get_container_client(newContainerName)

item1 = {'id' : '101', 'empId': 101, 
         'empFirstName': 'Manoj', 'empLastName': 'Pandey'}
containerClient.create_item(item1)

item2 = {'id' : '102', 'empId': 102, 
         'empFirstName': 'Saurabh', 'empLastName': 'Sharma'}
containerClient.create_item(item2)

item3 = {'id' : '103', 'empId': 103, 
         'empFirstName': 'Hitesh', 'empLastName': 'Kumar'}
containerClient.create_item(item3)

print('\n4. Inserted 3 items in ', newContainer.id)


# 5. Read items from Container:
print('\n5. Get all 3 items from Container:')

for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)

So far we’ve created a Database & a Container in Cosmos DB, and inserted a few items/records into it, as shown below:

CosmosDB Test

 

Now we will do some more DML operations, like UPSERT/UPDATE & DELETE of items in the collection:

# 6. Update/Upsert an item in the Container:

updateItem = {'id' : '103', 'empId': 103, 
              'empFirstName': 'Hitesh', 'empLastName': 'Chouhan'}

containerClient.upsert_item(updateItem)

print('\n6. Updated LastName of EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    print(items)


# 7. Delete an item from Container:

print('\n7. Delete item/record with EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    containerClient.delete_item(items, partition_key='103')
    
for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)

 

Finally we will clean everything up by deleting the Container and the Database that we created initially:

# 8. Delete Container

dbClient.delete_container(newContainer)

print('\n8. Deleted Container ', newContainerName)


# 9. Delete Database

client.delete_database(newDatabaseName)

print('\n9. Deleted Database ', newDatabaseName)


Python error: while converting Pandas Dataframe or Python List to Spark Dataframe (Can not merge type)

April 8, 2021

 

Data typecasting errors are common when you are working with DataFrames across different languages. Here in this case I got a datatype mixing error between a Pandas & a Spark DataFrame:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

df = spark.createDataFrame(pd_df)
display(df)
TypeError:
field col1: Can not merge type <class 'pyspark.sql.types.LongType'> and 
<class 'pyspark.sql.types.StringType'>

 

While converting the Pandas DataFrame to a Spark DataFrame it throws this error because Spark is not able to infer the correct data type for the columns, due to the mixed types of data in those columns.

In this case you just need to explicitly tell Spark which datatype to use, by creating a new schema and passing it to createDataFrame() as shown below:

import pandas as pd
pd_df = pd.DataFrame([(101, 'abc'), 
                      ('def', 201), 
                      ('xyz', 'pqr')], 
                     columns=['col1', 'col2'])

from pyspark.sql.types import *
df_schema = StructType([StructField("col1", StringType(), True),
                        StructField("col2", StringType(), True)])

df = spark.createDataFrame(pd_df, schema=df_schema)
display(df)
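
As an alternative quick workaround (not part of the fix above, and assuming string values are acceptable for your use case), you could also cast the mixed columns to strings on the Pandas side before handing the DataFrame over to Spark, so that type inference succeeds:

pd_df = pd_df.astype({'col1': str, 'col2': str})  # force both columns to plain strings
df = spark.createDataFrame(pd_df)
display(df)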

Connect to Cosmos DB from Databricks and read data by using Apache Spark to Azure Cosmos DB connector

April 7, 2021

 

In this post we will be using the Databricks compute environment to connect to Cosmos DB and read data by using the Apache Spark to Azure Cosmos DB connector.

 

First go to your Azure Databricks cluster and import the Azure Cosmos DB connector library. Download the library JAR from either the [Maven links] or the [Uber JAR] to your local PC drive, then install it on the cluster as a new library.

Databricks CosmosDB Library
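
Alternatively, instead of downloading the JAR you can install the connector from the cluster’s Libraries tab by selecting Maven and entering the connector coordinate. The exact coordinate and version depend on the Spark & Scala versions of your cluster runtime; as an illustration it would look something like the below:

com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:3.7.0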

 

Now open a new Notebook with Scala as the language and use the code provided below.

To get the Cosmos DB instance URI and Key, go to the Azure portal -> your Cosmos DB instance, open the Keys tab, and copy the “URI” & “PRIMARY READ-ONLY KEY” values into the code below.

import org.joda.time._  
import org.joda.time.format._  

import com.microsoft.azure.cosmosdb.spark.schema._  
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark  
import com.microsoft.azure.cosmosdb.spark.config.Config  

import org.apache.spark.sql.functions._

val readerConfig = Config(Map( 
  "Endpoint" -> "https://YourCosmosDBname.documents.azure.com:443/", 
  "Masterkey" -> "YourPrimaryKey==", 
  "Database" -> "DatabaseName", 
  "Collection" -> "CollectionName"
  "query_custom" -> "select * from c" //optional
))

val df = spark.sqlContext.read.cosmosDB(readerConfig)
display(df)

Spark error – Parquet does not support decimal. See HIVE-6384

August 5, 2020

 
I was creating a Hive table in a Databricks Notebook from a Parquet file located in Azure Data Lake Store by using the following command:

val df = spark.read.parquet(
 "abfss://adlsstore@MyStorageAccount.dfs.core.windows.net/x/y/z/*.parquet")

df.write.mode("overwrite").saveAsTable("tblOrderDetail")

But I was getting following error:

warning: there was one feature warning; re-run with -feature for details
java.lang.UnsupportedOperationException: Parquet does not support decimal. See HIVE-6384

 
As per the above error it relates to a Hive version conflict, so I checked the Hive metastore version by running the below command and found that it was pointing to an old version (0.13.0). This version of the Hive metastore does not support datatypes like DECIMAL, DATE & TIMESTAMP for Parquet formatted files.

spark.conf.get("spark.sql.hive.metastore.version")


 

Also, as per the Jira task HIVE-6384, support for these additional datatypes was implemented for the Parquet SerDe in Hive version 1.2.0.

 
So to update the Hive metastore version used by the cluster, you just need to add the below settings to the configuration of the cluster you are using.

Click on “Clusters” –> click “Edit” at the top –> expand “Advanced Options” –> under the “Spark” tab, in the “Spark Config” box, add the below two lines:

spark.sql.hive.metastore.version 1.2.1
spark.sql.hive.metastore.jars builtin

Then just restart the cluster so that the new settings take effect.
 


 

Some similar errors:
– Parquet does not support date
– Parquet does not support timestamp


Databricks Notebook error: Your administrator has only allowed sql and scala commands on this cluster.

June 8, 2020

 
So while creating a Python notebook and running it on my Databricks cluster, I observed the following error:

Your administrator has only allowed sql and scala commands on this cluster. This execution contained at least one disallowed language.

 

It’s obvious that the error is due to some restriction applied at the cluster level. So I went to the cluster settings page, checked the Spark Config, and found the below key-value configuration setting:

spark.databricks.repl.allowedLanguages sql,scala

 

So if you want to run other languages like Python & R, you can either remove the entire line, or change the list of allowed languages as per your needs, as shown below.
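
For example, to allow all four notebook languages the setting would look like the below (adjust the list to whatever your cluster should permit):

spark.databricks.repl.allowedLanguages sql,scala,python,r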