Apache Spark – new Features & Improvements in Spark 3.0

October 27, 2020

Spark 3.0, released in June 2020, brings major improvements over the previous releases. For Spark SQL & Scala developers the most exciting features are AQE (Adaptive Query Execution), Dynamic Partition Pruning, and a range of other performance optimizations and enhancements.

Below I’ve listed these new features and enhancements together on one page for easier understanding and future reference.

1. Adaptive Query Execution (AQE)

– To process large and varying amounts of data efficiently, the Spark engine relies on its Catalyst optimizer framework. Catalyst combines rule-based and cost-based optimization: the cost-based part uses table and column statistics (like row count, distinct values, NULL count, min/max values, etc.) to build better Query Execution Plans.

– But very often stale or missing statistics lead the optimizer to a suboptimal plan, for example an incorrect Join strategy or an unsuitable number of partitions/reducers, which results in long-running queries.

– Here the AQE feature allows the optimizer to re-plan the query at runtime, producing alternate Execution Plans that are better suited to the current runtime statistics of the underlying data.

–> Below are the 3 improvements in AQE:

   i. Coalesce Shuffle Partitions: combines (coalesces) adjacent small shuffle partitions into bigger partitions at runtime by looking at the shuffle file statistics, thus reducing the number of tasks to run.

   ii. Switch Join Strategy: converts a Sort-Merge Join to a Broadcast Hash Join at runtime when the actual size of one side of the join turns out to be smaller than the broadcast threshold.

   iii. Skew Join Optimization: Data Skew is caused by uneven distribution of data among partitions. In a JOIN query it makes some partitions grow significantly bigger than the others, so the corresponding Tasks take much longer to finish than the rest, which slows down the whole query. This feature reads the shuffle file statistics at runtime, detects the skew, and splits the oversized partitions into smaller ones of roughly the same size as the other partitions. Each of these smaller pieces is then joined to the corresponding partition of the other table.
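To make the three improvements above a bit more concrete, here is a minimal Scala sketch of how AQE could be switched on for a session. The configuration keys are the Spark 3.0 ones as far as I know; the `orders` and `depts` DataFrames, the object name and the local master are assumptions made up for illustration only.

```scala
import org.apache.spark.sql.SparkSession

object AqeConfigSketch {
  // Spark 3.0 configuration keys related to AQE and its three optimizations.
  val aqeSettings: Map[String, String] = Map(
    "spark.sql.adaptive.enabled" -> "true",                    // master switch (off by default in 3.0)
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true", // i. coalesce shuffle partitions
    "spark.sql.autoBroadcastJoinThreshold" -> "10485760",      // ii. size limit in bytes for broadcast joins
    "spark.sql.adaptive.skewJoin.enabled" -> "true"            // iii. skew join optimization
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-config-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()

    aqeSettings.foreach { case (key, value) => spark.conf.set(key, value) }

    // Hypothetical data: a larger table joined to a small one.
    val orders = spark.range(0, 1000000).selectExpr("id AS order_id", "id % 100 AS dept_id")
    val depts  = spark.range(0, 100).selectExpr("id AS dept_id")

    val joined = orders.join(depts, "dept_id").groupBy("dept_id").count()

    // With AQE enabled, the physical plan should be wrapped in an AdaptiveSparkPlan node.
    joined.explain()
    joined.show(5)

    spark.stop()
  }
}
```

Whether a particular optimization actually kicks in depends on the data sizes seen at runtime, so on a toy dataset like this one the plan may not change at all.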

More details on: Databricks post on AQE | Spark JIRA 31412 | Baidu Case Study Video

2. Dynamic Partition Pruning

– In a Star schema, when querying Fact & Dimension tables with JOINs, we often apply a filter only on a Dimension table. The Spark query engine then still scans unnecessary data from the Fact table, resulting in slow query performance.

– This could be avoided by Pruning the matching partitions on the Fact table side too, but the values that pass the Dimension filter are not known to the Query engine at planning time; they become known only at runtime.

– The idea is to make queries more performant by reducing I/O so that only the required partitions are scanned. Without this feature, developers tend to add equivalent filters manually on the Fact table side; pruning on such filters, which are known at planning time, is called Static Partition Pruning.

–> Now with the new Dynamic Partition Pruning feature, the result of the Dimension-side filter is automatically applied at runtime to the Fact table side (a form of Filter Pushdown). This Prunes the unwanted partitions, allowing the Query engine to read only the relevant partitions and return results faster.
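As a rough illustration of the scenario described above, the sketch below builds a tiny star schema and runs a query that filters only the Dimension table. The table names (`sales_fact`, `date_dim`), the columns and the local setup are all hypothetical; the configuration key is the Spark 3.0 switch for this feature, which to my knowledge is enabled by default.

```scala
import org.apache.spark.sql.SparkSession

object DppSketch {
  // Spark 3.0 switch for Dynamic Partition Pruning.
  val dppKey = "spark.sql.optimizer.dynamicPartitionPruning.enabled"

  // Hypothetical star-schema query: the only filter is on the Dimension table.
  val query: String =
    """SELECT f.sale_id, f.amount, d.cal_year
      |FROM sales_fact f
      |JOIN date_dim d ON f.date_key = d.date_key
      |WHERE d.cal_year = 2020""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dpp-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()
    spark.conf.set(dppKey, "true")

    // Fact table, partitioned by the join key so that partitions can be pruned.
    spark.range(0, 10000)
      .selectExpr("id AS sale_id", "id % 50 AS date_key", "CAST(id AS DOUBLE) AS amount")
      .write.mode("overwrite").partitionBy("date_key").saveAsTable("sales_fact")

    // Small Dimension table carrying the column we filter on.
    spark.range(0, 50)
      .selectExpr("id AS date_key", "IF(id < 25, 2020, 2019) AS cal_year")
      .write.mode("overwrite").saveAsTable("date_dim")

    // If the optimizer applies DPP, the Fact table scan should show a
    // dynamic pruning expression among its partition filters.
    spark.sql(query).explain()

    spark.stop()
  }
}
```

Note that the optimizer decides by its own heuristics whether pruning is worthwhile, so the pruning filter may not appear in every plan.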

More details on: Databricks post & video on DPP | Spark JIRA 11150

3. JOIN Hints

– At times, for various reasons, the Spark query optimizer is unable to pick the best JOIN strategy, so developers can use appropriate JOIN hints to influence the optimizer towards a better plan.

– In previous versions of Spark (i.e. 2.x) only the Broadcast Join hint was supported, but Spark 3.0 supports other Join hints as well:

   – Shuffle Sort Merge join (MERGE, SHUFFLE_MERGE, MERGEJOIN)
   – Shuffle Hash join (SHUFFLE_HASH)
   – Shuffle-and-Replicate Nested Loop join (SHUFFLE_REPLICATE_NL)

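-- Spark-SQL
-- The hint names the alias (E) because the Employee table is aliased in the FROM clause.
SELECT /*+ SHUFFLE_MERGE(E) */ * FROM Employee E
INNER JOIN Department D ON E.DeptID = D.DeptID;

// Scala
// Assumption: employeeDF and departmentDF are DataFrames already loaded from the two tables.
val DF_shuffleMergeJoin =
  employeeDF.hint("SHUFFLE_MERGE").join(departmentDF, Seq("DeptID"), "inner")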

More details on: Apache Spark Docs on JOIN Hints

4. SQL new features and improvements

– EXPLAIN FORMATTED: a more readable plan output, organized into a Header, a Footer & Subqueries

– 35 new built-in Functions
   – sinh, cosh, tanh, asinh, acosh, atanh
   – any, every, some
   – bit_and, bit_or, bit_count, bit_xor
   – bool_and, bool_or
   – count_if
   – date_part
   – extract
   – forall
   – from_csv
   – make_date, make_interval, make_timestamp
   – map_entries, map_filter, map_zip_with
   – max_by, min_by
   – schema_of_csv, to_csv
   – transform_keys, transform_values
   – typeof
   – version
   – xxhash64
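Here is a small sketch that tries out a few of the functions listed above, together with the new formatted explain mode. The inline `VALUES` data, the column names and the object name are made up for illustration; the function names are taken from the list.

```scala
import org.apache.spark.sql.SparkSession

object BuiltinFunctionsSketch {
  // Hypothetical inline data: three (name, score) rows.
  val query: String =
    """SELECT
      |  count_if(score > 50) AS passed,
      |  max_by(name, score)  AS top_name,
      |  min_by(name, score)  AS bottom_name,
      |  every(score > 0)     AS all_positive
      |FROM VALUES ('a', 40), ('b', 75), ('c', 90) AS t(name, score)""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("builtin-functions-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()

    // Aggregate functions from the list.
    val result = spark.sql(query)
    result.show()

    // A few scalar functions from the list.
    spark.sql(
      "SELECT make_date(2020, 6, 18) AS d, typeof(make_date(2020, 6, 18)) AS d_type, version() AS v"
    ).show(false)

    // The new explain mode, here called through the DataFrame API.
    result.explain("formatted")

    spark.stop()
  }
}
```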

5. Other changes

– spark.sql.ansi.enabled = true (default is false): makes Spark SQL follow the ANSI SQL standard more strictly, e.g. users can no longer use the reserved keywords of ANSI SQL as identifiers.
– spark.sql.storeAssignmentPolicy = ANSI (the new default)
– Built with Scala 2.12
– TRIM function argument order is reversed now
– Type coercion is performed per ANSI SQL standards when inserting new values into a column
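The two ANSI-related settings above can be tried out with a sketch like the following. The `ansi_demo` table and the object name are hypothetical. My understanding is that under the ANSI store assignment policy an insert that needs an unsafe cast (such as string to int) is rejected, so the attempt is wrapped in `Try` rather than assuming a particular outcome.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object AnsiSketch {
  // The two ANSI-related settings mentioned in the list.
  val ansiSettings: Map[String, String] = Map(
    "spark.sql.ansi.enabled" -> "true",
    "spark.sql.storeAssignmentPolicy" -> "ANSI"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ansi-sketch")
      .master("local[*]") // assumption: local run, for illustration only
      .getOrCreate()

    ansiSettings.foreach { case (key, value) => spark.conf.set(key, value) }

    // Hypothetical table with a single INT column.
    spark.sql("CREATE TABLE IF NOT EXISTS ansi_demo (id INT) USING parquet")

    // A string value for an INT column needs an unsafe cast; under the ANSI
    // policy this insert is expected to fail instead of being coerced silently.
    val attempt = Try(spark.sql("INSERT INTO ansi_demo VALUES ('not a number')"))
    println(s"insert succeeded: ${attempt.isSuccess}")

    spark.stop()
  }
}
```

Setting `spark.sql.storeAssignmentPolicy` to `LEGACY` should restore the more permissive Spark 2.x behaviour, which can be handy for comparing the two.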

For the rest of the new features & enhancements, please check the Apache Spark 3.0 release notes.