In Spark, coalesce and repartition are well-known functions that explicitly adjust the number of partitions. People also commonly update the configuration spark.sql.shuffle.partitions (default: 200) as a crucial part of a Spark performance tuning strategy.
Tuning spark.sql.shuffle.partitions to reduce compute time is something of an art in Spark, and it can lead to headaches when the number of partitions is large. Since the number of partitions controls the number of final files in HDFS, it may not be what you want; sometimes, you may want to have just one file. In this article, we will discuss a neglected difference in Apache Spark between coalesce(1) and repartition(1), which could be one of the things to be attentive to when you check Spark job performance.
A common way to reduce the number of files is to decrease the number of partitions, and we can call coalesce or repartition explicitly in code to achieve this goal. If you have a Spark DataFrame and want to write it out as a single file, you would most likely write the following:
df.coalesce(1).write.format("csv").save(filepath)
// OR
df.repartition(1).write.format("csv").save(filepath)
Both should give you the correct result. So what’s the difference between coalesce(1) and repartition(1)? Let’s take a look at them in more detail.
The definitions of coalesce and repartition
Both functions are methods of the Dataset class. From the official Spark documentation:
coalesce: Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.
repartition: Returns a new Dataset that has exactly numPartitions partitions.
Similar to cache(), which simply calls persist(StorageLevel.MEMORY_ONLY), repartition is a friendly name: if we look at the Spark source code, it calls coalesce directly but locks shuffle to true:
coalesce(numPartitions, shuffle = true)
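The relationship between the two methods can be sketched in simplified Python (this mirrors the Scala source in spirit only; it is not the actual Spark implementation):

```python
# Simplified sketch: repartition is coalesce with shuffle locked to True.
# Illustrative only; the real methods return a new Dataset, not a dict.

def coalesce(num_partitions, shuffle=False):
    """Describe the planned operation instead of executing it."""
    return {"numPartitions": num_partitions, "shuffle": shuffle}

def repartition(num_partitions):
    """repartition simply delegates to coalesce with shuffle=True."""
    return coalesce(num_partitions, shuffle=True)

print(coalesce(1))     # shuffle stays False by default
print(repartition(1))  # shuffle is forced to True
```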
As you can see, repartition will trigger a shuffle, which is an expensive operation. On the other hand, coalesce without the shuffle flag won’t perform a shuffle, and thus results in a narrow dependency.
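To make the dependency difference concrete, here is a toy model in plain Python (an illustrative sketch, not Spark’s implementation): coalesce builds each child partition from whole parent partitions (narrow dependency), while repartition redistributes individual records across all children (wide, shuffle dependency).

```python
# Toy model of coalesce vs. repartition over lists of partitions.
# Illustrative sketch only; Spark's real coalescer also considers locality.

def coalesce(partitions, num_partitions):
    """Narrow dependency: each child is a union of whole parent
    partitions; no individual record crosses a partition boundary."""
    out = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        out[i % num_partitions].extend(part)
    return out

def repartition(partitions, num_partitions):
    """Wide (shuffle) dependency: every record is redistributed
    individually, here round-robin, across the child partitions."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for j, record in enumerate(part):
            out[j % num_partitions].append(record)
    return out

parent = [[1, 2], [3, 4], [5, 6], [7, 8]]
print(coalesce(parent, 2))     # [[1, 2, 5, 6], [3, 4, 7, 8]]
print(repartition(parent, 2))  # [[1, 3, 5, 7], [2, 4, 6, 8]]
```

Note how coalesce keeps parent partitions intact, which is exactly why it can avoid a shuffle.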
coalesce(1) and repartition(1) performance evaluation
Use repartition if you want to increase the number of partitions, and use coalesce if you want to reduce the number of partitions; coalesce performs better than repartition when reducing the number of partitions. — — Online Recommendation
The recommendation above is good in general. However, it is not a silver bullet. If you think coalesce(1) can be applied to every scenario where you want to end up with a single file, please continue reading, and we will show you the result. Thanks to Google, we are going to use Google Colab for our investigation.
- Set “spark.sql.shuffle.partitions” in the Spark session to 2001 explicitly to be able to notice the difference. If you want to use the Spark UI, you’d need to use Colab’s local compute mode.
- Create a simple Spark DataFrame from a dataset found online; here we use craigslist-car-data. The content or schema of the dataset won’t matter in our performance test.
- The initial number of partitions will be the same as the number of cores Colab gives us (likewise if you are running in local mode). After a join operation is performed, you’d notice that the number of partitions matches the value we set. For the demonstration, we use a self-join here, but you can assume different DataFrames being joined together.
- Now we can try to write the output using coalesce(1) and check the Spark UI.
- Then we can try to write the output using repartition(1).
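The steps above can be sketched in PySpark as follows. This is a hedged sketch assuming a local Colab-style environment with PySpark installed; the CSV path, the header option, and the join key "id" are placeholders, not taken from the original notebook:

```python
# Sketch of the experiment (requires a Spark environment to run;
# file path and column name "id" are placeholder assumptions).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", 2001)  # step 1
         .getOrCreate())

# step 2: load a dataset; its content/schema does not matter here
df = spark.read.csv("craigslist_car_data.csv", header=True)

# step 3: self-join; the result has 2001 (shuffle) partitions
joined = df.join(df, on="id")

# steps 4 and 5: compare the two write strategies in the Spark UI
joined.coalesce(1).write.format("csv").save("/tmp/out_coalesce")
joined.repartition(1).write.format("csv").save("/tmp/out_repartition")
```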
As you may notice, coalesce(1) takes about 17 seconds with only 25 tasks in total, whereas repartition(1) takes 13 seconds with 2026 (2001 + 25) tasks in total.
If we look at the DAG, coalesce(1) has three stages, but repartition(1) has four.
In this case, since repartition(1) invokes a shuffle, you’d see the fourth stage in the Spark UI. If you click on the coalesce(1) join stage and compare its number of tasks with repartition(1), you’d quickly find where the bottleneck is: we are using only one partition to perform the join operation, while repartition(1) is still using 2001. If you have a considerable dataset and need to perform a join or other shuffle, a single partition gives up Spark’s key advantage of executing in parallel.
The neglected fact is that, for coalesce(1) to be performed without shuffling, it shrinks its parent RDD to the same single partition. Conversely, repartition(1) preserves the number of partitions of its parent while it shuffles. You’d see this behavior in the Spark UI whenever coalesce(1) is called right after an expensive shuffle. We noticed the difference on a dataset of only about 1.5 GB; on a larger dataset, the difference can be far more significant.
This performance fact is documented on the Spark API page as well; you’d need to expand the coalesce function to see the details.
However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is). — — Spark Documentation
In the example demonstrated earlier, the coalesce was drastic, and poor performance resulted because there was no parallelism for the join operation. To conclude, there is no silver bullet for coalesce and repartition in Spark. Whether you should use coalesce or repartition eventually depends on your data volume and business logic. In some cases, you’d need to treat online recommendations differently in Spark; it is a good idea to try things yourself before jumping to a conclusion.
Below is the code for the details of the example shown above: Colab
I hope my stories are helpful to you.