Posts Tagged ‘Cosmos DB’

Cosmos DB & PySpark – Retrieve all attributes from all Collections under all Databases

April 12, 2021

 

In one of my [previous posts] we saw how to retrieve all attributes from the items (JSON documents) of all Collections under all Databases by using C# .net code.

Here in this post we will see how to retrieve the same information in an Azure Databricks environment by using Python instead of C# .net code.

 

First of all, make sure that the Azure Cosmos DB SQL API library is installed on your Databricks cluster. [Link if not done]

Then use the script below, which:

1. Connects to Cosmos DB by using the CosmosClient() method.
2. Gets the list of all Databases by using the list_databases() method.
3. Iterates through all Databases and gets the list of all Containers by using the list_containers() method.
4. Iterates through all Containers and queries the items by using the query_items() method.
5. Stores each Key & Value present in the Container item in the “metadataInfo” dictionary object.
6. Appends each “metadataInfo” dictionary, with its Database, Container & Item level details, to the List object named “metadataList”.
7. Finally uses the “metadataList” object to create a DataFrame by using the createDataFrame() method.

Get the Cosmos Uri & Primary Key from the Cosmos DB Overview tab and apply them in the code below:

import azure.cosmos.cosmos_client as cosmos_client
# azure-cosmos 4.x exposes its exception types in azure.cosmos.exceptions,
# so the older azure.cosmos.errors import is dropped here
import azure.cosmos.exceptions as exceptions
import azure.cosmos.http_constants as http_constants
import json

cosmosUri = "https://YourCosmosDBName.documents.azure.com:443/"
pKey = "PrimaryKey=="

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})

cosmosDBsList = client.list_databases()

#Create a list to store the metadata
metadataList = []

#Iterate over all DBs
for eachCosmosDBsList in cosmosDBsList:
  #print("\nDatabase Name: {}".format(eachCosmosDBsList['id']))
  dbClient = client.get_database_client(eachCosmosDBsList['id'])
  
  #Iterate over all Containers
  for containersList in dbClient.list_containers():
    #print("\n- Container Name: {}".format(containersList['id']))
    conClient = dbClient.get_container_client(containersList['id'])
    
    #Query Container and read just TOP 1 row
    for queryItems in conClient.query_items("select top 1 * from c", 
                                            enable_cross_partition_query=True):
      for itemKey, itemValue in queryItems.items():
        #print(itemKey, " = ", itemValue)
        
        #Create a dictionary to store metadata info at attribute/field level
        metadataInfo = {}
        metadataInfo["Source"] = eachCosmosDBsList['id']
        metadataInfo["Entity"] = containersList['id']
        metadataInfo["Attribute"] = itemKey
        metadataInfo["Value"] = itemValue

        metadataList.append(metadataInfo)

#print(metadataList)

from pyspark.sql.types import *

mySchema = StructType([ StructField("Source", StringType(), True)
                       ,StructField("Entity", StringType(), True)
                       ,StructField("Attribute", StringType(), True)
                       ,StructField("Value", StringType(), True)])

df = spark.createDataFrame(metadataList, schema=mySchema)

df.createOrReplaceTempView("metadataDF")

display(df)
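
The per-item step of the script above can be tried without a Cosmos DB connection. Below is a minimal sketch, assuming a made-up sample item and a hypothetical helper named item_to_metadata_rows (neither is part of the original script). It converts every value to a string so that each row matches the all-string schema:

```python
def item_to_metadata_rows(database_id, container_id, item):
    """Return one metadata row (dict) per top-level attribute of an item."""
    rows = []
    for key, value in item.items():
        rows.append({
            "Source": database_id,
            "Entity": container_id,
            "Attribute": key,
            # Stringify so numbers, lists and nested objects fit a string column
            "Value": str(value),
        })
    return rows


# Hypothetical item, shaped like the documents used elsewhere on this page
sample_item = {"id": "101", "empId": 101, "empFirstName": "Manoj"}
sample_rows = item_to_metadata_rows("ManojDB", "ManojContainer", sample_item)

for row in sample_rows:
    print(row)
```

In a notebook, the returned rows could be passed to spark.createDataFrame(sample_rows, schema=mySchema) in the same way as “metadataList”.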

Categories: Cosmos DB, Python

Using Python in Azure Databricks with Cosmos DB – DDL & DML operations by using “Azure-Cosmos” library for Python

April 9, 2021

 

In one of my [previous posts] we saw how to connect to Cosmos DB from Databricks by using the Apache Spark to Azure Cosmos DB connector. But that connector is limited to reading and writing data in Cosmos DB from Databricks compute, which we did there using Scala.

Here in this post we will see how we can do more in terms of managing the Cosmos DB databases, containers/collections and items (JSON documents) from Databricks by using the Azure Cosmos DB SQL API SDK for Python.

 

Here we will perform some DDL & DML operations on Cosmos DB such as:

– Creating a new Database
– Creating a new Container
– Inserting new items
– Reading items from Container
– Upserting/Updating items in Container
– Deleting items from Container
– Finally deleting the Container and Database

 

So first go to your Azure Databricks cluster, open the Libraries tab and click on Install New. On the popup select PyPI, type “azure-cosmos” in the Package text box, and finally click the Install button. This will install the Azure Cosmos DB SQL API library, which will then show up in the Libraries tab.

Databricks Cosmos Python
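
Before running the rest of the code, it can help to confirm from a notebook cell that the library is importable. This is a minimal sketch using only the Python standard library; the helper name is_module_available is hypothetical and not part of the SDK:

```python
import importlib.util


def is_module_available(module_name):
    """Return True if the named module can be found on this cluster."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example "azure") is itself missing
        return False


if is_module_available("azure.cosmos"):
    print("azure-cosmos is available")
else:
    print("azure-cosmos is missing; install it from the cluster's Libraries tab")
```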

 

Use the sample code below to import the required libraries and establish a connection with Cosmos DB. You need to get the Cosmos Uri & Primary Key from the Cosmos DB Overview tab and apply them in the code below:

import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import CosmosClient, PartitionKey, exceptions

cosmosUri = 'https://YourCosmosDBName.documents.azure.com:443/'
pKey = 'MasterPrimaryKey'

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})


# 1. Create a new Database:
newDatabaseName = 'ManojDB'

newDatabase = client.create_database(newDatabaseName)
print('\n1. Database created with name: ', newDatabase.id)


# 2. Get Database properties
dbClient = client.get_database_client(newDatabaseName)

dbProperties = dbClient.read()
print('\n2. DB Properties: ', dbProperties)


# 3. Create a new Container:
newContainerName = 'ManojContainer'

newContainer = dbClient.create_container(id=newContainerName, 
                                         partition_key=PartitionKey(path="/id"))
print('\n3. Container created with name: ', newContainer.id)


# 4. Create items in the Container:
containerClient = dbClient.get_container_client(newContainerName)

item1 = {'id' : '101', 'empId': 101, 
         'empFirstName': 'Manoj', 'empLastName': 'Pandey'}
containerClient.create_item(item1)

item2 = {'id' : '102', 'empId': 102, 
         'empFirstName': 'Saurabh', 'empLastName': 'Sharma'}
containerClient.create_item(item2)

item3 = {'id' : '103', 'empId': 103, 
         'empFirstName': 'Hitesh', 'empLastName': 'Kumar'}
containerClient.create_item(item3)

print('\n4. Inserted 3 items in ', newContainer.id)


# 5. Read items from Container:
print('\n5. Get all 3 items from Container:')

for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)

So far we’ve created a Database & a Container in Cosmos DB, and inserted a few items/records into it, as shown below:

CosmosDB Test
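
Note that create_database() and create_container() raise an error when the object already exists, so re-running the cells above fails on the second attempt. The SDK also provides create_database_if_not_exists() and create_container_if_not_exists() for this case. Below is a minimal sketch, assuming a hypothetical wrapper named ensure_database_and_container that receives an already connected client:

```python
def ensure_database_and_container(client, database_name, container_name, partition_key):
    """Create the database and the container only if they do not exist yet.

    `client` is expected to be an azure.cosmos CosmosClient and
    `partition_key` an azure.cosmos PartitionKey, e.g. PartitionKey(path="/id").
    """
    database = client.create_database_if_not_exists(id=database_name)
    container = database.create_container_if_not_exists(
        id=container_name,
        partition_key=partition_key,
    )
    return database, container
```

With the variables from the code above, a call could look like ensure_database_and_container(client, newDatabaseName, newContainerName, PartitionKey(path="/id")).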

 

Now we will do some more DML operations like UPSERT/UPDATE & DELETE items from the collections:

# 6. Update/Upsert an item in Container:

updateItem = {'id' : '103', 'empId': 103, 
              'empFirstName': 'Hitesh', 'empLastName': 'Chouhan'}

containerClient.upsert_item(updateItem)

print('\n6. Updated LastName of EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    print(items)


# 7. Delete an item from Container:

print('\n7. Delete item/record with EmpId = 103:')

for items in containerClient.query_items(
        query='SELECT * FROM c WHERE c.empId = 103',
        enable_cross_partition_query = True):
    containerClient.delete_item(items, partition_key='103')
    
for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)
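
The queries above embed the value 103 directly in the SQL text. The query_items() method also accepts a parameters list, which keeps values out of the query string. Below is a minimal sketch, assuming a hypothetical helper named employee_query_args that only builds the keyword arguments:

```python
def employee_query_args(emp_id):
    """Build keyword arguments for query_items() with a parameterized filter."""
    return {
        "query": "SELECT * FROM c WHERE c.empId = @empId",
        "parameters": [{"name": "@empId", "value": emp_id}],
        "enable_cross_partition_query": True,
    }


query_args = employee_query_args(103)
print(query_args)

# With a live container client this would be used as:
# for item in containerClient.query_items(**query_args):
#     print(item)
```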

 

Finally we will clean up by deleting the Container and Database that we created initially:

# 8. Delete Container

dbClient.delete_container(newContainer)

print('\n8. Deleted Container ', newContainer.id)


# 9. Delete Database

client.delete_database(newDatabaseName)

print('\n9. Deleted Database ', newDatabaseName)


Connect to Cosmos DB from Databricks and read data by using Apache Spark to Azure Cosmos DB connector

April 7, 2021

 

In this post we will use the Databricks compute environment to connect to Cosmos DB and read data by using the Apache Spark to Azure Cosmos DB connector.

 

First go to your Azure Databricks cluster and import the Azure Cosmos DB connector library. Download the library JAR from either the [Maven links] or the [Uber JAR] to your local drive and install it as a new library.

Databricks CosmosDB Library

 

Now open a new Notebook with Scala as the language and use the code provided below.

To get the Cosmos DB instance Uri and Key, go to the Azure portal -> Cosmos DB instance, then from the Overview tab go to the Keys tab and copy the “URI” & “PRIMARY READ-ONLY KEY” values into the code below.

import org.joda.time._  
import org.joda.time.format._  

import com.microsoft.azure.cosmosdb.spark.schema._  
import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark  
import com.microsoft.azure.cosmosdb.spark.config.Config  

import org.apache.spark.sql.functions._

val readerConfig = Config(Map( 
  "Endpoint" -> "https://YourCosmosDBname.documents.azure.com:443/", 
  "Masterkey" -> "YourPrimaryKey==", 
  "Database" -> "DatabaseName", 
  "Collection" -> "CollectionName",
  "query_custom" -> "select * from c" //optional
))

val df = spark.sqlContext.read.cosmosDB(readerConfig)
display(df)
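
The same read can likely be expressed from a Python notebook through the connector's DataFrame source. This is a minimal sketch, assuming the same connector JAR is installed on the cluster and that its data source name is com.microsoft.azure.cosmosdb.spark; the helper names build_reader_config and read_collection are hypothetical:

```python
def build_reader_config(endpoint, master_key, database, collection, query=None):
    """Build the options dictionary, using the same keys as the Scala Config above."""
    config = {
        "Endpoint": endpoint,
        "Masterkey": master_key,
        "Database": database,
        "Collection": collection,
    }
    if query is not None:
        config["query_custom"] = query  # optional custom query
    return config


def read_collection(spark, config):
    """Read a collection into a DataFrame (needs a live SparkSession and the connector)."""
    return (spark.read
            .format("com.microsoft.azure.cosmosdb.spark")  # assumed source name
            .options(**config)
            .load())


reader_config = build_reader_config(
    "https://YourCosmosDBname.documents.azure.com:443/",
    "YourPrimaryKey==",
    "DatabaseName",
    "CollectionName",
    query="select * from c",
)

# In a Databricks notebook:
# df = read_collection(spark, reader_config)
# display(df)
```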

Cosmos DB – Retrieve all attributes/fields from the JSON document of all Collections (using .net)

April 6, 2021

 

In our [previous post] we saw how to get COUNT of items/records from all Collections in all Databases of a CosmosDB instance by programmatically/dynamically iterating over all these objects.

 

Here in this post we will extend it and see how to retrieve all attributes/fields from the JSON documents/items present in these Collections. (This is some work I’m doing for maintaining a data catalogue and data classification for privacy and compliance purposes.)

 

On the Azure portal go to your Azure Cosmos DB instance, then from the Overview tab go to the Keys tab and copy the “URI” & “PRIMARY READ-ONLY KEY” values into the code below.

We have discussed all the methods used below in our [previous post], so please check it for details. Because a Cosmos DB SQL container is schema-agnostic and the items in a container can have arbitrary schemas, we will read only the first JSON document/item from each Collection by using the FirstOrDefault() method, deserialize this JSON document to a Dictionary, and finally read all the attributes from the Dictionary keys. Attributes that appear only in other items of the same Collection will therefore not be listed:

 

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace CosmosDBgetJSONAttributes
{
    class Program
    {
        static void Main(string[] args)
        {
            string EndPointUri = "https://YourCosmosDBname.documents.azure.com:443/";
            string PK = "YourPrimaryKey==";

            DocumentClient cosmosClient = new DocumentClient(new Uri(EndPointUri), PK);

            string sqlQuery = "SELECT * FROM d";
            var dbs = cosmosClient.CreateDatabaseQuery(sqlQuery).ToList();

            foreach (var db in dbs)
            {
                Console.WriteLine("Database: " + db.id);

                List<DocumentCollection> collections = cosmosClient.CreateDocumentCollectionQuery((String)db._self).ToList();

                foreach (var collection in collections)
                {
                    Console.WriteLine("  - Collection: " + collection.Id);

                    var docu = cosmosClient.CreateDocumentQuery(collection.SelfLink, $"SELECT TOP 1 * FROM c",
                                    new FeedOptions() { EnableCrossPartitionQuery = true, MaxItemCount = 1, })
                                    .AsEnumerable().FirstOrDefault();
                    if (docu != null)
                    {
                        var docuStr = Convert.ToString(docu);
                        // Dictionary<string, object> also handles nested objects and arrays
                        var data = JsonConvert.DeserializeObject<Dictionary<string, object>>(docuStr);
                        foreach (var item in data.Keys)
                        {
                            Console.WriteLine("    - Attribute: " + item);
                        }
                    }
                    else
                    {
                        Console.WriteLine("    - Attribute: no data");
                    }
                }
                Console.WriteLine(Environment.NewLine);
            }
            Console.WriteLine("Finished reading all Collections attributes");
            Console.Read();
        }
    }
}
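
The program above lists only the top-level attributes of a document. For a data catalogue it may also be useful to see nested attributes. Below is a minimal Python sketch of that idea, assuming a made-up sample document and a hypothetical helper named attribute_paths that returns dotted paths:

```python
import json


def attribute_paths(value, prefix=""):
    """Return dotted paths for every attribute, descending into nested objects."""
    paths = []
    if isinstance(value, dict):
        for key, child in value.items():
            path = f"{prefix}.{key}" if prefix else key
            paths.append(path)
            paths.extend(attribute_paths(child, path))
    elif isinstance(value, list):
        # Array elements share the parent's path; keep each nested path once
        for child in value:
            for path in attribute_paths(child, prefix):
                if path not in paths:
                    paths.append(path)
    return paths


# Hypothetical document with a nested object and an array of objects
sample_json = (
    '{"id": "101", "name": {"first": "Manoj", "last": "Pandey"}, '
    '"phones": [{"type": "home"}, {"type": "work"}]}'
)
paths = attribute_paths(json.loads(sample_json))

for path in paths:
    print(path)
```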

Cosmos DB – Get record COUNT from all Collections in all Databases (using .net)

April 1, 2021

 

Here in this post we will use C# .net code (for beginners like me) to see how to:
1. Connect to a Cosmos DB instance
2. Get the list of all Databases in a Cosmos DB instance
3. Iterate through all the Databases and get the list of all Collections (or Tables)
4. Get COUNT of all documents/items (or records) in these Collections

 

On the Azure portal go to your Azure Cosmos DB instance, then from the Overview tab go to the Keys tab and copy the “URI” & “PRIMARY READ-ONLY KEY” values:

1. First we will use the Uri & Primary Key to connect to the Cosmos DB service by creating a DocumentClient() instance.

2. Then we will form a simple query and pass it to the CreateDatabaseQuery() method to retrieve the list of all Databases.

3. Now to get the list of all Collections present in each Database, we will iterate through each Database and call the CreateDocumentCollectionQuery() method with the Database's Self link property.

4. Finally, we will iterate through each Collection and run a COUNT query against its Self link by using the CreateDocumentQuery() method.

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;

namespace CosmosTables
{
    class ReadCosmosDB
    {
        static void Main(string[] args)
        {
            string EndPointUri = "https://YourCosmosDBname.documents.azure.com:443/";
            string PK = "YourPrimaryKey==";

            DocumentClient cosmosClient = new DocumentClient(new Uri(EndPointUri), PK);

            string sqlQuery = "SELECT * FROM d";
            var dbs = cosmosClient.CreateDatabaseQuery(sqlQuery).ToList();

            foreach (var db in dbs)
            {
                Console.WriteLine("Database: " + db.id);
                Console.WriteLine("- Collections: "); 

                List<DocumentCollection> collections = cosmosClient.CreateDocumentCollectionQuery((String)db._self).ToList();

                foreach (var collection in collections)
                {

                    var count = cosmosClient.CreateDocumentQuery<int>(collection.SelfLink, $"SELECT value count(1) FROM c",
                                    new FeedOptions(){EnableCrossPartitionQuery = true, MaxItemCount = 1,}).AsEnumerable().First();

                    Console.WriteLine("  - " + collection.Id + $", Count: " + count);
                }
                Console.WriteLine(Environment.NewLine);
            }
            Console.WriteLine("Finished reading all DB Collections");
        }
    }
}
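
The same walk over Databases and Collections can be sketched with the Python SDK methods used in the newer posts on this page. Below is a minimal sketch, assuming a hypothetical function named count_items_per_container that receives an already connected CosmosClient; whether a cross-partition COUNT query is supported may depend on the installed SDK version:

```python
def count_items_per_container(client):
    """Return {(database_id, container_id): item_count} for every container."""
    counts = {}
    for database in client.list_databases():
        db_client = client.get_database_client(database["id"])
        for container in db_client.list_containers():
            container_client = db_client.get_container_client(container["id"])
            results = container_client.query_items(
                query="SELECT VALUE COUNT(1) FROM c",
                enable_cross_partition_query=True,
            )
            # The VALUE query yields a single number; fall back to 0 if empty
            counts[(database["id"], container["id"])] = next(iter(results), 0)
    return counts
```

A call such as count_items_per_container(client) would then return one entry per Database and Container pair.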