
SQL DBA – Integration Services evaluation period has expired

July 2, 2021

 
A SQL Server developer emailed me to say that the Import and Export Wizard was failing with the error below:

TITLE: SQL Server Import and Export Wizard --------------------------------------

Data flow execution failed. Error 0xc0000033: 
{5CCE2348-8B9F-4FD0-9AFA-9EA6D19576A7}: Integration Services evaluation period has 
expired. Error 0xc0000033: {5CCE2348-8B9F-4FD0-9AFA-9EA6D19576A7}: Integration 
Services evaluation period has expired. ------------------------------ 

ADDITIONAL INFORMATION:  Integration Services evaluation period has expired.  
({5CCE2348-8B9F-4FD0-9AFA-9EA6D19576A7}) ----------------------------------------

 

–> Investigate:

The error message makes it clear that the SQL Server instance was installed as an Evaluation edition, which runs for 180 days, because no product key was applied during setup. To make it usable again, you need a SQL Server product key and the installation media, so that you can run an edition upgrade and apply the key.
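To make the 180-day window concrete, here is a minimal sketch that computes when an evaluation install would expire. The install date is a made-up value and the function names are hypothetical; on a real server you would have to look up the actual install date yourself.

```python
from datetime import date, timedelta

EVALUATION_DAYS = 180  # evaluation period mentioned in the post


def evaluation_expiry(install_date: date) -> date:
    """Return the date on which a 180-day evaluation period ends."""
    return install_date + timedelta(days=EVALUATION_DAYS)


def is_expired(install_date: date, today: date) -> bool:
    """True once 'today' has reached the expiry date."""
    return today >= evaluation_expiry(install_date)


install = date(2021, 1, 1)            # hypothetical install date
print(evaluation_expiry(install))     # 2021-06-30
print(is_expired(install, date(2021, 7, 2)))  # True
```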
 

–> Fix:

1. Open the SQL Server Installation Center, click the Maintenance link, and then click Edition Upgrade:

SQL Evaluation expiry 03
 

2. In the Upgrade window, click Next until you reach the Product Key page, enter the key, and click Next:

SQL Evaluation expiry 04
 

3. On the Select Instance page, select the SQL Server instance that you want to fix and click Next. Setup takes some time to reach the final page; once it appears, click Upgrade:

SQL Evaluation expiry 05
 

4. When the success page appears, click Close:

SQL Evaluation expiry 06
 

5. Restart the SQL Server service for this instance, and it should run normally.
 

–> Finally, go back to SSMS; you should now be able to connect to the SQL Server instance.


Spark – Cannot perform Merge as multiple source rows matched…

June 18, 2021

 

When you sync a target table from a source table with MERGE, make sure that no two source rows match the same target row, i.e. the source has no duplicates on the merge key; otherwise you get the following error:

UnsupportedOperationException: Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches. Please refer to https://docs.microsoft.com/azure/databricks/delta/delta-update#upsert-into-a-table-using-merge

The error says that when a MERGE runs against the target table, no two source rows may match the same target row. The engine enforces this check automatically, because it cannot tell which source row should win, and applying both could leave the data inconsistent.

So, to avoid this issue make sure you have de-duplication logic before the MERGE operation.
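Before de-duplicating, it helps to know which merge keys are actually repeated. The plain-Python sketch below uses the same rows as the source table in this post's demo and lists the Ids that occur more than once. The helper name duplicate_keys is hypothetical, and on a real Delta table you would run the equivalent check in Spark rather than on a Python list.

```python
from collections import Counter

# Same rows as the source table in the demo: (Id, name, age)
source_rows = [
    (1, "Brock", 30),
    (2, "John", 31),
    (2, "Andy", 35),   # duplicate Id = 2
    (3, "Jane", 25),
    (4, "Maria", 30),
]


def duplicate_keys(rows, key_index=0):
    """Return the sorted key values that appear in more than one row."""
    counts = Counter(row[key_index] for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


print(duplicate_keys(source_rows))  # [2]
```

An empty result means each target row can be matched by at most one source row, which is the condition the MERGE check requires.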

 

Below is a small demo to reproduce this error.

Let’s create two sample tables (Source & Target) for our demo purpose:

val df1 = Seq((1, "Brock", 30), 
              (2, "John",  31), 
              (2, "Andy",  35), //duplicate ID = 2
              (3, "Jane",  25), 
              (4, "Maria", 30)).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPerson")
df1.write.format("delta").saveAsTable("tblPerson")


val df2 = Seq((1, "Jane", 30),
              (2, "John", 31)).toDF("Id", "name", "age")

spark.sql("drop table if exists tblPersonTarget")
df2.write.format("delta").saveAsTable("tblPersonTarget")

 

Next, we try to MERGE the source into the target. Running the query fails with the error above, because two source rows (Id = 2) match the same target row:

val mergeQuery =
s"""MERGE INTO tblPersonTarget As tgt
Using tblPerson as src      
  ON src.Id = tgt.ID
WHEN MATCHED 
  THEN UPDATE 
  SET
    tgt.name = src.name,
    tgt.age = src.age
WHEN NOT MATCHED
  THEN INSERT (
    ID,
    name,
    age
  )
  VALUES (
    src.ID,
    src.name,
    src.age
  )"""

spark.sql(mergeQuery)

 

To remove the duplicates, you can use a window function such as row_number(), or any other logic that suits your business requirement:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

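// Number the rows within each Id; the orderBy column decides which duplicate is kept
val dfRanked = df1.withColumn("rn", row_number().over(Window.partitionBy("Id").orderBy("name")))
// Keep only the first row per Id and drop the helper column
val dfDedup = dfRanked.filter("rn = 1").drop("rn")

display(dfDedup)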
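The keep-one-row-per-Id idea behind the window function can also be sketched in plain Python, which makes it easy to see which duplicate survives. This is only an illustration on an in-memory list with a hypothetical helper name, not a replacement for the Spark code.

```python
# Same rows as the source table in the demo: (Id, name, age)
source_rows = [
    (1, "Brock", 30),
    (2, "John", 31),
    (2, "Andy", 35),   # duplicate Id = 2
    (3, "Jane", 25),
    (4, "Maria", 30),
]


def keep_first_per_key(rows):
    """Keep one row per Id: the first one when ordered by name,
    mirroring row_number() over (partition by Id order by name) = 1."""
    kept = {}
    for row in sorted(rows, key=lambda r: (r[0], r[1])):
        kept.setdefault(row[0], row)  # only the first row per Id is stored
    return list(kept.values())


deduped = keep_first_per_key(source_rows)
for row in deduped:
    print(row)
```

For Id = 2 the row with name "Andy" is kept, because it sorts before "John". Ordering by a different column (for example a last-modified timestamp, if the data had one) would change which row wins, so the ordering should follow the business rule.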

Python – Delete/remove unwanted rows from a DataFrame

April 15, 2021

 

Once you start using Python you will likely fall in love with it, because it lets you express complex logic in a short, simple way. Here we will see how to remove rows from a DataFrame based on a list of invalid items.

 

Let’s create a sample Pandas DataFrame for our demo purpose:

import pandas as pd

sampleData = {
  'CustId': list(range(101, 111)),
  'CustomerName': ['Cust'+str(x) for x in range(101, 111)]}

cdf = pd.DataFrame(sampleData)

invalidList = [102, 103, 104]

Lines 4 and 5 of the code above create 10 records with CustId values from 101 to 110 and matching CustomerName values such as Cust101, Cust102, etc.

 

In the code below we use the isin() function to get only the records present in invalidList. This fetches just the 3 invalid records from the DataFrame:

df = cdf[cdf.CustId.isin(invalidList)]

df

 

To get the records not present in invalidList, we put the “~” operator in front of the isin() condition from the previous step, which inverts it. This fetches the other 7 valid records from the DataFrame:

df = cdf[~cdf.CustId.isin(invalidList)]

df
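To see what isin() and “~” actually do, it can help to store the boolean mask in a variable first. The sketch below rebuilds the same sample data; the variable names mask, invalid_df and valid_df are only for illustration.

```python
import pandas as pd

cdf = pd.DataFrame({
    'CustId': list(range(101, 111)),
    'CustomerName': ['Cust' + str(x) for x in range(101, 111)]})

invalidList = [102, 103, 104]

# isin() returns a boolean Series: True where CustId is in invalidList
mask = cdf.CustId.isin(invalidList)

invalid_df = cdf[mask]    # rows where the mask is True
valid_df = cdf[~mask]     # "~" flips every True/False in the mask

print(len(invalid_df), len(valid_df))  # 3 7
```

Together the two subsets cover all 10 rows, so nothing is lost or counted twice.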

Categories: Python

Python error – Length of passed values is 6, index implies 2 (while doing PIVOT with MultiIndex or multiple columns)

April 14, 2021

 

I’m new to Python and currently use it for data analysis and metadata handling. Coming from a SQL background, I try to use as many of the analysis features I rely on in SQL as I can, such as Group By, aggregate functions, filtering, Pivot, etc.

 

Now I had this particular requirement to PIVOT some columns based on multi-index keys, or multiple columns as shown below:

Python PIVOT

But while using the pivot() function in Python I got an error that I could not understand, because it did not occur when I used a single index column.

ValueError: Length of passed values is 6, index implies 2.

 

Let’s create a sample Pandas DataFrame for our demo purpose:

import pandas as pd

sampleData = {
  'Country': ['India', 'India', 'India','India','USA','USA'],
  'State': ['UP','UP','TS','TS','CO','CO'],
  'CaseStatus': ['Active','Closed','Active','Closed','Active','Closed'],
  'Cases': [100, 150, 300, 400, 200, 300]}

df = pd.DataFrame(sampleData)

df

 

Problem/Issue:

When I ran the code below, which calls pivot() with multiple index columns, it threw the error. There was no error when I passed a single index column. So what could be the reason?

# Passing multi-index (or multiple columns) fails here:
df_pivot = df.pivot(index = ['Country','State'], 
                    columns = 'CaseStatus', 
                    values = 'Cases').reset_index()

df_pivot

 

Solution #1

Searching online, I found that the pivot() function in the pandas version I was using accepted only a single column as the index, not a list of columns. (Newer pandas releases, 1.1.0 and later, accept a list here.) So in this case we first call set_index() with the list of columns, as shown below:

# Use pivot() function with set_index() function:
df_pivot = (df.set_index(['Country', 'State'])
        .pivot(columns='CaseStatus')['Cases']
        .reset_index()
     )

df_pivot

 

Solution #2

A simpler option is to use the pivot_table() function, as shown below, which gives the desired output:

# or use pivot_table() function:
df_pivot = pd.pivot_table(df, 
                          index = ['Country', 'State'], 
                          columns = 'CaseStatus', 
                          values = 'Cases')

df_pivot
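One difference worth knowing when choosing between the two functions: pivot() only reshapes and raises an error if an index/column pair occurs more than once, while pivot_table() aggregates such duplicates (with the mean by default). The sketch below uses a small made-up DataFrame, not the sample data above, to show this behaviour.

```python
import pandas as pd

# Hypothetical data with a repeated (Country, CaseStatus) pair
dup = pd.DataFrame({
    'Country': ['India', 'India', 'USA'],
    'CaseStatus': ['Active', 'Active', 'Active'],
    'Cases': [100, 300, 200]})

# pivot() cannot reshape when the same index/column pair appears twice
try:
    dup.pivot(index='Country', columns='CaseStatus', values='Cases')
    pivot_failed = False
except ValueError:
    pivot_failed = True

# pivot_table() aggregates the duplicates; the default aggfunc is the mean
mean_table = pd.pivot_table(dup, index='Country',
                            columns='CaseStatus', values='Cases')

# Pass aggfunc explicitly when a sum is what you need
sum_table = pd.pivot_table(dup, index='Country',
                           columns='CaseStatus', values='Cases',
                           aggfunc='sum')

print(pivot_failed)
print(mean_table)
print(sum_table)
```

In the sample data of this post every (Country, State, CaseStatus) combination is unique, so the default mean returns the original values unchanged.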

Categories: Python

Cosmos DB & PySpark – Retrieve all attributes from all Collections under all Databases

April 12, 2021

 

In one of my previous posts we saw how to retrieve all attributes from the items (JSON documents) of all Collections under all Databases by using C# .NET code.

In this post we will see how to retrieve the same information in an Azure Databricks environment by using Python instead of C# .NET.

 

First, make sure that the Azure Cosmos DB SQL API library is installed on your Databricks cluster.

Then use the below script which:

1. First connects to Cosmos DB by using the CosmosClient() method.
2. Then gets the list of all Databases by using the list_databases() method.
3. Then iterates through all Databases and gets the list of all Containers by using the list_containers() method.
4. Then iterates through all Containers and queries the items by using the query_items() method.
5. Stores each Key & Value present in the Container item in the “metadataInfo” dictionary object.
6. Appends each “metadataInfo” dictionary, with its Database, Container & Item level details, to the List object named “metadataList”.
7. Finally uses the “metadataList” object to create a DataFrame by using the createDataFrame() method.

Get the Cosmos Uri & Primary Key from the Cosmos DB Overview tab and apply in the code below:

import azure.cosmos.cosmos_client as cosmos_client

cosmosUri = "https://YourCosmosDBName.documents.azure.com:443/"
pKey = "PrimaryKey=="

client = cosmos_client.CosmosClient(cosmosUri, {'masterKey': pKey})

cosmosDBsList = client.list_databases()

#Create a list to store the metadata
metadataList = []

#Iterate over all DBs
for eachCosmosDBsList in cosmosDBsList:
  #print("\nDatabase Name: {}".format(eachCosmosDBsList['id']))
  dbClient = client.get_database_client(eachCosmosDBsList['id'])
  
  #Iterate over all Containers
  for containersList in dbClient.list_containers():
    #print("\n- Container Name: {}".format(containersList['id']))
    conClient = dbClient.get_container_client(containersList['id'])
    
    #Query Container and read just TOP 1 row
    for queryItems in conClient.query_items("select top 1 * from c", 
                                            enable_cross_partition_query=True):
      for itemKey, itemValue in queryItems.items():
        #print(itemKey, " = ", itemValue)
        
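        #Create a dictionary to store metadata info at attribute/field level
        metadataInfo = {}
        metadataInfo["Source"] = eachCosmosDBsList['id']
        metadataInfo["Entity"] = containersList['id']
        metadataInfo["Attribute"] = itemKey
        #Cast to string: the schema below declares Value as StringType,
        #and item values can be numbers, lists or nested objects
        metadataInfo["Value"] = str(itemValue)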

        metadataList.append(metadataInfo)

#print(metadataList)

from pyspark.sql.types import *

mySchema = StructType([ StructField("Source", StringType(), True)
                       ,StructField("Entity", StringType(), True)
                       ,StructField("Attribute", StringType(), True)
                       ,StructField("Value", StringType(), True)])

df = spark.createDataFrame(metadataList, schema=mySchema)

df.createOrReplaceTempView("metadataDF")

display(df)
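The attribute-flattening step can be tried without a Cosmos DB account. The sketch below applies the same Source / Entity / Attribute / Value layout to one hand-written item. The function name item_to_metadata, the database and container names, and the sample item are all made up for illustration. Non-string values are serialized with json.dumps here, which is one possible choice for keeping nested objects readable in a string column.

```python
import json


def item_to_metadata(database_id, container_id, item):
    """Turn one item (a dict) into a list of attribute-level metadata rows."""
    rows = []
    for key, value in item.items():
        rows.append({
            "Source": database_id,
            "Entity": container_id,
            "Attribute": key,
            # Keep strings as-is; serialize numbers, lists and nested
            # objects so that every Value fits a string column
            "Value": value if isinstance(value, str) else json.dumps(value),
        })
    return rows


# Hypothetical item, shaped like a document returned by a query
sample_item = {"id": "1", "name": "Cust101", "_ts": 1618200000,
               "address": {"city": "Hyderabad"}}

rows = item_to_metadata("SalesDB", "Customers", sample_item)
for row in rows:
    print(row)
```

Each key of the item becomes one row, so this sample item yields four rows, all with string values.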

Categories: Cosmos DB, Python