Cosmos DB & PySpark – Retrieve all attributes from all Collections under all Databases
In a [previous post] we saw how to retrieve all attributes from the items (JSON documents) of all Collections under all Databases by using C# .NET code.
Here in this post we will see how to retrieve the same information in an Azure Databricks environment by using Python instead of C# .NET.
First of all, make sure that the Azure Cosmos DB SQL API library (azure-cosmos) is installed on your Databricks cluster. [Link if not done]
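If the library is not yet attached to the cluster, one quick option is to install it for the current notebook session with the %pip magic command (the PyPI package name is azure-cosmos); alternatively, attach it as a cluster library from the cluster's Libraries tab:

%pip install azure-cosmos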
Then use the script below, which:
1. First connects to Cosmos DB using the CosmosClient() constructor.
2. Then gets the list of all Databases using the list_databases() method.
3. Then iterates through all Databases and gets the list of all Containers using the list_containers() method.
4. Then iterates through all Containers and queries their items using the query_items() method.
5. The “metadataInfo” dictionary object stores all the Keys & Values present in the Container item.
6. The List object named “metadataList” collects the Database, Container & Item level details held in each “metadataInfo” dictionary (a sample entry is sketched just after this list).
7. Finally, the “metadataList” object is used to create a DataFrame with the createDataFrame() method.
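To make steps 5 and 6 concrete, every entry appended to “metadataList” is a small dictionary with four keys; a sample entry (with purely illustrative database, container and attribute names) would look like this:

#Illustrative example only – the database, container and attribute names are made up
{"Source": "SalesDB", "Entity": "Orders", "Attribute": "orderId", "Value": "1001"}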
Get the Cosmos URI & Primary Key from the Cosmos DB Overview tab and use them in the code below:
#Only the cosmos_client module is needed for this script
import azure.cosmos.cosmos_client as cosmos_client

cosmosUri = "https://YourCosmosDBName.documents.azure.com:443/"
pKey = "PrimaryKey=="

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})

cosmosDBsList = client.list_databases()

#Create a list to store the metadata
metadataList = []

#Iterate over all DBs
for eachCosmosDBsList in cosmosDBsList:
    #print("\nDatabase Name: {}".format(eachCosmosDBsList['id']))
    dbClient = client.get_database_client(eachCosmosDBsList['id'])

    #Iterate over all Containers
    for containersList in dbClient.list_containers():
        #print("\n- Container Name: {}".format(containersList['id']))
        conClient = dbClient.get_container_client(containersList['id'])

        #Query Container and read just TOP 1 row
        for queryItems in conClient.query_items("select top 1 * from c",
                                                enable_cross_partition_query=True):
            for itemKey, itemValue in queryItems.items():
                #print(itemKey, " = ", itemValue)

                #Create a dictionary to store metadata info at attribute/field level
                metadataInfo = {}
                metadataInfo["Source"] = eachCosmosDBsList['id']
                metadataInfo["Entity"] = containersList['id']
                metadataInfo["Attribute"] = itemKey
                metadataInfo["Value"] = str(itemValue)  #cast to string so mixed-type values fit the StringType column

                metadataList.append(metadataInfo)

#print(metadataList)

from pyspark.sql.types import *

mySchema = StructType([
     StructField("Source", StringType(), True)
    ,StructField("Entity", StringType(), True)
    ,StructField("Attribute", StringType(), True)
    ,StructField("Value", StringType(), True)])

df = spark.createDataFrame(metadataList, schema=mySchema)
df.createOrReplaceTempView("metadataDF")

display(df)
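Once the DataFrame is registered as a temp view, you can slice the collected metadata with plain Spark SQL. For example, the sketch below lists the attributes captured for one specific Container (the view name metadataDF comes from the script above; the database and container names in the filter are just placeholders):

#List the attributes captured for one specific Container (Entity)
#"YourDatabase" / "YourContainer" are placeholders – replace with your own names
attrDF = spark.sql("""
    SELECT Attribute, Value
    FROM metadataDF
    WHERE Source = 'YourDatabase'
      AND Entity = 'YourContainer'
    ORDER BY Attribute
""")
display(attrDF)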