Cosmos DB & PySpark – Retrieve all attributes from all Collections under all Databases
In one of my [previous posts] we saw how to retrieve all attributes from the items (JSON documents) of all Collections under all Databases using C# .NET code.
Here in this post we will see how to retrieve the same information in an Azure Databricks environment using Python instead of C# .NET.
First of all, make sure that the Azure Cosmos DB SQL API library (the azure-cosmos package) is installed on your Databricks cluster. [Link if not done]
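If it is not installed yet, one quick option (a sketch, assuming a Databricks Runtime that supports notebook-scoped libraries) is to install the package straight from a notebook cell:

%pip install azure-cosmos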
Then use the below script, which:
1. First connects to Cosmos DB using the CosmosClient() method.
2. Then gets the list of all Databases using the list_databases() method.
3. Then iterates through all Databases and gets the list of all Containers using the list_containers() method.
4. Then iterates through all Containers and queries their items using the query_items() method.
5. The “metadataInfo” dictionary object stores all the Keys & Values present in the Container item.
6. The List object named “metadataList” collects the Database, Container & Item level details held in the “metadataInfo” dictionary.
7. Finally, the “metadataList” object is used to create a DataFrame with the createDataFrame() method.
Get the Cosmos DB URI from the Overview tab and the Primary Key from the Keys blade of your Cosmos DB account, and plug them into the code below:
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions

cosmosUri = "https://YourCosmosDBName.documents.azure.com:443/"
pKey = "PrimaryKey=="

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})

cosmosDBsList = client.list_databases()

#Create a list to store the metadata
metadataList = []

#Iterate over all Databases
for eachCosmosDBsList in cosmosDBsList:
    #print("\nDatabase Name: {}".format(eachCosmosDBsList['id']))
    dbClient = client.get_database_client(eachCosmosDBsList['id'])

    #Iterate over all Containers
    for containersList in dbClient.list_containers():
        #print("\n- Container Name: {}".format(containersList['id']))
        conClient = dbClient.get_container_client(containersList['id'])

        #Query the Container and read just the TOP 1 item
        for queryItems in conClient.query_items("SELECT TOP 1 * FROM c",
                                                enable_cross_partition_query=True):
            for itemKey, itemValue in queryItems.items():
                #print(itemKey, " = ", itemValue)

                #Create a dictionary to store metadata info at attribute/field level
                metadataInfo = {}
                metadataInfo["Source"] = eachCosmosDBsList['id']
                metadataInfo["Entity"] = containersList['id']
                metadataInfo["Attribute"] = itemKey
                #Cast the value to string so it matches the StringType column defined below
                metadataInfo["Value"] = str(itemValue)

                metadataList.append(metadataInfo)

#print(metadataList)

from pyspark.sql.types import *

mySchema = StructType([ StructField("Source", StringType(), True)
                       ,StructField("Entity", StringType(), True)
                       ,StructField("Attribute", StringType(), True)
                       ,StructField("Value", StringType(), True)])

df = spark.createDataFrame(metadataList, schema=mySchema)

df.createOrReplaceTempView("metadataDF")

display(df)
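Because the DataFrame is also registered as the temp view “metadataDF”, you can explore the collected metadata with Spark SQL right in the notebook. For example, a purely illustrative aggregation counting attributes per Database and Container:

#Count how many attributes were captured for each Database/Container pair
display(spark.sql("""
    SELECT Source, Entity, COUNT(Attribute) AS AttributeCount
    FROM metadataDF
    GROUP BY Source, Entity
"""))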




