Using Python in Azure Databricks with Cosmos DB – DDL & DML operations by using “Azure-Cosmos” library for Python
In one of my [previous posts] we saw how to connect to Cosmos DB from Databricks by using the Apache Spark to Azure Cosmos DB connector. However, that connector is limited to reading and writing data in Cosmos DB from Databricks compute using the Scala language.
In this post we will see how we can do more: managing Cosmos DB databases, containers/collections and items (JSON documents) from Databricks by using the Azure Cosmos DB SQL API SDK for Python.
Here we will perform some DDL & DML operations on Cosmos DB such as:
– Creating a new Database
– Creating a new Container
– Inserting new items
– Reading items from Container
– Upserting/Updating items in Container
– Deleting items from Container
– Finally deleting the Container and Database
First, go to your Azure Databricks cluster, open the Libraries tab, click Install New, select PyPI in the popup, type “azure-cosmos” in the Package text box, and click the Install button. This installs the Azure Cosmos DB SQL API library, which then shows up in the Libraries tab.
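Once the library shows as installed, it can help to confirm from a notebook cell which version the cluster actually picked up. The calls used in this post (such as `get_database_client` and `PartitionKey`) are, as far as I know, from the 4.x line of the SDK. The snippet below is a minimal sketch; the helper name `installed_version` is my own and not part of any library.

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version of a package, or None if it is not installed."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# Prints None if the library is not attached to the cluster running this cell.
print("azure-cosmos version:", installed_version("azure-cosmos"))
```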
Use the sample code below to import the required libraries and establish a connection with Cosmos DB. You need to get the Cosmos URI and Primary Key from your Cosmos DB account in the Azure portal (the URI is on the Overview tab, the keys are listed under Keys) and apply them in the code below:
```python
import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import CosmosClient, PartitionKey, exceptions

cosmosUri = 'https://YourCosmosDBName.documents.azure.com:443/'
pKey = 'MasterPrimaryKey'

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})

# 1. Create a new Database:
newDatabaseName = 'ManojDB'
newDatabase = client.create_database(newDatabaseName)
print('\n1. Database created with name: ', newDatabase.id)

# 2. Get Database properties
dbClient = client.get_database_client(newDatabaseName)
dbProperties = dbClient.read()
print('\n2. DB Properties: ', dbProperties)

# 3. Create a new Container:
newContainerName = 'ManojContainer'
newContainer = dbClient.create_container(id=newContainerName,
                                         partition_key=PartitionKey(path="/id"))
print('\n3. Container created with name: ', newContainer.id)

# 4. Create items in the Container:
containerClient = dbClient.get_container_client(newContainerName)

item1 = {'id': '101', 'empId': 101, 'empFirstName': 'Manoj', 'empLastName': 'Pandey'}
containerClient.create_item(item1)

item2 = {'id': '102', 'empId': 102, 'empFirstName': 'Saurabh', 'empLastName': 'Sharma'}
containerClient.create_item(item2)

item3 = {'id': '103', 'empId': 103, 'empFirstName': 'Hitesh', 'empLastName': 'Kumar'}
containerClient.create_item(item3)

print('\n4. Inserted 3 items in ', newContainer.id)

# 5. Read items from Container:
print('\n5. Get all 3 items from Container:')
for item in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query=True):
    print(item)
```
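One thing to keep in mind: running the code above a second time fails at step 1, because `create_database` raises an error when the database already exists. The SDK also offers `create_database_if_not_exists` and `create_container_if_not_exists`, which make notebook cells safe to re-run. Here is a minimal sketch; the wrapper name `ensure_container` is hypothetical, and it assumes the `client` object created above.

```python
def ensure_container(client, database_name, container_name, partition_key):
    """Return a container client, creating the database and container only if missing."""
    # Returns the existing database if one with this id is already there.
    database = client.create_database_if_not_exists(id=database_name)
    # Same idea for the container; partition_key is only used on first creation.
    return database.create_container_if_not_exists(
        id=container_name, partition_key=partition_key
    )


# Usage with the client from the post (needs a live Cosmos DB account):
# from azure.cosmos import PartitionKey
# containerClient = ensure_container(
#     client, 'ManojDB', 'ManojContainer', PartitionKey(path='/id'))
```

Note that `create_item` still fails for an item whose `id` already exists in the container, so a re-runnable cell would use `upsert_item` for the inserts as well.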
So far we’ve created a Database and a Container in Cosmos DB and inserted a few items/records into it.
Now we will do some more DML operations, such as UPSERT/UPDATE and DELETE, on items in the container:
```python
# 6. Update/Upsert an item in Container:
# upsert_item replaces the item with the same id, or inserts it if none exists.
updateItem = {'id': '103', 'empId': 103, 'empFirstName': 'Hitesh', 'empLastName': 'Chouhan'}
containerClient.upsert_item(updateItem)

print('\n6. Updated LastName of EmpId = 103:')
for item in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query=True):
    print(item)

# 7. Delete an item from Container:
print('\n7. Delete item/record with EmpId = 103:')
for item in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query=True):
    # The container is partitioned on /id, so the partition key is the item's own id.
    containerClient.delete_item(item, partition_key=item['id'])

# Print the remaining items to confirm the delete.
for item in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query=True):
    print(item)
```
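The queries above embed the employee id directly in the SQL text. When the value comes from a variable or from user input, `query_items` also accepts a `parameters` list, which avoids building the query by string concatenation. A minimal sketch follows; the helper name `employee_query` is my own, and it assumes the `containerClient` from earlier in the post.

```python
def employee_query(emp_id):
    """Build keyword arguments for query_items() that filter on empId."""
    return {
        'query': 'SELECT * FROM c WHERE c.empId = @empId',
        # Each parameter is a dict with the placeholder name and its value.
        'parameters': [{'name': '@empId', 'value': emp_id}],
        'enable_cross_partition_query': True,
    }


# Usage with the containerClient from the post (needs a live Cosmos DB account):
# for item in containerClient.query_items(**employee_query(103)):
#     print(item)
```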
Finally, we will clean up by deleting the Container and the Database that we created initially:
```python
# 8. Delete Container
dbClient.delete_container(newContainer)
# Print the container id rather than the proxy object itself.
print('\n8. Deleted Container ', newContainer.id)

# 9. Delete Database
client.delete_database(newDatabaseName)
print('\n9. Deleted Database ', newDatabaseName)
```