Archive

Posts Tagged ‘Apache Spark’

Spark SQL – Beware of Implicit datatype conversions (TypeCoercion)

March 6, 2020 1 comment

 
While working on some data analysis I saw one Spark SQL query was not getting me expected results. The table had some good amount of data, I was filtering on a value but some records were missing. So, I checked online and found that Spark SQL works differently compared to SQL Server, in this case while comparing 2 different datatypes columns or variables.

–> I’m populating some test data to reproduce the scenario, for that I’m inserting 9 rows and storing decimal values as String, query below:

CREATE OR REPLACE TEMPORARY VIEW vwTestDataType as 
select * from values 
("row1", "2.0"), 
("row2", "1.5"), 
("row3", "1.0"), 
("row4", "0.8"), 
("row5", "0.6"), 
("row6", "0.4"), 
("row7", "0.2"), 
("row8", "0.0"),
("row9", null);

describe vwTestDataType;

col_name | data_type | comment
col1           | string         | null
col2           | string         | null

 

–> Now, I’ll create a similar query where I was observing the issue. The below query should return me 7 rows, but instead it returns just 3 rows.

select * from vwTestDataType where col2 > 0

Running above query in “SQL Server” throws below error for the same dataset:

Conversion failed when converting the varchar value ‘2.0’ to data type int.

 

–> Let’s check why Spark SQL query didn’t failed and why its behaving like this.

I will use EXPLAIN EXTENDED operator to know what’s happening with the query while creating the Logical Plan.

explain extended select * from vwTestDataType where col2 > 0

Here is the plan you can see that under Analyzed Logical Plan the column “col2” is getting implicitly typecasted to INT, as the comparison value is an INT type. Thus it is converting all 0.x values to 0 and filtering them out.

Plan

== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘col2 > 0)
+- ‘UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13284, col2#13285]
+- Filter (cast(col2#13285 as int) > 0)
+- SubqueryAlias `vwtestdatatype`
+- Project [col1#13284, col2#13285]
+- LocalRelation [col1#13284, col2#13285]

== Optimized Logical Plan ==
LocalRelation [col1#13284, col2#13285]

== Physical Plan ==
LocalTableScan [col1#13284, col2#13285]

 

–> Now to avoid this issue you must explicitly type cast the column and value to the exact datatype to get expected result. Like here we should convert the String column & value to Double, this way the query returns all 7 rows as expected:

select * from vwTestDataType where double(col2) > double(0)
--OR--select * from vwTestDataType where col2 > 0.0

Let’s again check the Logical Plan of the modified query by using EXPLAIN EXTENDED operator how it looks like:

explain extended select * from vwTestDataType where double(col2) > double(0)
--OR--explain extended select * from vwTestDataType where col2 > 0.0

plan
== Parsed Logical Plan ==
‘Project [*]
+- ‘Filter (‘double(‘col2) > ‘double(0))
+- ‘UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13213, col2#13214]
+- Filter (cast(col2#13214 as double) > cast(0 as double))
+- SubqueryAlias `vwtestdatatype`
+- Project [col1#13213, col2#13214]
+- LocalRelation [col1#13213, col2#13214]

== Optimized Logical Plan ==
LocalRelation [col1#13213, col2#13214]

== Physical Plan ==
LocalTableScan [col1#13213, col2#13214]

 

So while working with Spark SQL we should make sure there should not be such datatype conflicts, and moreover these type of issues should be handled in way beginning while modelling the tables with correct datatype.


SQL Error – “SELECT TOP 100” throws error in SparkSQL – what’s the correct syntax?

January 23, 2020 Leave a comment

 
In SQL Server to get top-n rows from a table or dataset you just have to use “SELECT TOP” clause by specifying the number of rows you want to return, like in the below query.

But when I tried to use the same query in Spark SQL I got a syntax error, which meant that the TOP clause is not supported with SELECT statement.

%sql
Select TOP 100 * from SalesOrder

Error in SQL statement: ParseException:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input ‘100’ expecting (line 1, pos 11)

== SQL ==
Select top 100 * from SalesOrder
———–^^^

 

As Spark SQL does not support TOP clause thus I tried to use the syntax of MySQL which is the “LIMIT” clause.

So I just removed “TOP 100” from the SELECT query and tried adding “LIMIT 100” clause at the end, it worked and gave expected results !!!

%sql
Select * from SalesOrder LIMIT 100

Spark/Scala: Convert or flatten a JSON having Nested data with Struct/Array to columns (Question)

January 9, 2019 Leave a comment

 
The following JSON contains some attributes at root level, like ProductNum and unitCount.
It also contains a Nested attribute with name “Properties”, which contains an array of Key-Value pairs.

Now, what I want is to expand this JSON, and have all the attributes in form of columns, with additional columns for all the Keys in Nested array section, like in the “Expected Output” section below:

{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "unitCount":"3"
}

 

Expected output, as described above:

+-------------------------------------------------------+   
| ProductNum | invoice_id | job_id | sku_id | unitCount |  
+-------------------------------------------------------+   
| 6000078    | 923659     | 296160 | 312002 | 3         |  
+-------------------------------------------------------+

 

Solution:

val DS_Products = spark.createDataset("""{
   "ProductNum":"6000078",
   "Properties":[
      {
         "key":"invoice_id",
         "value":"923659"
      },
      {
         "key":"job_id",
         "value":"296160"
      },
      {
         "key":"sku_id",
         "value":"312002"
      }
   ],
   "UnitCount":"3"
}""" :: Nil)

val DF_Products = spark.read.json(DS_Products)

val df_flatten = DF_Products
  .select($"*", explode($"Properties") as "SubContent")
  .drop($"Properties")

df_flatten.show()

val df_flatten_pivot = df_flatten
  .groupBy($"ProductNum",$"UnitCount")
  .pivot("SubContent.key")
  .agg(first("SubContent.value"))

df_flatten_pivot.show()

Output:

+----------+---------+--------------------+
|ProductNum|UnitCount|          SubContent|
+----------+---------+--------------------+
|   6000078|        3|[invoice_id, 923659]|
|   6000078|        3|    [job_id, 296160]|
|   6000078|        3|    [sku_id, 312002]|
+----------+---------+--------------------+

+----------+---------+----------+------+------+
|ProductNum|UnitCount|invoice_id|job_id|sku_id|
+----------+---------+----------+------+------+
|   6000078|        3|    923659|296160|312002|
+----------+---------+----------+------+------+

 

Azure Databricks (a fully managed Apache Spark offering)

July 28, 2018 Leave a comment

 

Databricks Introduction:

Azure Databricks = Best of Databricks + Best of Azure

Azure Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform (PaaS).

It is a fast, easy-to-use, and collaborative Apache Spark–based analytics platform. Designed in collaboration with the creators of Apache Spark, it combines the best of Databricks and Azure to help you accelerate innovation with one-click set up, streamlined workflows, and an interactive workspace that enables collaboration among data scientists, data engineers, and business analysts. Because it’s an Azure service, you benefit from native integrations with other Azure services such as Power BI, SQL Data Warehouse, and Cosmos DB. You also get enterprise-grade Azure security, including Active Directory integration, compliance, and enterprise-grade SLAs.
 


 
–> With Databricks you can:
– Launch your new Spark environment with a single click.
– Integrate effortlessly with a wide variety of data stores.
– Use Databricks Notebooks to unify your processes and instantly deploy to production.
– Improve and scale your analytics with a high-performance processing engine optimized for the comprehensive, trusted Azure platform.
 

Learning Resources:

    Webinar recording on Azure Databricks

    My next blog in series