The union operator is a set operator that merges two input data frames into one. In Apache Spark, union is a convenient way to combine rows from data frames whose columns are in the same order. One frequent use case is applying different transformations to subsets of the data and then unioning the results together.
How to use the union operation in Spark is widely discussed. Less discussed is the performance caveat that comes with it. If we don’t understand this caveat, we might fall into the trap of doubling the execution time needed to get the result.
In this story, we focus on the Apache Spark DataFrame union operator: we walk through examples, show the physical query plan, and share techniques for optimization.
Union Operator 101 in Spark
As in relational database (RDBMS) SQL, union is a direct way to combine rows. The important thing when using the union operator is to make sure the rows follow the same structure:
- The number of columns should be identical. When the number of columns differs between the data frames, the union operation fails rather than silently padding the missing columns with NULL.
- The column data types should match, and columns are resolved by position, not by name. The column names should therefore follow the same sequence in each data frame. Spark does not enforce this, however: the column names of the first data frame are used for the result, so mixing the order can produce an undesired result. Spark’s unionByName is intended to resolve this issue.
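The difference between resolving columns by position and by name can be illustrated with a small model. This is a sketch in plain Python, not the Spark API: the functions union_by_position and union_by_name are hypothetical stand-ins that only mimic the semantics described above, with rows represented as tuples.

```python
def union_by_position(left_cols, left_rows, right_cols, right_rows):
    """Model of a position-based union: the left schema wins, right rows are appended as-is."""
    if len(left_cols) != len(right_cols):
        raise ValueError("union requires the same number of columns")
    return left_cols, left_rows + right_rows


def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Model of a name-based union: right rows are reordered to match the left column names."""
    if set(left_cols) != set(right_cols):
        raise ValueError("union by name requires matching column names")
    order = [right_cols.index(name) for name in left_cols]
    reordered = [tuple(row[i] for i in order) for row in right_rows]
    return left_cols, left_rows + reordered


left_cols, left_rows = ["id", "name"], [(1, "alice")]
right_cols, right_rows = ["name", "id"], [("bob", 2)]  # same columns, different order

_, by_position = union_by_position(left_cols, left_rows, right_cols, right_rows)
_, by_name = union_by_name(left_cols, left_rows, right_cols, right_rows)

print(by_position)  # [(1, 'alice'), ('bob', 2)] -> 'bob' lands under "id"
print(by_name)      # [(1, 'alice'), (2, 'bob')]
```

In the positional result the second row carries a name in the id column, which is the kind of undesired result that mixing the column order can cause.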
In Spark, unionAll is an alias of union, and neither removes duplicates. To get a SQL-like union without duplicates, we need to call distinct after performing the union.
We can also combine multiple data frames to produce a final data frame.
df = df1.union(df2).union(df3)
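When the number of data frames is not fixed, the chain can be built with a fold. The helper below is a minimal sketch; the name union_all is made up for this example, and it only assumes that each element exposes a union method, as PySpark data frames do.

```python
from functools import reduce


def union_all(frames):
    """Fold a non-empty sequence of frames into one by chaining .union() left to right."""
    frames = list(frames)
    if not frames:
        raise ValueError("need at least one data frame to union")
    return reduce(lambda left, right: left.union(right), frames)


# With PySpark data frames this builds the same chain as df1.union(df2).union(df3):
# df = union_all([df1, df2, df3])
```

Because the helper relies only on the union method, it can be tried without a Spark session on any object that has one, such as Python sets.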
Performance Bottleneck of Union Operator
One typical pattern of using the union operator is splitting a single data frame into multiple data frames, applying a different transformation to each, and eventually combining them into a final one.
Here is an example: we have two big tables (fact tables) that need to be joined, and the best way to join them in Spark is a sort-merge join. Once we have the joined data frame, we split it into four subsets. Each subset goes through different transformations, and eventually we combine those four data frames into the final one.
Spark data frames rely on the Catalyst optimizer, which takes your data frame code and performs analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.
In recent years, a lot of optimization work has gone into Catalyst to improve the performance of Spark join operations. Joins are used in more scenarios than unions, so less effort has been put into the union operation.
Unless the union is applied to entirely different data sources, the union operator faces a potential performance bottleneck: Catalyst isn’t “smart” enough to identify the shared data frame and reuse it.
In this case, Spark treats each data frame as a separate branch and executes everything from the root once per branch. In our example, the join of the two big tables is performed four times! That is a huge bottleneck.
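The recomputation can be mimicked with a toy model. This is a sketch in plain Python, not Spark: each "data frame" is modeled as a zero-argument function (a recipe) that only runs when called, and the names joined, subset, and union are made up for the illustration.

```python
join_runs = 0


def joined():
    """Pretend join of two big tables; counts how often it is executed."""
    global join_runs
    join_runs += 1
    return list(range(8))


def subset(k):
    """Recipe for one of four subsets; it starts from the join every time it runs."""
    return lambda: [row for row in joined() if row % 4 == k]


def union(*recipes):
    """Recipe that runs every branch and concatenates the rows."""
    return lambda: [row for recipe in recipes for row in recipe()]


plan = union(subset(0), subset(1), subset(2), subset(3))  # nothing has run yet
rows = plan()                                             # the "action"

print(sorted(rows))  # [0, 1, 2, 3, 4, 5, 6, 7]
print(join_runs)     # 4
```

Every branch of the union walks back to the root on its own, so the expensive step runs once per branch even though all four branches share it.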
Set up an Example with Union Operator in Spark
It’s straightforward to reproduce a non-optimized physical query plan for the union operator in Spark. We will do the following:
- Create two data frames, each holding one million consecutive integers. Let’s call them df1 and df2.
- Perform an inner join on df1 and df2.
- Split the joined result into two data frames: one contains only the odd numbers, the other only the even numbers.
- Add a transformation with a field called magic_value, which is generated by two dummy transformations.
- Union the odd and even number data frames.
from pyspark.sql.types import IntegerType

## Assumes an active SparkSession named spark (as in a notebook or the pyspark shell)

## Create two data frames holding the integers 0 to 999999. Let's call them df1 and df2
df1 = spark.createDataFrame([i for i in range(1000000)], IntegerType())
df2 = spark.createDataFrame([i for i in range(1000000)], IntegerType())

## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## Split the joined result into two data frames: one only contains the odd numbers, another one for the even numbers
df_odd = df.filter(df.value % 2 == 1)
df_even = df.filter(df.value % 2 == 0)

## Add a transformation with a field called magic_value which is generated by two dummy transformations
df_odd = df_odd.withColumn("magic_value", df.value + 1)
df_even = df_even.withColumn("magic_value", df.value / 2)

## Union the odd and even number data frames
df_odd.union(df_even).count()
Here is a high-level view of what the DAG looks like. If we read the DAG bottom-up, one thing that stands out is that the join happens twice, and the upstream of the two branches looks almost identical.
This shows where Spark’s handling of the union operator leaves room for optimization: much time is wasted on unnecessary recomputation when the data source could be reused.
Here is the physical plan, which has 50 stages scheduled with AQE enabled. At ids 13 and 27 we can see that Spark performed the join once in each branch, twice in total, and recomputed everything upstream of it.
How to Improve the Performance of Union Operation
Now we can see this potential bottleneck. How can we resolve it? One option is to double the number of executors to run more concurrent tasks. But there is a better way: hint to Catalyst that it can reuse the joined data frame from memory.
To resolve the performance issue of the union operation, we can explicitly call cache to persist the joined data frame in memory. Catalyst then knows the shortcut to fetch the data instead of going back to the source.
Where should we add cache()? The recommended place is on the data frame after the join is completed and before the filtering.
Let’s see it in action:
# ...........................
## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## add cache here
df.cache()

## Split the joined result into two data frames: one only contains the odd numbers, another one for the even numbers
df_odd = df.filter(df.value % 2 == 1)
# ...........................
Here is the query plan: InMemoryTableScan is present, so the joined data frame is reused and the repeated computation is saved.
Now the physical plan is reduced to only 32 stages, and ids 1 and 15 both leverage InMemoryTableScan. The saving could be much larger if we split the original data frame into more, smaller datasets and then union them.
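The effect of the cache can be reproduced in a toy model. This is a sketch in plain Python, not the Spark API: LazyFrame and count_join_runs are hypothetical names, and cache here simply memoizes the rows the first time they are computed.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated data frame (not the Spark API)."""

    def __init__(self, compute):
        self._compute = compute      # zero-argument function producing the rows
        self._cache_enabled = False
        self._cached_rows = None

    def cache(self):
        # Only marks the frame; the rows are stored the first time they are computed.
        self._cache_enabled = True
        return self

    def rows(self):
        if not self._cache_enabled:
            return self._compute()
        if self._cached_rows is None:
            self._cached_rows = self._compute()
        return self._cached_rows

    def filter(self, predicate):
        return LazyFrame(lambda: [r for r in self.rows() if predicate(r)])

    def union(self, other):
        return LazyFrame(lambda: self.rows() + other.rows())


def count_join_runs(use_cache):
    """Build the odd/even split-and-union pipeline and report how often the join ran."""
    runs = 0

    def expensive_join():
        nonlocal runs
        runs += 1
        return list(range(10))

    joined = LazyFrame(expensive_join)
    if use_cache:
        joined.cache()
    odd = joined.filter(lambda r: r % 2 == 1)
    even = joined.filter(lambda r: r % 2 == 0)
    result = odd.union(even).rows()
    return runs, result


runs_without_cache, rows_without_cache = count_join_runs(use_cache=False)
runs_with_cache, rows_with_cache = count_join_runs(use_cache=True)

print(runs_without_cache)  # 2
print(runs_with_cache)     # 1
```

Both runs return the same rows; only the number of times the expensive step executes changes, which mirrors the drop in scheduled work seen in the plan above.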
I hope this story provides some insight into why the union operation sometimes becomes a bottleneck for your Spark performance. Because Catalyst lacks optimization for the union operator, users need to be aware of this caveat to develop Spark code more effectively.
Adding cache saves time in our example, but it won’t help if the union is performed on two completely different data sources, because there is no shared data frame to cache.