Deep Dive into Handling Apache Spark Data Skew

Photo by Lizzi Sassman on Unsplash

“Why is my Spark job running slow?” is an inevitable question when working with Apache Spark, and one of the most common answers is data skew. In this article, we will cover how to identify whether data skew is causing your Spark job’s slowness, then dive into three ways to handle it, with code, including the “salting” technique.

How to Identify Data Skew In Spark

Many factors affect Spark performance. Given the complexity of distributed computing, you are halfway to success once you can narrow the problem down to its bottleneck.

Data skew happens when partitions receive uneven amounts of data to process. Suppose we have three partitions in Spark processing 1.5M records. Ideally, each partition gets 0.5M records (Picture 1, left). However, a single partition can end up with much more data than the others (Picture 1, right).

Picture 1 | image By Author

Why does one partition take more data than the others? It comes down to how distributed processing works. In many data processing frameworks, data skew surfaces during data shuffling, which moves data from one partition to another. Shuffles deserve attention in performance tuning because they transfer data between the nodes in your cluster. They can cause undesired delays in your data pipeline that are hard to spot.

A data shuffle is expensive, but it is unavoidable for wide operations such as groupBy and join. These operations are usually key-based: keys are hashed and then mapped to partitions, and the same hash value is guaranteed to go to the same partition. In the example above, many keys hash to partition A, which becomes a “hot spot” processing close to 99% of the data. That’s why the entire job lags behind: the data isn’t distributed well, partitions B and C stay idle most of the time, and partition A carries the heavy load.
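To make the hashing argument concrete, here is a small pure-Python simulation, not Spark code. It assumes `hash(key) % num_partitions` as a stand-in for Spark’s hash partitioner (Spark actually applies a Murmur3 hash), and the helpers `hash_partition` and `partition_sizes` are hypothetical. It shows that 1.5M unique keys spread evenly over three partitions, while a single repeated key lands entirely in one.

```python
from collections import Counter

def hash_partition(key, num_partitions):
    # Stand-in for Spark's hash partitioning. Spark uses Murmur3, but any
    # deterministic hash shows the same property: equal keys always map
    # to the same partition.
    return hash(key) % num_partitions

def partition_sizes(keys, num_partitions=3):
    # Count how many rows each partition would receive after the shuffle.
    counts = Counter(hash_partition(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

# 1.5M unique keys vs. 1.5M rows where almost every row shares key 0.
even_keys = range(1_500_000)
skewed_keys = [0] * 1_499_998 + [1, 2]

print(partition_sizes(even_keys))    # [500000, 500000, 500000]
print(partition_sizes(skewed_keys))  # [1499998, 1, 1]
```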

What are the signs of data skew in Spark? We cannot blame data skew for every slowdown. The Spark Web UI is the best native tool for identifying skew in your Spark job. On the Stages tab, a skewed stage hangs: a few tasks don’t seem to progress for a long time. In the summary metrics, the Max column usually shows a much larger value than the Median, both in duration and in the number of records processed. When we see that, we know we have hit a data skew issue.
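The max-versus-median check can be written down as a rough rule of thumb. The sketch below is a hypothetical helper, not a Spark API: it flags tasks whose record count is more than `factor` times the median. The default factor of 5 borrows the default of AQE’s `spark.sql.adaptive.skewJoin.skewedPartitionFactor`; AQE additionally requires a byte-size threshold, which `min_records` only loosely stands in for. The input counts are illustrative, as you might copy them from a stage’s task table.

```python
from statistics import median

def skewed_tasks(task_records, factor=5.0, min_records=0):
    # Flag tasks whose record count exceeds `factor` times the median AND
    # exceeds `min_records`. This mirrors the shape of AQE's skew rule
    # (factor times median plus an absolute threshold), applied by hand.
    med = median(task_records)
    return [i for i, n in enumerate(task_records)
            if n > factor * med and n > min_records]

# Illustrative per-task record counts for one stage.
print(skewed_tasks([333_334, 333_333, 333_333]))  # []
print(skewed_tasks([999_998, 1, 1]))              # [0]
```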

How do we know which part of the code causes the skew? The stage detail page in the Spark UI only gives us a visual representation of the DAG, but the official Spark documentation describes a way to map a stage back to your code:

Whole Stage Code Generation operations are also annotated with the code generation id. For stages of Spark DataFrame or SQL execution, this allows for cross-referencing Stage execution details to the relevant details in the Web-UI SQL Tab page where SQL plan graphs and execution plans are reported.

In the example below, the stage is annotated with WholeStageCodegen ids 2, 4, and 5. We can go to the SQL tab, find the same ids in the SQL plan graph, and hover over its nodes to see which part of the code is running.

Example of Codegen Id | Image By Author

Spark Data Skew Example Setup

Let’s start by setting up a Spark environment that reproduces data skew. We will allocate only 1G for spark.executor.memory, use one executor with three cores, and set spark.sql.shuffle.partitions to three as well, so every shuffle produces three partitions. We can use spark_partition_id to determine which partition each record belongs to and verify the data distribution. To keep Spark from optimizing the problem away, for example by changing the number of partitions or converting the physical plan to a broadcast join, we turn off Adaptive Query Execution (AQE) by setting spark.sql.adaptive.enabled to false.

We don’t need to import any external data sources to set up the example. We can generate synthetic data and use it throughout this article.

Case 1: evenly distributed case

We will create a DataFrame with 1,000,000 rows in Spark. In this case, the values from 0 to 999,999 are the keys that get hashed and shuffled. Notice that every key is unique, so there are no duplicates: no single key can accumulate in one partition, and the keys spread roughly evenly across partitions.

    from pyspark.sql.functions import spark_partition_id
    from pyspark.sql.types import IntegerType
    df_evenly = spark.createDataFrame(list(range(1_000_000)), IntegerType())
    df_evenly = df_evenly.withColumn("partitionId", spark_partition_id())

You can verify the number of partitions with getNumPartitions on the DataFrame’s underlying RDD. In this case, it should be three, since we have only one executor with three cores.

    df_evenly.rdd.getNumPartitions()  # output: 3

If the data is distributed evenly, grouping by partitionId (for example, df_evenly.groupBy("partitionId").count()) returns roughly equal counts. This is the ideal case shown in Picture 1, left.

Data Partitioned Evenly By PartitionId | Image By Author

We can then perform a self-join to check the plan. We expect a SortMergeJoin, which is usually the best we can do when both datasets are large.

    df_evenly.alias("left").join(df_evenly.alias("right"), "value", "inner").count()

In the result below, the total data size is distributed well among the three partitions, and the time each partition takes shows no substantial gaps.

Data Partitioned Evenly As Spark Physical Plan| Image By Author

Case 2: Skew case

Now let’s go to the extreme shown in Picture 1, right: a heavily skewed dataset.

We will again create a DataFrame with 1,000,000 rows in Spark. However, instead of giving every row a different key, we make almost all of them the same. This creates a “hot” key that stays problematic no matter which hash function we use, because identical keys are guaranteed to land in the same partition.

    df0 = spark.createDataFrame([0] * 999998, IntegerType()).repartition(1)
    df1 = spark.createDataFrame([1], IntegerType()).repartition(1)
    df2 = spark.createDataFrame([2], IntegerType()).repartition(1)
    df_skew = df0.union(df1).union(df2)
    df_skew = df_skew.withColumn("partitionId", spark_partition_id())

If we run the same groupBy on partitionId again, we get what we wanted to see: one partition with much more data than the other two.
Data Skew By PartitionId | Image By Author

In this case, more than 99.99% of the data sits in a single partition. Let’s join it with our evenly distributed dataset and check the plan. Before running the join, we repartition the skewed dataset into three partitions in a round-robin fashion to simulate how data would be read in a real use case.

    # simulate reading the data: first distribute the rows round-robin
    df_skew = df_skew.repartition(3)
    df_skew.join(df_evenly.select("value"), "value", "inner").count()

In the Spark physical plan, we can see how unevenly the data is distributed: one partition holds most of the data (see the max time), and the whole join has to wait for that single partition to finish.
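Why does one partition hold up the whole join? Under a deliberately simplified model, which is an assumption and not a description of Spark’s scheduler (one task per partition, all tasks running in parallel on the three cores, task time proportional to row count), a stage ends when its slowest task ends. The throughput figure is made up for illustration.

```python
def stage_wall_time(partition_rows, rows_per_second=100_000):
    # Simplified model: every partition is one task, all tasks run at once
    # (partitions <= cores), and a task's time is proportional to its rows.
    # The stage only finishes when its slowest task finishes.
    return max(rows / rows_per_second for rows in partition_rows)

even = [333_334, 333_333, 333_333]
skew = [999_998, 1, 1]
print(round(stage_wall_time(even), 2))  # 3.33
print(round(stage_wall_time(skew), 2))  # 10.0
```

Under this model the skewed join takes about three times as long as the even one with the same total work, because two of the three cores sit idle.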

Data Partitioned Skew As Spark Physical Plan| Image By Author

How to resolve the Spark data skew problem

Data skew slows Spark jobs down, leaving a job stuck on a few partitions that seem to hang forever. There are multiple strategies to resolve skew. Today, Adaptive Query Execution (AQE) lets Spark work out many of these optimizations on its own, including splitting skewed partitions in sort-merge joins. In edge cases, however, AQE doesn’t always find the best plan, and we still need to intervene and know which strategy to use.

1. Leveraging the Number of Partitions

spark.sql.shuffle.partitions is arguably one of the most critical configurations in Spark. It sets the number of partitions used when shuffling data for joins or aggregations. Tuning this value won’t always fix a skew issue, but it is a useful general optimization for a Spark job. The default value is 200, which suited many big data projects in the past and is still reasonable for small to medium-sized ones.

Think of this as the number of bins that data is tossed into during the shuffle. Is a single bin receiving more data than it can handle, or is the data spread comfortably across the bins?
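A quick pure-Python check shows what raising the partition count can and cannot do. It again uses `hash(key) % n` as a stand-in for Spark’s hash partitioner, `max_partition_share` is a hypothetical helper, and the data is scaled down to 100,000 rows. With unique keys, more bins shrink the fullest bin; with one hot key, the fullest bin keeps essentially all of the data regardless of n, which is why this setting alone rarely fixes severe skew.

```python
from collections import Counter

def max_partition_share(keys, num_partitions):
    # Share of all rows that land in the fullest partition, using
    # hash(key) % num_partitions as a stand-in for Spark's hash partitioner.
    counts = Counter(hash(k) % num_partitions for k in keys)
    return max(counts.values()) / len(keys)

even_keys = list(range(100_000))
skewed_keys = [0] * 99_998 + [1, 2]

for n in (3, 200, 1000):
    print(n, round(max_partition_share(even_keys, n), 4),
          round(max_partition_share(skewed_keys, n), 4))
# 3 0.3333 1.0
# 200 0.005 1.0
# 1000 0.001 1.0
```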

2. Broadcast join

A broadcast join is often the fastest way to avoid skew in a join, because the large side is never shuffled. By giving Spark the BROADCAST hint, we explicitly tell it which DataFrame to send to every executor.

A broadcast join works best when one side is small, such as a dimension table or metadata. It is not appropriate for broadcasting transaction tables with millions of rows.

    from pyspark.sql.functions import broadcast
    df_skew.join(broadcast(df_evenly.select("value")), "value", "inner").count()
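The reason a broadcast join sidesteps skew can be sketched in plain Python. This simulates the idea and is not Spark’s implementation: the small side becomes an in-memory lookup table available to every task, and each partition of the large side is joined where it already lives, so the large side keeps its round-robin partition sizes. The function name and the scaled-down data (100,000 rows) are assumptions for illustration.

```python
from collections import defaultdict

def broadcast_hash_join(big_partitions, small_rows, key):
    # Build the lookup table once from the small side; in Spark this table
    # would be broadcast to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)
    # Join each big-side partition locally: no shuffle by key, so the
    # output keeps the input's partition sizes.
    return [
        [{**big, **small} for big in part for small in lookup.get(big[key], [])]
        for part in big_partitions
    ]

# Skewed side after a round-robin-style split into 3 partitions: the hot
# key 0 is already spread evenly across them.
skewed = [{"value": 0}] * 99_998 + [{"value": 1}, {"value": 2}]
big_partitions = [skewed[i::3] for i in range(3)]
small = [{"value": k, "label": f"key-{k}"} for k in range(3)]

joined = broadcast_hash_join(big_partitions, small, "value")
print([len(p) for p in joined])  # [33334, 33333, 33333]
```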

3. Salting

Salting borrows the idea of a salt from cryptography: it introduces randomness into the key without needing any knowledge of the dataset. If we combine a given hot key with different random numbers, the data for that key is no longer processed in a single partition. A significant benefit of salting is that it is independent of the key values, so it works without knowing in advance which keys are hot.
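Here is a minimal pure-Python sketch of the salting idea applied to an aggregation, in the two-stage form salting usually takes: aggregate per (key, salt), then drop the salt and aggregate again. The composite-key hash `(key * 31 + salt) % num_partitions` is a hypothetical stand-in chosen for readability, not Spark’s Murmur3, and the data is scaled down to 100,000 rows. It shows that the hot key’s rows are split across partitions while the final per-key counts stay the same.

```python
import random
from collections import Counter

NUM_PARTITIONS = 3
NUM_SALTS = 3  # assumption: one salt value per shuffle partition

def partition_of(key, salt, num_partitions=NUM_PARTITIONS):
    # Hypothetical stand-in for hashing a composite (key, salt) column.
    return (key * 31 + salt) % num_partitions

rng = random.Random(42)
rows = [0] * 99_998 + [1, 2]  # hot key 0, scaled down from the article's data
salted = [(k, rng.randrange(NUM_SALTS)) for k in rows]

# Stage 1: partial counts per composite key (key, salt). Each composite key
# is processed in the partition it hashes to, so key 0 is split three ways.
partial = Counter(salted)
load = Counter()
for (k, s), n in partial.items():
    load[partition_of(k, s)] += n

# Stage 2: drop the salt and merge the partial counts per original key.
final = Counter()
for (k, _), n in partial.items():
    final[k] += n

print(sorted(final.items()))                 # [(0, 99998), (1, 1), (2, 1)]
print(max(load.values()) < 0.4 * len(rows))  # True
```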

I have published another article, “Skewed Data in Spark? Add SALT to Compensate”, where you can read more about it.

However, that article only provides code for salting an aggregation. It doesn’t show how to salt a join, which leaves a common question open: “I understand we can salt the key to distribute data evenly, but that changes my join key. How do I join back on the original key after salting?” This post provides code examples for that.

The core idea behind key salting is a space-time tradeoff: we replicate part of the data so that the work can be spread more evenly.

  • Add a salt key as a new column on the skewed side. Together, the original key and the salt key form a composite key. Because Spark now hashes the composite key, rows that share the same original key get different hash values and shuffle to different partitions. We can also derive the number of salt values dynamically from spark.sql.shuffle.partitions.
    from pyspark.sql.functions import rand
    df_left = df_skew.withColumn("salt", (rand() * int(spark.conf.get("spark.sql.shuffle.partitions"))).cast("int"))

As you can see below, rows with the same value and partitionId now carry different values in the additional “salt” column, which gives Spark a finer-grained key to join on.

Add the salt key as part of the key as a new column | Image By Author
  • On the other side, add an array of all possible salt values as a new column. Choose the DataFrame with fewer rows for this (if both have the same size, either one works), because this side will be replicated.
    from pyspark.sql.functions import array, lit
    df_right = df_evenly.withColumn("salt_temp", array([lit(i) for i in range(int(spark.conf.get("spark.sql.shuffle.partitions")))]))
Add an array of all the potential salt keys as a new column | Image By Author
  • Explode the DataFrame on that array. This replicates each existing row n times (n = the number of salt values you chose). When the two DataFrames are joined on the composite key (value, salt), every salted row on the skewed side still finds its match, because the replicated side (usually the right side) holds a copy of each key for every salt value. The join therefore produces the same result as joining on the original key.
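The three steps above can be checked end to end with a small pure-Python simulation (not PySpark): salt the skewed side, explode the other side once per salt value, join on the composite key (value, salt), and compare against a plain join on value. The partition function is the same hypothetical stand-in for hashing the composite key, and the data is scaled down to 100,000 rows per side.

```python
import random
from collections import Counter

NUM_SALTS = 3  # mirrors spark.sql.shuffle.partitions = 3 in this article's setup

def partition_of(key, salt, num_partitions=3):
    # Hypothetical stand-in for Spark hashing the composite key (value, salt).
    return (key * 31 + salt) % num_partitions

rng = random.Random(7)
left = [0] * 99_998 + [1, 2]   # skewed side, scaled down
right = list(range(100_000))   # evenly distributed side, unique keys

# Step 1: attach a random salt to every row of the skewed side.
left_salted = [(v, rng.randrange(NUM_SALTS)) for v in left]

# Steps 2-3: replicate ("explode") every right-side row once per salt value.
right_exploded = [(v, s) for v in right for s in range(NUM_SALTS)]

# Join on the composite key (value, salt): emit one row per match.
right_index = Counter(right_exploded)
salted_join = [(v, s) for (v, s) in left_salted for _ in range(right_index[(v, s)])]

# Reference result: a plain join on the original key only.
right_counts = Counter(right)
plain_join = [v for v in left for _ in range(right_counts[v])]

print(len(right_exploded))                                         # 300000
print(Counter(v for v, _ in salted_join) == Counter(plain_join))  # True

# Where the joined rows would be processed, per the stand-in partitioner.
load = Counter(partition_of(v, s) for v, s in salted_join)
print(max(load.values()) < 0.4 * len(salted_join))                # True
```

The explode step is where the space-time tradeoff shows up: the replicated side grows by a factor of NUM_SALTS (300,000 rows here) in exchange for spreading the hot key. In PySpark, the corresponding steps would typically be `df_right.withColumn("salt", explode("salt_temp"))` followed by `df_left.join(df_right, ["value", "salt"], "inner")`.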

We can also validate the final distribution after the join. The joined rows for the hot key “0” are distributed evenly across the three partitions, which is exactly what key salting is designed to achieve.

Final PartitionId after Join | Image By Author

In the physical plan, the data is distributed evenly, and the percentile metrics show similar processing times across tasks. The difference becomes much more pronounced with a larger dataset.

Key Salting to improve Spark Performance Physical Plan | Image By Author

Final Thought

Data skew in Apache Spark can be handled in various ways: through Spark configuration, through Spark plan optimization, or by adding a “salt” key that guides Spark to distribute data evenly. Identifying why a Spark job is slow is the foundation of any Spark tuning, and data skew is one of the most frequent culprits.

I wrote this article to help everyone better understand what data skew is in Spark and the potential solutions to it. However, when it comes to Spark performance optimization, there is no silver bullet. You need to spend time studying the query plan and figuring out what’s happening in the code, and much of that knowledge comes from trial and error.


About Me

I hope my stories are helpful to you. 

For data engineering posts, you can also subscribe to my new articles or become a referred Medium member, which also gives you full access to stories on Medium.

In case of questions or comments, do not hesitate to write in the comments of this story or reach me directly through LinkedIn or Twitter.

