Posts Tagged ‘Apache Spark’

Apache Spark – RDD, DataFrames, Transformations (Narrow & Wide), Actions, Lazy Evaluation (Part 3)

October 23, 2020


image credits: Databricks

RDD (Resilient Distributed Dataset)

Spark is built on the concept of RDDs, i.e. “Resilient Distributed Datasets”. An RDD is an immutable, fault-tolerant collection of objects partitioned across several nodes. Because each RDD records its lineage (the chain of transformations that produced it), Spark can rebuild a lost partition when a node fails.
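Here is a minimal sketch of inspecting that lineage. It assumes a local `SparkSession`; the `local[2]` master and the app name are illustrative choices, and in spark-shell or a Databricks notebook you would use the existing `spark` instead. `toDebugString` prints the chain of parent RDDs that Spark would replay to rebuild a lost partition.

```scala
import org.apache.spark.sql.SparkSession

object LineageDemo {
  // Local session for illustration only
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("lineage-demo")
    .master("local[2]")
    .getOrCreate()

  // Builds a small chain of RDDs and returns the lineage of the last one
  def lineage(): String = {
    val words = spark.sparkContext.parallelize("My Name is Manoj Pandey".split(" ").toSeq, 2)
    val upper = words.map(_.toUpperCase)      // child of `words`
    val withA = upper.filter(_.contains("A")) // child of `upper`
    // toDebugString describes this RDD and all of its ancestors
    withA.toDebugString
  }

  def main(args: Array[String]): Unit = {
    println(lineage())
    spark.stop()
  }
}
```

The output shows a `MapPartitionsRDD` for each `map`/`filter` step, stacked on the `ParallelCollectionRDD` created by `parallelize`.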

– In the initial versions of Spark, RDDs were the only way for users to interact with Spark, through a low-level API that provides various Transformations and Actions.

– Later, DataFrames (Spark 1.3) and Datasets (Spark 1.6) were introduced, and Spark 2.x unified them into a single API. They are also built on top of RDDs, but they provide higher-level, structured APIs and several benefits over RDDs.

– Ultimately, though, at Spark's core all computations, including those written with the high-level DataFrame APIs, are converted into low-level RDD operations and compiled to JVM bytecode, which is executed in the Spark Executors.

–> RDDs can be created in various ways, like:

1. Reading a file from local file system:

val FileRDD = spark.sparkContext.textFile("/mnt/test/hello.txt")

2. From a local collection, by using the parallelize method:

val myCol = "My Name is Manoj Pandey".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)

3. Transforming an existing RDD:

val myCol = "My Name is Manoj Pandey. I stay in Hyderabad".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)
val myRDD2 = myRDD.filter(s => s.contains("a"))

–> Use RDDs only when you are working with raw, unstructured data, or when you don't need a schema and the optimization and performance benefits that DataFrames provide.

DataFrames

Just like RDDs, DataFrames are immutable collections of objects distributed (partitioned) across several nodes. But unlike an RDD, a DataFrame is organized into rows and columns like a table in an RDBMS, and each column has a specific schema and datatype, such as integer, date, string or timestamp.

– DataFrames also provide optimization and performance benefits with the help of the Catalyst Optimizer.

– As mentioned above, a DataFrame query optimized by Catalyst is ultimately executed as low-level RDD transformations.
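One way to watch Catalyst at work is to compare the analyzed and the optimized logical plans of a query. The sketch below assumes Spark 3.x and a local session. It chains two `filter()` calls and counts the `Filter` nodes in each plan. Note that `queryExecution` and the `catalyst` plan classes are developer-facing internals, so their shape may change between versions. `df.explain(true)` prints the same plans in readable form.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.functions.col

object CatalystDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("catalyst-demo")
    .master("local[2]")
    .getOrCreate()

  // Returns (Filter nodes in the analyzed plan, Filter nodes in the optimized plan)
  def filterNodeCounts(): (Int, Int) = {
    val df = spark.range(100)
      .filter(col("id") > 10)
      .filter(col("id") < 50)

    val qe = df.queryExecution
    val analyzed  = qe.analyzed.collect { case f: Filter => f }.size
    val optimized = qe.optimizedPlan.collect { case f: Filter => f }.size
    (analyzed, optimized)
  }

  def main(args: Array[String]): Unit = {
    // The analyzed plan keeps both filters; the optimizer merges adjacent filters into one
    println(filterNodeCounts())
    spark.stop()
  }
}
```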

1. A simple example to create a DataFrame by reading a CSV file:

val myDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("/mnt/test/people.csv") // hypothetical path: point this at your own CSV file


2. Creating a DataFrame from a Seq collection with the toDF() method:

val myDF = Seq(("Male",   "Brock", 30), 
               ("Male",   "John",  31), 
               ("Male",   "Andy",  35), 
               ("Female", "Jane",  25), 
               ("Female", "Maria", 30)).toDF("gender", "name", "age")


3. Creating a new DataFrame from an existing DataFrame by using groupBy() method over it:

import org.apache.spark.sql.functions._

val myDF = Seq(("Male",   "Brock", 30), 
               ("Male",   "John",  31), 
               ("Male",   "Andy",  35), 
               ("Female", "Jane",  25), 
               ("Female", "Maria", 30)).toDF("gender", "name", "age")

val DFavg = myDF.groupBy("gender").agg(avg("age"))


4. Creating a DataFrame from an existing RDD:

val myCol = "My Name is Manoj Pandey".split(" ")
val myRDD = spark.sparkContext.parallelize(myCol, 2)
val myDF = myRDD.toDF()

5. DataFrames can also be registered as tables or views (temporary views), so that you can query them with Spark SQL instead of applying Scala Transformations:

myDF.createOrReplaceTempView("tblPerson")

spark.sql("""
select gender, AVG(age) as AVGAge
from tblPerson
group by gender
""").show()

Transformations

In Spark, RDDs and DataFrames are immutable, so every operation on the data in a DataFrame transforms it into a new DataFrame and leaves the existing DataFrame unmodified. These operations are called Transformations.
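A small sketch of that immutability, assuming a local session and reusing the sample people data from the DataFrame examples above. Filtering produces a new DataFrame, and the original still holds all of its rows.

```scala
import org.apache.spark.sql.SparkSession

object ImmutabilityDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("immutability-demo")
    .master("local[2]")
    .getOrCreate()

  // Returns (rows in the original DataFrame, rows in the filtered DataFrame)
  def rowCounts(): (Long, Long) = {
    import spark.implicits._

    val people = Seq(("Male", "Brock", 30), ("Male", "John", 31), ("Male", "Andy", 35),
                     ("Female", "Jane", 25), ("Female", "Maria", 30)).toDF("gender", "name", "age")

    // filter() does not modify `people`; it returns a brand-new DataFrame
    val over30 = people.filter($"age" > 30)

    (people.count(), over30.count())
  }
}
```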

–> There are two types of Transformations:

1. Narrow Transformations: each output partition is computed from a single input partition, so no data is exchanged between partitions. Examples: filter(), map() and flatMap().

2. Wide Transformations: computing an output partition needs data from multiple input partitions, for example groupBy(), reduceByKey() and orderBy(). Spark has to exchange data between partitions, which is called a shuffle, and it writes the shuffled data to disk.
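The distinction is visible in the RDD API. Every RDD exposes its `dependencies`: a narrow transformation produces a `NarrowDependency`, and a wide one produces a `ShuffleDependency`. The following is a minimal sketch, assuming a local session and a toy word list.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object DependencyDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("dependency-demo")
    .master("local[2]")
    .getOrCreate()

  // Classifies the dependency between an RDD and its (single) parent
  private def kind(rdd: RDD[_]): String = rdd.dependencies.head match {
    case _: NarrowDependency[_]        => "narrow"
    case _: ShuffleDependency[_, _, _] => "wide"
    case other                         => other.getClass.getSimpleName
  }

  def dependencyKinds(): (String, String) = {
    val words  = spark.sparkContext.parallelize(Seq("a", "b", "a", "c"), 2)
    val pairs  = words.map(w => (w, 1))   // narrow: each output partition reads one input partition
    val counts = pairs.reduceByKey(_ + _) // wide: values for the same key are shuffled together
    (kind(pairs), kind(counts))
  }
}
```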

Lazy Evaluation

Both Narrow and Wide Transformations are lazy: their execution is delayed until an Action is performed on them. Because of this delay, the Spark execution engine sees the whole chain of transformations at once and can optimize the entire query before running it.
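A quick way to observe lazy evaluation is to count how often a transformation's function actually runs. The sketch below uses a `LongAccumulator` for this, which is an illustrative choice. Accumulator updates made inside transformations can be re-applied if a task is retried, so treat them as a debugging aid rather than an exact counter.

```scala
import org.apache.spark.sql.SparkSession

object LazyDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("lazy-demo")
    .master("local[2]")
    .getOrCreate()

  // Returns how many times the map function had run before and after an action
  def mapCalls(): (Long, Long) = {
    val sc = spark.sparkContext
    val calls = sc.longAccumulator("mapCalls")

    // Transformation only: Spark records it, but nothing runs yet
    val doubled = sc.parallelize(1 to 10, 2).map { x => calls.add(1L); x * 2 }
    val before = calls.sum

    doubled.count() // action: triggers execution of the map
    val after = calls.sum

    (before, after)
  }
}
```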

Actions

Since Transformations don’t execute anything on their own, Spark needs an Action to trigger the execution of a chain of Transformations.

Some examples of Actions are count(), collect(), show() and save(). They count rows, collect data back to the driver, display computed results in the console, and write data to a file or other target data source.
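This sketch runs a few common Actions on a DataFrame. It assumes a local session and reuses the sample people data from the DataFrame examples. `outputDir` is a hypothetical directory for the Parquet output.

```scala
import org.apache.spark.sql.SparkSession

object ActionsDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("actions-demo")
    .master("local[2]")
    .getOrCreate()

  // Runs several actions on a filtered DataFrame and returns (row count, collected names)
  def run(outputDir: String): (Long, Seq[String]) = {
    import spark.implicits._

    val people = Seq(("Male", "Brock", 30), ("Male", "John", 31), ("Male", "Andy", 35),
                     ("Female", "Jane", 25), ("Female", "Maria", 30)).toDF("gender", "name", "age")
    val women = people.filter($"gender" === "Female") // transformation: nothing runs yet

    val n = women.count()                                       // action: returns a number to the driver
    val names = women.select("name").as[String].collect().toSeq // action: brings rows to the driver
    women.show()                                                // action: prints rows to the console
    women.write.mode("overwrite").parquet(outputDir)            // action: writes files to storage
    (n, names)
  }
}
```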

Apache Spark – main Components & Architecture (Part 2)

October 19, 2020


1. Spark Driver:

– The Driver program runs various operations in parallel on a Spark cluster.

– It is responsible for communicating with the Cluster Manager to allocate resources for launching Spark Executors.

– It also instantiates the SparkSession for the Spark Application.

– The Driver program splits the Spark Application into one or more Spark Jobs, and each Job is transformed into a DAG (Directed Acyclic Graph, aka the Spark execution plan). Each DAG is divided into Stages at shuffle boundaries, i.e. wherever a wide transformation occurs. Each Stage is in turn divided into multiple Tasks, and each Task processes a single partition of data.

– Once the Cluster Manager allocates resources, the Driver program works directly with the Executors by assigning them Tasks.

2. Spark Session:

– A SparkSession provides a single entry point to interact with all Spark functionalities and the underlying core Spark APIs.

– Every Spark Application has to create a SparkSession explicitly, but if you are working from an interactive shell, the Spark Driver creates one for you implicitly.

– The role of SparkSession is also to send Spark Tasks to the executors to run.

3. Cluster Manager:

– Its role is to manage and allocate resources for the cluster nodes on which your Spark application is running.

– It works on behalf of the Spark Driver: it provides information about the available nodes and launches Executors on them, and the Driver then schedules Spark Tasks on those Executors.

– Currently Spark supports its built-in standalone cluster manager, Hadoop YARN, Apache Mesos and Kubernetes.

4. Spark Executor:

– By now you probably know what Executors are.

– Executors run the Tasks of a Spark Application on a Worker Node and keep communicating with the Spark Driver.

– An Executor is actually a JVM running on a Worker node.
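To tie these components together, here is a sketch of an application that builds its own SparkSession (the single entry point) and runs one job. That job's stages and tasks map to partitions as described above. The sketch assumes local mode with `local[4]`. On a real cluster, the master URL for standalone, YARN or Kubernetes is normally supplied by `spark-submit`, and the Spark UI shows the resulting job, stages and tasks.

```scala
import org.apache.spark.sql.SparkSession

object ArchitectureDemo {
  // In a standalone application you build the SparkSession yourself;
  // spark-shell and notebooks create one named `spark` for you.
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("architecture-demo") // hypothetical application name
    .master("local[4]")           // local mode; on a cluster this is usually passed by spark-submit
    .getOrCreate()

  // Returns the partition counts of the two stages in the job below
  def stagePartitions(): (Int, Int) = {
    val words = spark.sparkContext.parallelize(
      "My Name is Manoj Pandey I stay in Hyderabad".split(" ").toSeq, 4) // stage 1: 4 partitions -> 4 tasks
    val counts = words
      .map(w => (w.length, 1))
      .reduceByKey(_ + _, 2) // shuffle boundary; stage 2: 2 partitions -> 2 tasks
    counts.collect()         // one action -> one job with two stages
    (words.getNumPartitions, counts.getNumPartitions)
  }
}
```

`getOrCreate()` returns the already-running session if one exists, which is how the shell and notebooks hand every cell the same `spark` object.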

Apache Spark – Introduction & main features (Part 1)

October 12, 2020


Apache Spark is an open source, unified analytics engine, designed for distributed big data processing and machine learning.

Although Apache Hadoop was already available to cater for Big Data workloads, its MapReduce (MR) framework had some inefficiencies and was hard to manage and administer.

– A typical Hadoop program involves multiple iterative MR jobs. Each job writes data to disk and reads it back in every iteration, which adds latency and hurts performance.

– Hadoop also provides SQL-like ad-hoc querying through Pig and Hive. Because these run on MR, they read and write data to and from disk, which makes them much slower and gives a bad experience when you need quick data retrieval.

– Researchers at UC Berkeley's AMPLab saw this problem as an opportunity and started a project called Spark. It is written in Scala and can be 10-100 times faster than Hadoop MR for some workloads.

The main features of Spark are:

1. Speed: Spark is much faster at processing and crunching large-scale data than traditional database engines and Hadoop MapReduce jobs. With in-memory storage and computation, it provides high-performance batch and streaming processing. Features like the DAG scheduler, the query optimizer and the Tungsten execution engine also make Spark blazingly fast.

2. Modularity & Generality: For different kinds of workloads, Spark provides multiple components in its core: SQL, Structured Streaming, MLlib for Machine Learning, and GraphX. They are all separate modules, but unified under one engine.

3. Ease of use (Polyglot): You can write your apps in whichever supported language you are comfortable with: Scala, SQL, Python, Java or R. Spark also provides a large number of APIs and libraries, so you can build apps quickly without a steep learning curve.

4. Extensibility: With Spark you can read data from multiple heterogeneous sources like HDFS, HBase, Apache Hive, Azure Blob Storage, Amazon S3, Apache Kafka, Kinesis, Cassandra and MongoDB, as well as from traditional databases (RDBMS tables). Spark also supports various file formats, such as CSV, Text, JSON, Parquet, ORC and Avro.

5. Resilient (Fault Tolerant): Spark is built on the concept of RDDs, i.e. “Resilient Distributed Datasets”. These are fault-tolerant collections of objects partitioned across several nodes. Using lineage, an RDD can rebuild a lost partition when a node fails.

6. Runs Anywhere: Spark runs on multiple cluster managers, such as its standalone cluster manager, Hadoop YARN, Apache Mesos and Kubernetes.
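Two of these features fit in a short sketch, assuming a local session. For Speed, `cache()` keeps a DataFrame in memory for reuse. For Extensibility, the same DataFrame is written and read back as JSON and as Parquet. The `baseDir` argument is a hypothetical scratch directory.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object FeaturesDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("features-demo")
    .master("local[2]")
    .getOrCreate()

  // Speed: keep a DataFrame in memory so later actions reuse it instead of recomputing it
  def cachedStorageLevel(): StorageLevel = {
    val df = spark.range(0, 1000000).selectExpr("id", "id % 10 AS bucket")
    df.cache()                             // lazy: only marks the DataFrame for caching
    df.count()                             // first action materializes the cache
    df.groupBy("bucket").count().collect() // served from the cached data
    df.storageLevel
  }

  // Extensibility: the same DataFrame written and read back in two file formats
  def roundTrip(baseDir: String): (Long, Long) = {
    import spark.implicits._
    val people = Seq(("Jane", 25), ("Maria", 30)).toDF("name", "age")
    people.write.mode("overwrite").json(s"$baseDir/people_json")
    people.write.mode("overwrite").parquet(s"$baseDir/people_parquet")
    (spark.read.json(s"$baseDir/people_json").count(),
     spark.read.parquet(s"$baseDir/people_parquet").count())
  }
}
```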

Spark error – Parquet does not support decimal. See HIVE-6384

August 5, 2020

I was creating a Hive table in a Databricks Notebook from a Parquet file located in Azure Data Lake Store, with the following command:

val df =


But I was getting the following error:

warning: there was one feature warning; re-run with -feature for details
java.lang.UnsupportedOperationException: Parquet does not support decimal. See HIVE-6384

As the error suggests, the problem relates to a Hive version conflict. So I checked the Hive metastore version and found that it was pointing to an old version (0.13.0). This version of the Hive metastore did not support some datatypes, such as DECIMAL, for Parquet-formatted files.
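One way to check which metastore version a session is configured with is to read the `spark.sql.hive.metastore.version` setting from its runtime configuration. The following is a hedged sketch: the helper object is hypothetical, and `getOption` returns `None` instead of throwing when the key is not known to the session.

```scala
import org.apache.spark.sql.SparkSession

object MetastoreVersionCheck {
  // Hypothetical helper: returns the configured Hive metastore version, if any.
  // In a Databricks notebook, pass the notebook's built-in `spark` session.
  def metastoreVersion(spark: SparkSession): Option[String] =
    spark.conf.getOption("spark.sql.hive.metastore.version")
}
```

In a notebook cell, `println(MetastoreVersionCheck.metastoreVersion(spark))` prints the value the cluster is currently using.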



Also, according to the Jira ticket HIVE-6384, support for multiple datatypes in the Parquet SerDe was implemented in Hive 1.2.0.

So to point the cluster at a newer Hive metastore version, you just need to add the settings below to the configuration of the cluster you are using.

Click on “Clusters” –> click “Edit” at the top –> expand “Advanced Options” –> on the “Spark” tab, add the following two settings to the “Spark Config” box:

spark.sql.hive.metastore.version 1.2.1
spark.sql.hive.metastore.jars builtin

Then restart the cluster so that the new settings take effect.


Some similar errors:
– Parquet does not support date
– Parquet does not support timestamp

Spark SQL – Beware of Implicit datatype conversions (TypeCoercion)

March 6, 2020

While working on some data analysis, I noticed that a Spark SQL query was not returning the expected results. The table had a good amount of data, and I was filtering on a value, but some records were missing. So I checked online and found that Spark SQL behaves differently from SQL Server when comparing columns or variables of two different datatypes.

–> To reproduce the scenario, I’m populating some test data: 9 rows that store decimal values as Strings. Query below:

create or replace temporary view vwTestDataType as
select * from values
("row1", "2.0"), 
("row2", "1.5"), 
("row3", "1.0"), 
("row4", "0.8"), 
("row5", "0.6"), 
("row6", "0.4"), 
("row7", "0.2"), 
("row8", "0.0"),
("row9", null);

describe vwTestDataType;

col_name | data_type | comment
col1     | string    | null
col2     | string    | null


–> Now I’ll write a query similar to the one where I observed the issue. The query below should return 7 rows, but instead it returns just 3.

select * from vwTestDataType where col2 > 0

Running the above query in “SQL Server” on the same dataset throws the following error:

Conversion failed when converting the varchar value ‘2.0’ to data type int.


–> Let’s check why the Spark SQL query didn’t fail and why it behaves like this.

I will use the EXPLAIN EXTENDED command to see what happens to the query while the Logical Plan is created.

explain extended select * from vwTestDataType where col2 > 0

In the plan below, under Analyzed Logical Plan, you can see that the column “col2” is implicitly typecast to INT, because the comparison value is of INT type. So all the 0.x values are converted to 0 and filtered out, and values like 1.5 are truncated to 1.


== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('col2 > 0)
   +- 'UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13284, col2#13285]
+- Filter (cast(col2#13285 as int) > 0)
   +- SubqueryAlias `vwtestdatatype`
      +- Project [col1#13284, col2#13285]
         +- LocalRelation [col1#13284, col2#13285]

== Optimized Logical Plan ==
LocalRelation [col1#13284, col2#13285]

== Physical Plan ==
LocalTableScan [col1#13284, col2#13285]


–> To avoid this issue, explicitly cast the column and the value to the same, correct datatype. Here we should convert the String column and the value to Double. This way the query returns all 7 rows as expected:

select * from vwTestDataType where double(col2) > double(0)
-- OR --
select * from vwTestDataType where col2 > 0.0

Let’s use EXPLAIN EXTENDED again to see what the Logical Plan of the modified query looks like:

explain extended select * from vwTestDataType where double(col2) > double(0)
-- OR --
explain extended select * from vwTestDataType where col2 > 0.0

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('double('col2) > 'double(0))
   +- 'UnresolvedRelation `vwTestDataType`

== Analyzed Logical Plan ==
col1: string, col2: string
Project [col1#13213, col2#13214]
+- Filter (cast(col2#13214 as double) > cast(0 as double))
   +- SubqueryAlias `vwtestdatatype`
      +- Project [col1#13213, col2#13214]
         +- LocalRelation [col1#13213, col2#13214]

== Optimized Logical Plan ==
LocalRelation [col1#13213, col2#13214]

== Physical Plan ==
LocalTableScan [col1#13213, col2#13214]
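The same behaviour can also be checked from Scala. The sketch below assumes a local session and Spark's non-ANSI casting rules (`spark.sql.ansi.enabled` set to false). It recreates the test view, then returns the row counts of the original query and of both fixes. It also returns the analyzed plan text, where the implicit `cast(... as int)` appears.

```scala
import org.apache.spark.sql.SparkSession

object TypeCoercionDemo {
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("type-coercion-demo")
    .master("local[2]")
    .getOrCreate()

  private val createViewSql =
    """create or replace temporary view vwTestDataType as
      |select * from values
      |  ("row1", "2.0"), ("row2", "1.5"), ("row3", "1.0"),
      |  ("row4", "0.8"), ("row5", "0.6"), ("row6", "0.4"),
      |  ("row7", "0.2"), ("row8", "0.0"), ("row9", null)""".stripMargin

  private def setUp(): Unit = {
    // The results in the post assume non-ANSI casting rules, so pin them explicitly
    spark.conf.set("spark.sql.ansi.enabled", "false")
    spark.sql(createViewSql)
  }

  private def count(query: String): Long = spark.sql(query).count()

  // Row counts for: the implicit int cast, the explicit double casts, and the 0.0 literal
  def rowCounts(): (Long, Long, Long) = {
    setUp()
    (count("select * from vwTestDataType where col2 > 0"),
     count("select * from vwTestDataType where double(col2) > double(0)"),
     count("select * from vwTestDataType where col2 > 0.0"))
  }

  // Analyzed logical plan of the problematic query, as a string
  def analyzedPlan(): String = {
    setUp()
    spark.sql("select * from vwTestDataType where col2 > 0").queryExecution.analyzed.toString
  }
}
```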


So when working with Spark SQL, make sure there are no such datatype conflicts. Better still, prevent these issues from the start by modelling tables with the correct datatypes.