Boosting Spark Union Operator Performance: Optimization Tips for Improved Query Speed

Photo by Fahrul Azmi on Unsplash

The union operator is one of the set operators for merging two input data frames into one. Union is a convenient operation in Apache Spark for combining rows that share the same column order. A frequent use case is applying different transformations to subsets of the data and then unioning the results back together.

How to use the union operation in Spark is widely discussed. What is discussed far less is the performance caveat that comes with it. If we don’t understand this caveat, we can fall into the trap of doubling the execution time needed to get the result.

In this story, we will focus on the Apache Spark DataFrame union operator: we will walk through examples, show the physical query plans, and share optimization techniques.

Union Operator 101 in Spark

As with UNION in relational database (RDBMS) SQL, union is a direct way to combine rows. The most important thing to ensure when using the union operator is that the rows follow the same structure:

  • The number of columns must be identical. When the data frames have different numbers of columns, union doesn’t silently work or fill in NULLs; Spark raises an AnalysisException instead.
  • union resolves columns by position, not by name, so the column data types should match position by position. Ideally, the column names follow the same order in every data frame, but Spark doesn’t enforce this: the result takes its column names from the first data frame, so mixing the order can silently produce an undesired result. Spark’s unionByName, which matches columns by name, is intended to resolve this issue.

In Spark, unionAll is an alias for union, and neither removes duplicates: both behave like SQL UNION ALL. To get SQL-style UNION semantics without duplicates, add distinct() after the union.

We can also combine multiple data frames to produce a final data frame.

				
```python
df = df1.union(df2).union(df3)
```

Performance Bottleneck of Union Operator

One typical pattern for using the union operator is to split a single data frame into multiple data frames, apply different transformations to each, and eventually combine them into the final one.

Here is an example: we have two big fact tables that need to be joined, and the best way to join them in Spark is the sort-merge join. Once we have the joined data frame, we split it into four subsets. Each subset goes through different transformations, and eventually we combine those four data frames into the final one.

Union Operation in Spark | Image By Author

Spark DataFrames leverage the Catalyst optimizer, which takes your DataFrame code and performs analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.

In recent years, much of Spark’s optimization work on Catalyst has gone into improving join performance. Joins are used in many more scenarios than unions, so less effort has been put into the union operation.

When a union combines data frames that don’t come from entirely different data sources but share a common upstream, the union operator faces a potential performance bottleneck: Catalyst isn’t “smart” enough to identify the shared data frame and reuse it.

In this case, Spark treats each data frame as a separate branch and computes everything from the root once per branch. In our example, the join of the two big tables would be performed four times! That is a huge bottleneck.

Set up an Example with Union Operator in Spark

It’s straightforward to reproduce a non-optimized physical query plan for the union operator in Spark. We will do the following:

  1. Create two data frames containing the integers 0 to 999,999. Let’s call them df1 and df2.
  2. Perform an inner join on df1 and df2.
  3. Split the joined result into two data frames: one contains only the odd numbers, the other only the even numbers.
  4. Add a field called magic_value to each, generated by a different dummy transformation.
  5. Union the odd and even number data frames.
				
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

# Reuses the active session if one already exists (e.g. in a notebook)
spark = SparkSession.builder.getOrCreate()

## Create two data frames with the integers 0 to 999999 in a column named "value". Let's call them df1 and df2
df1 = spark.createDataFrame(list(range(1000000)), IntegerType())
df2 = spark.createDataFrame(list(range(1000000)), IntegerType())
## Perform an inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")
## Split the joined result into two data frames: one contains only the odd numbers, the other only the even numbers
df_odd = df.filter(df.value % 2 == 1)
df_even = df.filter(df.value % 2 == 0)
## Add a field called magic_value to each, generated by a different dummy transformation
df_odd = df_odd.withColumn("magic_value", df_odd.value + 1)
df_even = df_even.withColumn("magic_value", df_even.value / 2)
## Union the odd and even number data frames
df_odd.union(df_even).count()
```

Here is a high-level view of the DAG. Reading the DAG bottom-up, one thing stands out: the join happens twice, and the two upstream branches look almost identical.

This is where Spark’s handling of the union operator falls short: much time is wasted on unnecessary recomputation when the data could have been reused.

The DAG for non-optimized query plan for Union Operation | Image By Author

Here is the physical plan, which has 50 stages scheduled with AQE (Adaptive Query Execution) enabled. Look at ids 13 and 27: Spark performed the join twice, once in each branch, recomputing each branch from the source.

The non-optimized Physical query plan for Union Operation | Image By Author

How to Improve the Performance of Union Operation

Now that we can see this potential bottleneck, how can we resolve it? One option is to double the number of executors to run more concurrent tasks. But there is a better way: give Catalyst a hint so it reuses the joined data frame from memory.

To resolve the union performance issue, we can explicitly call cache() to persist the joined data frame (in memory, spilling to disk if it doesn’t fit). Catalyst then knows the shortcut to fetch the data from the cache instead of going back to the source.

Where should we add cache()? The recommended place is on the joined data frame, right after the join completes and before the filtering.

Let’s see it in action:

				
```python
# ...........................
## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")
## add cache here
df.cache()
## Split the joined result into two data frames: one only contains the odd numbers, another one for the even numbers
df_odd = df.filter(df.value % 2 == 1)
# ...........................
```

Here is the query plan: InMemoryTableScan is present, so both branches reuse the cached data frame instead of recomputing the join.

The DAG for optimized query plan for Union Operation | Image By Author

Now the physical plan is reduced to only 32 stages, and if we check, ids 1 and 15 both leverage InMemoryTableScan. The savings grow further when the original data frame is split into more subsets before being unioned.

Optimized Physical query plan for Union Operation | Image By Author

Final Thoughts

I hope this story provides some insight into why the union operation sometimes becomes a bottleneck for Spark performance. Because Catalyst lacks optimization for the union operator, users need to be aware of this caveat to write more effective Spark code.

Adding a cache saves time in our example, but it won’t help when the union is performed on two completely different data sources, because there is no shared data frame to cache.

This story is inspired by Kazuaki Ishizaki’s talk “Goodbye Hell of Unions in Spark SQL” and by my experience handling a similar issue in my own projects.

About Me

I hope my stories are helpful to you. 

For data engineering posts, you can subscribe to my new articles or become a referred Medium member to get full access to stories on Medium.

In case of questions/comments, do not hesitate to write in the comments of this story or reach me directly through Linkedin or Twitter.


1 thought on “Boosting Spark Union Operator Performance: Optimization Tips for Improved Query Speed”

  1. As of today, I think the bottleneck in the union operator is partially fixed by Catalyst. Instead of redoing the join between the two fact tables, Spark does it only once but somehow re-reads the result in memory multiple times, each time to produce a smaller dataset, and then in the end puts them all together. This is not as optimal as using cache because, for example, after my initial join my DF has 200 partitions, and from that DF I create 4 smaller DFs, so the union stage still has to process 800 partitions. My dataset is pretty big (70 GB deserialized), so cache is not really an option for me. So the solution provided by Spark Catalyst is not bad.
