Archive

Posts Tagged ‘Azure Databricks’

Using Python in Azure Databricks with Cosmos DB – DDL & DML operations by using “Azure-Cosmos” library for Python

April 9, 2021 Leave a comment

 

In one of my [previous post] we saw how to connect to Cosmos DB from Databricks by using the Apache Spark to Azure Cosmos DB connector. But that connector is limited to read and write data in Cosmos DB from Databricks compute using Scala language.

Here in this post we will see how can we do more in terms of managing the whole Cosmos DB databases, containers/collections and the items (JSON documents) from Databricks by using the Azure Cosmos DB SQL API SDK for Python.

 

Here we will perform some DDL & DML operations on Cosmos DB such as:

– Creating a new Database
– Creating a new Container
– Inserting new items
– Read items from Container
– Upserting/Updating items in Container
– Deleting items from Container
– Finally deleting the Container and Database

 

So first go to your Azure Databricks cluster, Libraries tab, click on Install New, on the popup select PyPI, and type “azure-cosmos” under Package text box, finally click the Install button. This will install the Azure Cosmos DB SQL API library and will show up in the Libraries tab.

Databricks Cosmos Python

 

Use the below sample code to import the required libraries and establish connection with Cosmos DB. You need to get the Cosmos Uri & Primary Key from the Cosmos DB Overview tab and apply in the code below:

import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import CosmosClient, PartitionKey, exceptions

cosmosUri = 'https://YourCosmosDBName.documents.azure.com:443/'
pKey = 'MasterPrimaryKey'

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})


# 1. Create a new Database:
newDatabaseName = 'ManojDB'

newDatabase = client.create_database(newDatabaseName)
print('\n1. Database created with name: ', newDatabase.id)


# 2. Get Database properties
dbClient = client.get_database_client(newDatabaseName)

dbProperties = dbClient.read()
print('\n2. DB Properties: ', dbProperties)


# 3. Create a new Container:
newContainerName = 'ManojContainer'

newContainer = dbClient.create_container(id=newContainerName, 
                                         partition_key=PartitionKey(path="/id"))
print('\n3. Container created with name: ', newContainer.id)


# 4. Create items in the Container:
containerClient = dbClient.get_container_client(newContainerName)

item1 = {'id' : '101', 'empId': 101, 
         'empFirstName': 'Manoj', 'empLastName': 'Pandey'}
containerClient.create_item(item1)

item2 = {'id' : '102', 'empId': 102, 
         'empFirstName': 'Saurabh', 'empLastName': 'Sharma'}
containerClient.create_item(item2)

item3 = {'id' : '103', 'empId': 103, 
         'empFirstName': 'Hitesh', 'empLastName': 'Kumar'}
containerClient.create_item(item3)

print('\n4. Inserted 3 items in ', newContainer.id)


# 5. Read items from Container:
print('\n5. Get all 3 items from Container:')

for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)

So till here we’ve created a Database & a Container in Cosmos DB, and inserted few items/records in it, as shown below:

CosmosDB Test

 

Now we will do some more DML operations like UPSERT/UPDATE & DELETE items from the collections:

# 6. Update/Upsert a item in Container:

updateItem = {'id' : '103', 'empId': 103, 
              'empFirstName': 'Hitesh', 'empLastName': 'Chouhan'}

containerClient.upsert_item(updateItem)

print('\n6. Updated LastName of EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    print(items)


# 7. Delete an item from Container:

print('\n7. Delete item/record with EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    containerClient.delete_item(items, partition_key='103')
    
for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)

 

Finally we will clean up all the stuff by deleting the Container and Databases that we created initially:

# 8. Delete Container

dbClient.delete_container(newContainer)

print('\n8. Deleted Container ', newContainer)


# 9. Delete Database

client.delete_database(newDatabaseName)

print('\n9. Deleted Database ', newDatabaseName)


Databricks Notebook error: Your administrator has only allowed sql and scala commands on this cluster.

June 8, 2020 Leave a comment

 
So while creating a Python notebook and running it on my Databricks Cluster I observed following error:

Your administrator has only allowed sql and scala commands on this cluster. This execution contained at least one disallowed language.

 

Its obvious that the error is due to some restriction applied at Cluster level. So I went to the Cluster settings page and checked the Spark Config and found below key-value configuration settings:

spark.databricks.repl.allowedLanguages sql,scala

 

So if you want to run other languages like Python & R you can remove the entire line or restrict any language(s) then change is as per your needs.


Spark SQL – Beware of Implicit datatype conversions (TypeCoercion)

March 6, 2020 1 comment

 
While working on some data analysis I saw one Spark SQL query was not getting me expected results. The table had some good amount of data, I was filtering on a value but some records were missing. So, I checked online and found that Spark SQL works differently compared to SQL Server, in this case while comparing 2 different datatypes columns or variables.

–> I’m populating some test data to reproduce the scenario, for that I’m inserting 9 rows and storing decimal values as String, query below:

CREATE OR REPLACE TEMPORARY VIEW vwTestDataType as 
select * from values 
("row1", "2.0"), 
("row2", "1.5"), 
("row3", "1.0"), 
("row4", "0.8"), 
("row5", "0.6"), 
("row6", "0.4"), 
("row7", "0.2"), 
("row8", "0.0"),
("row9", null);

describe vwTestDataType;

col_name | data_type | comment
col1           | string         | null
col2           | string         | null

 

–> Now, I’ll create a similar query where I was observing the issue. The below query should return me 7 rows, but instead it returns just 3 rows.

select * from vwTestDataType where col2 > 0

Running above query in “SQL Server” throws below error for the same dataset:

Conversion failed when converting the varchar value ‘2.0’ to data type int.

 

–> Let’s check why Spark SQL query didn’t failed and why its behaving like this.

I will use EXPLAIN EXTENDED operator to know what’s happening with the query while creating the Logical Plan.

explain extended select * from vwTestDataType where col2 > 0

Here is the plan you can see that under Analyzed Logical Plan the column “col2” is getting implicitly typecasted to INT, as the comparison value is an INT type. Thus it is converting all 0.x values to 0 and filtering them out.

Plan

== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘col2 > 0)
+- ‘UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13284, col2#13285]
+- Filter (cast(col2#13285 as int) > 0)
+- SubqueryAlias `vwtestdatatype`
+- Project [col1#13284, col2#13285]
+- LocalRelation [col1#13284, col2#13285]

== Optimized Logical Plan ==
LocalRelation [col1#13284, col2#13285]

== Physical Plan ==
LocalTableScan [col1#13284, col2#13285]

 

–> Now to avoid this issue you must explicitly type cast the column and value to the exact datatype to get expected result. Like here we should convert the String column & value to Double, this way the query returns all 7 rows as expected:

select * from vwTestDataType where double(col2) > double(0)
--OR--select * from vwTestDataType where col2 > 0.0

Let’s again check the Logical Plan of the modified query by using EXPLAIN EXTENDED operator how it looks like:

explain extended select * from vwTestDataType where double(col2) > double(0)
--OR--explain extended select * from vwTestDataType where col2 > 0.0

plan
== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘double(‘col2) > ‘double(0))
+- ‘UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13213, col2#13214]
+- Filter (cast(col2#13214 as double) > cast(0 as double))
+- SubqueryAlias `vwtestdatatype`
+- Project [col1#13213, col2#13214]
+- LocalRelation [col1#13213, col2#13214]

== Optimized Logical Plan ==
LocalRelation [col1#13213, col2#13214]

== Physical Plan ==
LocalTableScan [col1#13213, col2#13214]

 

So while working with Spark SQL we should make sure there should not be such datatype conflicts, and moreover these type of issues should be handled in way beginning while modelling the tables with correct datatype.


Azure Databricks learning resources (documentation and videos)

August 7, 2018 1 comment

 

Databricks Introduction

What is Azure Databricks [Video]

Create Databricks workspace with Apache Spark cluster

Extract, Transform & Load (ETL) with Databricks

– Documentation:
   – Azure
   – Databricks
 

From Channel 9

1. Data Science using Azure Databricks and Apache Spark [Video]

2. Data ingestion, stream processing and sentiment analysis using Twitter [Video]

3. ETL with Azure Databricks using ADF [Video]

4. ADF new features & integration with Azure Databricks [Video]

5. Azure Databricks introduces R Studio Integration [Video]

6. Run Jars and Python scripts on Azure Databricks using ADF [Video]
 

From Microsoft Build Conf


 


Azure Databricks (a fully managed Apache Spark offering)

July 28, 2018 Leave a comment

 

Databricks Introduction:

Azure Databricks = Best of Databricks + Best of Azure

Azure Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform (PaaS).

It is a fast, easy-to-use, and collaborative Apache Spark–based analytics platform. Designed in collaboration with the creators of Apache Spark, it combines the best of Databricks and Azure to help you accelerate innovation with one-click set up, streamlined workflows, and an interactive workspace that enables collaboration among data scientists, data engineers, and business analysts. Because it’s an Azure service, you benefit from native integrations with other Azure services such as Power BI, SQL Data Warehouse, and Cosmos DB. You also get enterprise-grade Azure security, including Active Directory integration, compliance, and enterprise-grade SLAs.
 


 
–> With Databricks you can:
– Launch your new Spark environment with a single click.
– Integrate effortlessly with a wide variety of data stores.
– Use Databricks Notebooks to unify your processes and instantly deploy to production.
– Improve and scale your analytics with a high-performance processing engine optimized for the comprehensive, trusted Azure platform.
 

Learning Resources:

    Webinar recording on Azure Databricks

    My next blog in series