
Archive for the ‘Microsoft Azure’ Category

Azure Data Factory (ADF) Pipeline failure – found more columns than expected column count (DelimitedTextMoreColumnsThanDefined)

July 29, 2020

 
I was setting up an Azure Data Factory (ADF) pipeline to copy files from Azure Data Lake Storage Gen1 to Gen2, but while running the pipeline it was failing with the error below:

Operation on target Copy_sae failed: Failure happened on 'Sink' side.
ErrorCode=DelimitedTextMoreColumnsThanDefined,
'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,
Message=Error found when processing 'Csv/Tsv Format Text' source '0_2019_11_09_01_43_32.avro' with row number 53: found more columns than expected column count 27.,
Source=Microsoft.DataTransfer.Common,'

 

After some research I figured out that it was because I had not selected the "Binary Copy" option while creating the Copy Data activity (shown in the image below).

Root Cause: If the folder you are copying contains files with different schemas (such as a variable number of columns, different delimiters, or different quote char settings) or some data issue, the ADF pipeline will end up failing with this error.

So, for bulk copying or migrating your data from one Data Lake to another, try choosing this option: ADF then won't open the files to read their schema, but will simply treat every file as binary and copy it to the other location.


 
Hope this helps!

Migrate ADLS Gen1 to Gen2


Databricks Notebook error: Your administrator has only allowed sql and scala commands on this cluster.

June 8, 2020

 
While creating a Python notebook and running it on my Databricks cluster, I observed the following error:

Your administrator has only allowed sql and scala commands on this cluster. This execution contained at least one disallowed language.

 

It's obvious that the error is due to some restriction applied at the cluster level. So I went to the cluster settings page, checked the Spark Config, and found the below key-value configuration setting:

spark.databricks.repl.allowedLanguages sql,scala

 

So if you want to run other languages like Python and R, you can remove the entire line; or, if you want to restrict a different set of language(s), change it as per your needs.
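
For example, assuming you want to allow all four languages on this cluster, the setting would look like the line below (alternatively, just remove the line altogether):

spark.databricks.repl.allowedLanguages sql,scala,python,r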


Spark SQL – Beware of Implicit datatype conversions (TypeCoercion)

March 6, 2020

 
While working on some data analysis I noticed that one Spark SQL query was not getting me the expected results. The table had a good amount of data and I was filtering on a value, but some records were missing. So, I checked online and found that Spark SQL behaves differently from SQL Server in this case, i.e. when comparing columns or variables of two different datatypes.

–> I'm populating some test data to reproduce the scenario; for that I'm inserting 9 rows, storing decimal values as strings, with the query below:

CREATE OR REPLACE TEMPORARY VIEW vwTestDataType as 
select * from values 
("row1", "2.0"), 
("row2", "1.5"), 
("row3", "1.0"), 
("row4", "0.8"), 
("row5", "0.6"), 
("row6", "0.4"), 
("row7", "0.2"), 
("row8", "0.0"),
("row9", null);

describe vwTestDataType;

col_name | data_type | comment
col1     | string    | null
col2     | string    | null

 

–> Now, I'll create a query similar to the one where I was observing the issue. The below query should return 7 rows, but instead it returns just 3 rows.

select * from vwTestDataType where col2 > 0

Running the above query in SQL Server throws the below error for the same dataset:

Conversion failed when converting the varchar value '2.0' to data type int.

 

–> Let's check why the Spark SQL query didn't fail and why it's behaving like this.

I will use the EXPLAIN EXTENDED command to see what's happening with the query while creating the Logical Plan.

explain extended select * from vwTestDataType where col2 > 0

Here is the plan. You can see that under the Analyzed Logical Plan the column "col2" is getting implicitly typecast to INT, as the comparison value is of INT type. Thus it converts all the 0.x values to 0 and filters them out.

Plan

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('col2 > 0)
   +- 'UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13284, col2#13285]
+- Filter (cast(col2#13285 as int) > 0)
   +- SubqueryAlias `vwtestdatatype`
      +- Project [col1#13284, col2#13285]
         +- LocalRelation [col1#13284, col2#13285]

== Optimized Logical Plan ==
LocalRelation [col1#13284, col2#13285]

== Physical Plan ==
LocalTableScan [col1#13284, col2#13285]

 

–> Now, to avoid this issue you must explicitly typecast the column and the value to the exact same datatype to get the expected result. Here we should convert the String column and the value to Double; this way the query returns all 7 rows as expected:

select * from vwTestDataType where double(col2) > double(0)
-- OR --
select * from vwTestDataType where col2 > 0.0

Let's again check the Logical Plan of the modified query by using the EXPLAIN EXTENDED command to see how it looks:

explain extended select * from vwTestDataType where double(col2) > double(0)
-- OR --
explain extended select * from vwTestDataType where col2 > 0.0

Plan

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('double('col2) > 'double(0))
   +- 'UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13213, col2#13214]
+- Filter (cast(col2#13214 as double) > cast(0 as double))
   +- SubqueryAlias `vwtestdatatype`
      +- Project [col1#13213, col2#13214]
         +- LocalRelation [col1#13213, col2#13214]

== Optimized Logical Plan ==
LocalRelation [col1#13213, col2#13214]

== Physical Plan ==
LocalTableScan [col1#13213, col2#13214]

 

So while working with Spark SQL we should make sure there are no such datatype conflicts; moreover, these types of issues should ideally be handled right at the beginning, while modelling the tables with the correct datatypes.
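
As a minimal sketch of that idea (the view name vwTestDataTypeTyped is just for illustration), fix the type once when the view or table is defined, so later queries compare doubles with doubles instead of relying on implicit coercion:

CREATE OR REPLACE TEMPORARY VIEW vwTestDataTypeTyped AS
SELECT col1, CAST(col2 AS DOUBLE) AS col2
FROM vwTestDataType;

-- now the filter needs no implicit cast and returns all 7 matching rows
SELECT * FROM vwTestDataTypeTyped WHERE col2 > 0;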


Spark/Scala: Convert or flatten a JSON having Nested data with Struct/Array to columns (Question)

January 9, 2019

 
The following JSON contains some attributes at the root level, like ProductNum and unitCount.
It also contains a nested attribute named "Properties", which holds an array of key-value pairs.

Now, what I want is to expand this JSON and have all the attributes as columns, with additional columns for all the keys in the nested array section, as shown in the "Expected Output" section below:

{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "unitCount":"3"
}

 

Expected output, as described above:

+-------------------------------------------------------+   
| ProductNum | invoice_id | job_id | sku_id | unitCount |  
+-------------------------------------------------------+   
| 6000078    | 923659     | 296160 | 312002 | 3         |  
+-------------------------------------------------------+

 

Solution:

// imports needed for the $ column syntax, createDataset on a list of strings, and explode/first
import spark.implicits._
import org.apache.spark.sql.functions.{explode, first}

val DS_Products = spark.createDataset("""{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "unitCount":"3"
}""" :: Nil)

val DF_Products = spark.read.json(DS_Products)

// explode the Properties array so each key/value struct gets its own row
val df_flatten = DF_Products
  .select($"*", explode($"Properties") as "SubContent")
  .drop($"Properties")

df_flatten.show()

// pivot the keys into columns, taking the first value seen for each key
val df_flatten_pivot = df_flatten
  .groupBy($"ProductNum", $"unitCount")
  .pivot("SubContent.key")
  .agg(first("SubContent.value"))

df_flatten_pivot.show()

Output:

+----------+---------+--------------------+
|ProductNum|unitCount|          SubContent|
+----------+---------+--------------------+
|   6000078|        3|[invoice_id, 923659]|
|   6000078|        3|    [job_id, 296160]|
|   6000078|        3|    [sku_id, 312002]|
+----------+---------+--------------------+

+----------+---------+----------+------+------+
|ProductNum|unitCount|invoice_id|job_id|sku_id|
+----------+---------+----------+------+------+
|   6000078|        3|    923659|296160|312002|
+----------+---------+----------+------+------+
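
As a side note, if the set of keys is known up front (which is not the case in the original question, and is why the pivot approach is used above), a sketch like the one below avoids the groupBy/pivot altogether. It assumes Spark 2.4+ for map_from_entries, and df_known_keys is just an illustrative name:

import org.apache.spark.sql.functions.map_from_entries

// turn the array<struct<key,value>> into a map<key,value>, then pull out the known keys
val df_known_keys = DF_Products
  .withColumn("props", map_from_entries($"Properties"))
  .select(
    $"ProductNum",
    $"props"("invoice_id") as "invoice_id",
    $"props"("job_id") as "job_id",
    $"props"("sku_id") as "sku_id",
    $"unitCount")

df_known_keys.show()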

 

PowerShell – Restart Azure VM and log off Users remotely

October 21, 2018

 

1. Open Run by pressing the Windows + R keys, type powershell, and hit Enter.
 

2. Now, in the PowerShell window, first connect to the Azure VM that you want to remotely restart:

PS C:\Users\manojp> Enter-PSSession -ComputerName MyAzureVMName

[MyAzureVMName]: PS C:\Users\manojp\Documents>

 

3. Now try issuing the Restart command:

[MyAzureVMName]: PS C:\Users\manojp\Documents> Restart-Computer

Restart-Computer : Failed to restart the computer MyAzureVMName with the following error message: The system shutdown
cannot be initiated because there are other users logged on to the computer.
+ CategoryInfo : OperationStopped: (MyAzureVMName:String) [Restart-Computer], InvalidOperationException
+ FullyQualifiedErrorId : RestartcomputerFailed,Microsoft.PowerShell.Commands.RestartComputerCommand

So, this gave us an error as a few users are still logged in, and thus we can't restart the VM.

 

4. Let's check who is currently logged in on this VM:

[MyAzureVMName]: PS C:\Users\manojp\Documents> quser

USERNAME     SESSIONNAME    ID  STATE    IDLE TIME  LOGON TIME
charlesl     rdp-tcp#0       2  Active   1:07       12/21/2018 08:26 AM

 

5. Let's try kicking the user out by specifying the session ID, which is "2":

[MyAzureVMName]: PS C:\Users\manojp\Documents> logoff 2

 

6. We will check whether that user was kicked out or anybody still remains logged in:

[MyAzureVMName]: PS C:\Users\manojp\Documents> quser

quser : No User exists for *
+ CategoryInfo : NotSpecified: (No User exists for *:String) [], RemoteException
+ FullyQualifiedErrorId : NativeCommandError

 

7. Let’s finally restart the VM:

[MyAzureVMName]: PS C:\Users\manojp\Documents> Restart-Computer

PS>
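
Note: if you don't mind forcibly disconnecting the logged-on users, Restart-Computer also has a -Force switch that forces an immediate restart even when users are logged on, which would have let us skip steps 4 to 6:

[MyAzureVMName]: PS C:\Users\manojp\Documents> Restart-Computer -Force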