“Why is my Spark job running slow?” is an inevitable question when working with Apache Spark. One of the most common causes, and a frequent subject of Apache Spark performance tuning, is data skew. In this article, we will cover how to identify whether your Spark job's slowness is caused by data skew, then deep dive into handling Apache Spark data skew, with code for three ways to handle it, including the “salting” technique.
How to Identify Data Skew In Spark
There are many factors involved in Spark performance tuning. Given the complexity of distributed computing, you are halfway to success if you can narrow the problem down to its bottleneck.
Data skew usually happens when partitions have uneven amounts of data to process. Let’s assume we have three partitions in Spark processing 1.5M records. Ideally, each partition gets 0.5M records, so the work is spread evenly (Picture 1, left). However, a single partition can end up with much more data than the other partitions (Picture 1, right).
Why does one partition take more data than the others? The answer lies in how distributed systems move data around. In many data processing frameworks, data skew arises from data shuffling, which moves data from one partition to another. Shuffles need careful attention during performance tuning because they transfer data across the nodes in your cluster. They can cause undesired delays in your data pipeline and are difficult to spot.
A data shuffle is expensive, but it is sometimes unavoidable when performing wide operations such as groupBy and joins. Those operations are usually key-based: keys are hashed and then mapped to partitions, and the same hash value is guaranteed to shuffle to the same partition. In the example above, many keys are hashed to partition A, which receives a massive volume and becomes a “hot spot” processing close to 99% of the data. That’s why the entire job falls behind: the data isn’t distributed well, partitions B and C stay idle most of the time, and partition A carries the heavy load alone.
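To make the hashing argument concrete, here is a small plain-Python simulation of the 1.5M-record, three-partition example. It is not Spark's partitioner (Spark SQL's shuffle hashes keys with Murmur3); it simply assigns each key to `hash(key) % num_partitions`, which is enough to show that a repeated key always lands in the same partition. The helper name `partition_sizes` is hypothetical.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition under hash partitioning.

    Simplified stand-in for a Spark shuffle: Python's built-in hash() is used
    instead of Spark's Murmur3 hash, but the property shown is the same:
    equal keys always map to the same partition.
    """
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# 1.5M records with unique keys: each partition gets an even share.
even = partition_sizes(range(1_500_000), 3)

# 1.5M records where almost every record shares the hot key 0.
skewed = partition_sizes([0] * 1_499_998 + [1, 2], 3)

print(even)    # [500000, 500000, 500000]
print(skewed)  # [1499998, 1, 1]
```

Both runs move the same number of records; only the key distribution differs, and that alone decides whether one partition becomes the hot spot.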
What are the signs of data skew in Spark? Not every slowdown is caused by data skew. The Spark Web UI is the best native tool for identifying skew in your Spark job. On the Stages tab of the Spark UI, skewed partitions make a stage hang: a few tasks don’t seem to progress for a long time. If we look at the summary metrics, the max column usually shows a much larger value than the median, along with a much larger record count. Then we know we have encountered a data skew issue.
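The max-versus-median comparison can be written down as a rule of thumb. The sketch below borrows the criterion Spark documents for AQE's skew-join handling: a partition counts as skewed when it is larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` times the median partition size and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. Recent Spark documentation lists defaults of 5 and 256MB, but check the configuration page for your version. The function name and the sample sizes are hypothetical; in practice you would read the per-task shuffle read sizes from the Stages tab.

```python
import statistics


def skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * 1024 * 1024):
    """Return the indices of partitions that look skewed.

    Mirrors the rule AQE uses for skew joins: a partition is skewed if it is
    larger than factor * median AND larger than an absolute size threshold.
    """
    median = statistics.median(sizes_bytes)
    return [
        i
        for i, size in enumerate(sizes_bytes)
        if size > factor * median and size > threshold_bytes
    ]


MB = 1024 * 1024
# Hypothetical shuffle read sizes for the three tasks of one stage.
balanced = [300 * MB, 310 * MB, 295 * MB]
skewed = [2000 * MB, 20 * MB, 25 * MB]

print(skewed_partitions(balanced))  # []
print(skewed_partitions(skewed))    # [0]
```

The absolute threshold keeps tiny partitions from being flagged just because their ratio to the median is large.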
How do we know which part of the code causes data skew? The stage detail page in the Spark UI only gives us a visual representation of the DAG.
How do you know which part of the code in Spark is running slow? It is mentioned in the Spark official documentation
In the following case, we can use the WholeStageCodegen IDs 2, 4, or 5. We can then go to the SQL / DataFrame tab in the Spark UI to find the corresponding query and hover over the SQL plan graph to see the details of what’s running in our code.
Spark Data Skew Example Setup
Let’s start by setting up a Spark environment with data skew to demonstrate the issue. We will allocate only 1G for spark.executor.memory, use one executor with three cores, and set spark.sql.shuffle.partitions to three as well, so we end up with three partitions. We can use spark_partition_id to determine which partition each record belongs to and verify the data distribution. To keep Spark from being clever and applying further optimizations, such as splitting skewed partitions or converting the physical plan to a broadcast join, we will turn off Adaptive Query Execution (AQE) by setting spark.sql.adaptive.enabled to false.
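Collected in one place, the settings described above could look like the sketch below. Two details are assumptions added for this sketch: the `local[3]` master (one local executor with three cores), and `spark.sql.autoBroadcastJoinThreshold = -1`, which stops Spark from picking a broadcast join on size estimates alone (explicit broadcast hints still work). The helper name `build_skew_demo_session` is hypothetical, and the PySpark import sits inside the function so the settings can be inspected without Spark installed.

```python
# Settings for the skew demo, as described in the text.
SKEW_DEMO_CONF = {
    "spark.executor.memory": "1g",
    "spark.sql.shuffle.partitions": "3",
    # Turn off Adaptive Query Execution so Spark does not fix the skew for us.
    "spark.sql.adaptive.enabled": "false",
    # Assumption added for this sketch: disable automatic broadcast joins so
    # joins stay SortMergeJoin unless a broadcast is requested explicitly.
    "spark.sql.autoBroadcastJoinThreshold": "-1",
}


def build_skew_demo_session(conf=SKEW_DEMO_CONF, master="local[3]"):
    """Create a SparkSession with the demo settings (hypothetical helper)."""
    from pyspark.sql import SparkSession  # imported lazily on purpose

    builder = SparkSession.builder.master(master).appName("data-skew-demo")
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With PySpark installed, `spark = build_skew_demo_session()` gives a session on which the examples in this article can run.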
We don’t need to import additional data sources to set up the example. We can create random data and play with it as our example throughout this article.
Case 1: Evenly distributed case
We will create a dataframe with 1,000,000 rows in Spark. In this case, the values from 0 to 999,999 are the keys that get hashed and shuffled. Notice that every key here is unique, with no duplicates. As a result, no single key can pile up in one partition, and the keys spread roughly evenly across partitions.
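from pyspark.sql.functions import spark_partition_id
from pyspark.sql.types import IntegerType
# 1,000,000 unique integer keys in a single column named "value"
df_evenly = spark.createDataFrame([i for i in range(1000000)], IntegerType())
df_evenly = df_evenly.withColumn("partitionId", spark_partition_id())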
You can verify the number of partitions with getNumPartitions; in this case, it should be three, since we only have one executor with three cores.
df_evenly.rdd.getNumPartitions()  # output: 3
If everything is distributed evenly, grouping by partitionId gives us a well-balanced count per partition. This is the ideal case shown earlier in Picture 1, left.
We can then perform a self-join to check what the plan looks like. We’d expect a SortMergeJoin, which is usually the best we can do when both datasets are comparably large.
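For intuition about why SortMergeJoin suits two large inputs, here is a toy, single-machine sort-merge join in plain Python. Spark runs the same idea inside each shuffle partition after both sides are hash-partitioned on the join key; the function below (a hypothetical name) only shows the sort-then-merge step, not Spark's implementation.

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) pairs by sorting and merging.

    Both inputs are sorted by key; two cursors then walk forward together, so
    each side is scanned once (plus the cost of emitting matching pairs).
    """
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of rows on the right side sharing this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for _, rv in right[j:j_end]:
                    out.append((lk, left[i][1], rv))
                i += 1
            j = j_end
    return out


left = [(3, "c"), (1, "a"), (2, "b")]
right = [(2, "B"), (3, "C"), (4, "D")]
print(sort_merge_join(left, right))  # [(2, 'b', 'B'), (3, 'c', 'C')]
```

The merge step is linear in the input sizes, but a key repeated on both sides produces every left-right pair for that key, which is where skew starts to hurt.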
In the following result, we can see that the total data size is distributed well among the three partitions, and the times each partition takes show no substantial gaps.
Case 2: Skew case
Now, let’s go to the extreme and reproduce what Picture 1, right, shows: an extremely skewed dataset.
We will still create a dataframe with 1,000,000 rows in Spark. However, instead of giving every key a different value, we will make most of them the same. This creates a “hot” key that stays problematic no matter which hash function is used, because all rows with the same key are guaranteed to land in the same partition.
# 999,998 rows sharing the hot key 0
df0 = spark.createDataFrame([0] * 999998, IntegerType()).repartition(1)
# Two single-row dataframes; the values 1 and 2 are assumed (lost from the original listing)
df1 = spark.createDataFrame([1], IntegerType()).repartition(1)
df2 = spark.createDataFrame([2], IntegerType()).repartition(1)
df_skew = df0.union(df1).union(df2)
df_skew = df_skew.withColumn("partitionId", spark_partition_id())
# Applying the same call again shows one partition with much more data than the other two.
df_skew.groupby([df_skew.partitionId]).count().sort(df_skew.partitionId).show()
In this case, more than 99.99% of the data is in a single partition. Let’s join it with our evenly distributed dataset to check what the plan looks like. Before we run the join, let’s repartition the skewed dataset into three partitions in a round-robin way to simulate how data would be read in actual use cases.
# Simulate reading the data: distribute the rows round-robin across three partitions first
df_skew = df_skew.repartition(3)
df_skew.join(df_evenly.select("value"), "value", "inner").count()
Checking the Spark physical plan, we can see how unevenly the data is distributed: one partition holds a large amount of data (the max time), and the join takes far longer there than in the other partitions.
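A rough way to see why one task dominates: under hash partitioning, every row for a key meets every matching row for that key in one partition, so a partition's join work is roughly the sum, over its keys, of left-row count times right-row count. The plain-Python sketch below (a hypothetical helper, with Python's `hash()` standing in for Spark's Murmur3) computes that sum per partition for the skewed and evenly distributed keys used in this section. The cost grows multiplicatively per key, so it becomes large very quickly when a hot key repeats on both sides.

```python
from collections import Counter


def join_work_per_partition(left_keys, right_keys, num_partitions):
    """Estimate the output rows produced by each partition of a hash join.

    Rows are routed by hash(key) % num_partitions (a stand-in for Spark's
    Murmur3-based partitioning); a key with m left rows and n right rows
    yields m * n output rows, all in the same partition.
    """
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    work = [0] * num_partitions
    for key, m in left_counts.items():
        n = right_counts.get(key, 0)
        work[hash(key) % num_partitions] += m * n
    return work


# Skewed side: 999,998 rows with key 0, plus keys 1 and 2 (as in the example).
skew_keys = [0] * 999_998 + [1, 2]
# Evenly distributed side: unique keys 0..999,999.
even_keys = range(1_000_000)

print(join_work_per_partition(skew_keys, even_keys, 3))        # [999998, 1, 1]
# A hot key on both sides: 1,000 x 1,000 rows in a single partition.
print(join_work_per_partition([0] * 1000, [0] * 1000, 3))      # [1000000, 0, 0]
```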
How to resolve the Spark data skew problem
Data skew slows Spark down, leaving a job stuck on a few partitions that seem to hang forever. There are multiple strategies to resolve skew. Today, with Adaptive Query Execution (AQE) in Spark, it is easier for Spark to figure out an optimized plan on its own. In edge cases, however, AQE doesn’t always find the best optimization. At those times, we still need to intervene, and we need to know which strategy to use.
1. Leveraging the Number of Partitions
spark.sql.shuffle.partitions might be one of the most critical configurations in Spark. It sets the number of partitions used when shuffling data for joins or aggregations. Tuning this value won’t necessarily fix a skew issue, but it is a useful general optimization for a Spark job. The default value is 200, which suited many big data projects in the past and is still reasonable for small and medium-sized data projects.
Think of this value as the number of bins that data is tossed into during the shuffle stage. Is there too much data for a single bin to handle, or are most bins nearly empty?
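One caveat worth checking: raising `spark.sql.shuffle.partitions` adds bins, but a single hot key still has to fit into one bin. The plain-Python sketch below (a hypothetical helper, with Python's `hash()` standing in for Spark's hash partitioning) reuses the skewed keys from the example and shows that the largest partition does not shrink however many partitions we configure, while evenly distributed keys do benefit.

```python
from collections import Counter


def largest_partition(keys, num_partitions):
    """Size of the biggest partition when keys are routed by hash(key) % n."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return max(counts.values())


# The skewed dataset from the example: one hot key plus two other keys.
skew_keys = [0] * 999_998 + [1, 2]

for n in (3, 200, 2000):
    print(n, largest_partition(skew_keys, n))
# 3 999998
# 200 999998
# 2000 999998

# Unique keys, by contrast, shrink with every extra partition.
print(largest_partition(range(1_000_000), 3))    # 333334
print(largest_partition(range(1_000_000), 200))  # 5000
```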
2. Broadcast join
Broadcast join is often the fastest way to avoid skew in a join, because the large dataframe never has to be shuffled by key. By giving Spark the BROADCAST hint, we explicitly tell it which dataframe to send to each executor.
Broadcast join works best with small dataframes, such as dimension tables or metadata. It is not appropriate for transaction tables with millions of rows.
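In PySpark, the hint can be given with `pyspark.sql.functions.broadcast(df)` or `df.hint("broadcast")`. The plain-Python sketch below only illustrates the idea behind it, not Spark's implementation: the small table is built once into a hash map (the copy each executor receives), and each partition of the large table probes that map locally, so no row moves between partitions and a hot key simply stays wherever it already is. The function name and data are hypothetical.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Inner-join each partition of a large table against a broadcast small table.

    large_partitions: list of partitions, each a list of (key, value) rows.
    small_rows: the small table as (key, value) rows, built once into a dict
    that plays the role of the copy every executor receives.
    Returns the joined rows per partition; no row moves between partitions.
    """
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined = []
    for partition in large_partitions:
        joined.append([(k, v, sv) for k, v in partition for sv in lookup.get(k, [])])
    return joined


# Large side already spread round-robin, even though almost every key is 0.
large = [
    [(0, "a"), (0, "b"), (1, "c")],
    [(0, "d"), (0, "e"), (2, "f")],
    [(0, "g"), (0, "h")],
]
small = [(0, "zero"), (1, "one")]

result = broadcast_hash_join(large, small)
print([len(p) for p in result])  # [3, 2, 2]
```

Because the work stays where the large side's rows already are, the join's balance depends on how the large table was read, not on its key distribution.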
3. Key salting
The idea of SALT, borrowed from cryptography, is to introduce randomness into the key without knowing anything about the dataset. If we combine a given hot key with different random numbers, the data for that key is no longer processed entirely in a single partition. A significant benefit of SALT is that it is independent of the key values themselves, so you don’t have to worry about similar keys ending up with the same hash value again.
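The same idea works for aggregations. As a minimal sketch in plain Python, assuming a count per key: first aggregate on the composite key (key, salt), where the salt is a random integer in [0, num_salts), so the hot key's rows are split into several smaller groups; then drop the salt and add up the partial counts. The function name, the seed, and the choice of three salts are illustrative.

```python
import random
from collections import Counter


def salted_count(keys, num_salts, seed=42):
    """Count rows per key in two stages using a random salt.

    Stage 1 groups by (key, salt), so a hot key is split into up to
    num_salts smaller groups that could be processed in parallel.
    Stage 2 removes the salt and sums the partial counts per key.
    """
    rng = random.Random(seed)
    stage1 = Counter((k, rng.randrange(num_salts)) for k in keys)
    stage2 = Counter()
    for (key, _salt), partial in stage1.items():
        stage2[key] += partial
    return stage1, stage2


keys = [0] * 9_998 + [1, 2]
stage1, totals = salted_count(keys, num_salts=3)

# The hot key 0 is now spread over three (key, salt) groups...
print(sorted(salt for (k, salt) in stage1 if k == 0))  # [0, 1, 2]
# ...and the final counts match an unsalted count.
print(totals == Counter(keys))  # True
```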
I have published another article, “Skewed Data in Spark? Add SALT to Compensate”; you can read it to learn more.
However, in that article, I only provided code for salting an aggregation. I didn’t cover how to salt a join, which left some questions open: “I understand we can salt the key to distribute data evenly, but that changes my join key. How do I join back on the original key after salting?” I will provide some code examples in this post.
The core idea of key salting is a space-time tradeoff: we replicate part of the data (more space) so that a hot key’s work can be spread across partitions (less time).
- Add the salt key as a new column. Together, the original key and the salt key form a composite key. Because the salt varies, rows sharing the same original key now hash to different values and shuffle to different partitions. Note that we can also set the range of random salt values dynamically by retrieving it from spark.sql.shuffle.partitions:
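from pyspark.sql.functions import rand
# Random salt in [0, shuffle partitions); conf.get returns a string, hence int()
df_left = df_skew.withColumn("salt", (rand() * int(spark.conf.get("spark.sql.shuffle.partitions"))).cast("int"))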
As you can see below, though value and partitionId are the same, we create an additional “salt” column to provide more guidance for Spark to join.
- Add an array of all the potential salt keys as a new column. Choose the dataframe with the smaller number of rows (if both are the same size, pick either one) and add the array to it:
from pyspark.sql.functions import array, lit
df_right = df_evenly.withColumn("salt_temp", array([lit(i) for i in range(int(spark.conf.get("spark.sql.shuffle.partitions")))]))
- Explode the dataframe on that array. This replicates each existing row n times (n = the number of salt values you chose). Because one side (usually the right side) now contains every possible salt for each key, joining on the composite key is still valid and produces the same result as joining on the original key.
We can also validate the final distribution after the join. The joined rows for the hot key “0” are distributed evenly across the three partitions, which is exactly what key salting is meant to achieve.
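The salted-join steps and the distribution check can also be simulated without a cluster. The plain-Python sketch below follows the same three steps: a random salt on the skewed side, every salt value on the other side (the explode), then a join on the composite key (key, salt). In PySpark, the explode step would use `explode` from `pyspark.sql.functions` on the `salt_temp` column, followed by a join on `["value", "salt"]`; the function names, the seed, and the small dimension table here are hypothetical.

```python
import random
from collections import Counter

NUM_SALTS = 3  # plays the role of spark.sql.shuffle.partitions in the text


def salt_left(rows, num_salts, rng):
    """Step 1: attach a random salt in [0, num_salts) to each skewed-side row."""
    return [(key, rng.randrange(num_salts), payload) for key, payload in rows]


def explode_right(rows, num_salts):
    """Steps 2-3: replicate each row of the smaller side once per salt value."""
    return [(key, salt, payload) for key, payload in rows for salt in range(num_salts)]


def join_on_composite_key(left, right):
    """Hash join on the composite key (key, salt)."""
    index = {}
    for key, salt, payload in right:
        index.setdefault((key, salt), []).append(payload)
    return [(key, salt, lp, rp) for key, salt, lp in left for rp in index.get((key, salt), [])]


rng = random.Random(7)
skewed = [(0, f"row{i}") for i in range(9_998)] + [(1, "x"), (2, "y")]
small = [(k, f"dim{k}") for k in range(5)]

salted = join_on_composite_key(salt_left(skewed, NUM_SALTS, rng), explode_right(small, NUM_SALTS))

# Same rows as a plain join on the original key (ignoring the salt column).
plain = [(k, lp, rp) for k, lp in skewed for k2, rp in small if k == k2]
print(sorted((k, lp, rp) for k, _, lp, rp in salted) == sorted(plain))  # True

# The hot key 0 is now split across every salt group.
hot_groups = Counter(salt for key, salt, _, _ in salted if key == 0)
print(sorted(hot_groups))  # [0, 1, 2]
```

The cost shows up on the small side, which grows from `len(small)` to `len(small) * NUM_SALTS` rows: that is the space half of the space-time tradeoff.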
Looking at the physical plan, the data is distributed evenly, and the tasks take similar times across the percentile metrics. The difference would be even more significant with a much larger dataset.
Data skew in Apache Spark can be handled in various ways: through Spark configuration, through Spark plan optimization, or by adding a “salt” key that guides Spark to distribute data evenly. Identifying the reason behind a Spark job’s slowness is the foundation of any Spark tuning, and data skew is one of the most frequent culprits.
I wrote this article to help everyone better understand what data skew is in Spark and the potential solutions to it. However, when it comes to Spark performance optimization, there is no silver bullet. You need to spend time looking at the query plan and figuring out what’s happening in the code. Much of the knowledge comes from trial and error.
I hope my stories are helpful to you.