Pointwise Guide to the Three P’s of Apache Spark: Partitions, Partitioning and the Partitioner
Partitions, Partitioning and the Partitioner form three important pillars of Spark. A thorough understanding of them enables developers to write reliable and performance-efficient Spark jobs that operate on large data sets. Here is a pointwise guide to each of them:
Partitions:
(1) A partition, simply said, represents a logical division of an input/output/intermediate data set.
(2) In Spark, each data set is understood to be composed of one or more partitions, and this is well incorporated into the most basic data structure of Spark, viz. RDD.
(3) These partitions could be located either in one node or spread across multiple cluster nodes.
(4) It is perfectly allowed to have empty partition(s) without any real data.
(5) The number of partitions of a data set is in no way tied to the number of CPU cores available in the cluster. It can be higher or lower.
(6) To compute each partition of a particular data set, Spark schedules a single task that executes on a single CPU core. Hence, partitions form the basic units on which Spark’s parallel computing framework is built.
(7) A partition of a data set is computed either by applying a narrow transformation to a single partition of an already computed data set, or by applying a wide transformation to multiple partitions of an already computed data set.
(8) Data resiliency in Spark is also built around partitions. If one or more tasks (each responsible for computing a partition of a data set) fail during a stage, Spark ensures that only the relevant dependent partitions get recomputed in the earlier stages.
(9) Spark provides multiple APIs that let the user perform transformations (mapPartitions, glom) and actions (foreachPartition) on a partition as a whole. Depending on the use case, these can make a job more efficient, for example by paying a setup cost once per partition instead of once per record.
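To make the points above concrete, here is a minimal pure-Python sketch. It is not Spark code: the data set is modeled as a plain list of lists, and the helper names (map_partitions, glom, task_waves) are illustrative stand-ins that only mimic the idea behind the Spark APIs of similar name. The task_waves helper assumes one task per partition and one core per task, as described in point (6).

```python
import math

# Toy stand-in for a partitioned data set: a list of partitions,
# each partition being a list of records. Empty partitions are allowed.
partitions = [[1, 2, 3], [], [4, 5], [6]]


def map_partitions(parts, fn):
    """Call fn once per partition; fn receives an iterator over the records."""
    return [list(fn(iter(p))) for p in parts]


def glom(parts):
    """Turn each partition into a partition holding one list-valued record."""
    return [[list(p)] for p in parts]


def task_waves(num_partitions, total_cores):
    """Rounds of tasks needed when each partition takes one task on one core."""
    return math.ceil(num_partitions / total_cores)


# One function call per partition produces one sum per partition.
partition_sums = map_partitions(partitions, lambda records: [sum(records)])
glommed = glom(partitions)

# 200 partitions on 16 cores cannot all run at once.
waves = task_waves(200, 16)
```

In this model, having more partitions than cores simply means the tasks run in several rounds, while having fewer partitions than cores leaves some cores idle.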
Partitioning:
(1) Partitioning is the process of dividing a data set into multiple partitions. Its two important aspects are the partitioning logic and the resulting number of partitions.
(2) Partitioning can make or break a Spark stage. A stage risks failure or inefficiency if partitioning results in too few partitions, too many partitions, or uneven partitions (in terms of the number of data records residing in each partition).
(3) Too few partitions for a large data set strain the executors operating on those partitions, risking extreme slowness or memory overruns that lead to stage failures. Too few partitions also cannot exploit the concurrent execution opportunities available in the cluster.
(4) Too many partitions for a data set increase the overhead/metadata related to partitioning. Also, the scheduling overhead could exceed the real execution time of the corresponding partition tasks. Both effects could make a stage extremely inefficient or, in extreme cases, cause it to fail.
(5) Uneven (or skewed) partitions are especially dangerous because some executors are subjected to a much heavier computing load than others. This wastes cluster resources, since the user has to provision executor memory sized for the heavily loaded executors. Resources are also wasted because lightly loaded executors must wait for the heavier ones to finish before they can be returned to the cluster pool. In addition, heavily loaded executors slow the stage down and carry the risk of memory overruns (if enough memory is not provisioned), causing repeated failures of a Spark stage. The long-running tasks on these heavily loaded executors are commonly called stragglers.
(6) Spark has provisions for both implicit and explicit partitioning. Spark employs implicit partitioning while reading an input data set or calculating an intermediate or output data set. The implicit partitioning logic can be the same or different across use cases. Certain implicit partitioning schemes also expose partitioning-related tunable parameters (with a default value) in the Spark job configuration. “spark.sql.shuffle.partitions” and “spark.default.parallelism” are two examples of such parameters.
(7) Explicit partitioning is exposed in multiple ways in Spark. A user can give an explicit number of resulting partitions in certain Spark APIs, mostly those related to reading/writing distributed data. Spark also provides dedicated APIs to explicitly change the number of partitions of a data set. These are popularly known as re-partitioning APIs.
(8) One of the re-partitioning APIs, called repartition, can be used generically to increase or decrease the number of partitions of a data set. The repartition API is not shuffle optimized: each element of the data set is shuffled to a new partition. Several flavors of this API are available. Each flavor takes one or more of the following input parameters from the user:
- Number of resultant partitions: If a repartition API does not expose this, internally Spark assumes a default number of resultant partitions.
- Partitioning expression/key (on the basis of which partitioning is performed): If a repartition API does not expose this, internally Spark assumes a default expression/key.
- Partitioning scheme: In the hash partitioning scheme, a record is assigned to a partition based on a hash (calculated via a suitable hashing function) of the value obtained by evaluating the partitioning expression/key. In the range partitioning scheme, an overall range is first estimated over the values obtained by evaluating the partitioning expression/key, and exclusive sub-ranges of that overall range are then assigned to each of the partitions.
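The difference between the two schemes can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not Spark’s implementation: the function names are hypothetical, integer keys are used so that Python’s built-in hash is stable, and the range bounds are taken from the full sorted key list rather than from a sample.

```python
from bisect import bisect_left


def hash_partition(key, num_partitions):
    """Partition index from the key's hash (always in 0..num_partitions-1)."""
    return hash(key) % num_partitions


def range_bounds(keys, num_partitions):
    """Pick num_partitions - 1 upper bounds from the sorted keys.

    Assumes there are at least num_partitions keys.
    """
    ordered = sorted(keys)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i) - 1] for i in range(1, num_partitions)]


def range_partition(key, bounds):
    """Index of the first sub-range whose upper bound is >= key."""
    return bisect_left(bounds, key)


def partition_by(keys, num_partitions, assign):
    """Group keys into num_partitions lists using the assign(key) function."""
    parts = [[] for _ in range(num_partitions)]
    for key in keys:
        parts[assign(key)].append(key)
    return parts


keys = [5, 12, 27, 33, 41, 58, 64, 79, 85, 93]
bounds = range_bounds(keys, 3)

by_hash = partition_by(keys, 3, lambda k: hash_partition(k, 3))
by_range = partition_by(keys, 3, lambda k: range_partition(k, bounds))
```

With range partitioning, each partition holds a contiguous, ordered slice of the key space; with hash partitioning, neighbouring keys are scattered across partitions.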
(9) Another re-partitioning API, called coalesce, is mostly used to decrease the number of partitions of a data set. It is shuffle optimized: the reduced set of partitions is identified first, and then only the data in the surplus partitions is moved. Shuffle-optimized coalesce can result in unequal partitions, whereas the repartition API mostly ensures equal partitions unless the distribution of partitioning key values in the data set is highly skewed.
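The trade-off between coalesce and repartition described in points (8) and (9) can be illustrated with a toy model. The sketch below is plain Python, not Spark: coalesce is modeled as merging neighbouring partitions whole, repartition as a simple round-robin redistribution of every record, and skew_ratio is a hypothetical helper that compares the largest partition to the average one.

```python
def coalesce(parts, target):
    """Merge neighbouring partitions into `target` groups, keeping each whole."""
    if target >= len(parts):
        return [list(p) for p in parts]
    merged = [[] for _ in range(target)]
    for index, part in enumerate(parts):
        merged[index * target // len(parts)].extend(part)
    return merged


def repartition(parts, target):
    """Send every record to a new partition, round-robin across `target`."""
    result = [[] for _ in range(target)]
    position = 0
    for part in parts:
        for record in part:
            result[position % target].append(record)
            position += 1
    return result


def skew_ratio(parts):
    """Size of the largest partition relative to the average size."""
    sizes = [len(p) for p in parts]
    return max(sizes) / (sum(sizes) / len(sizes))


# Four uneven partitions reduced to two.
parts = [[1, 2, 3, 4, 5, 6], [7], [8, 9], [10]]

coalesced = coalesce(parts, 2)
repartitioned = repartition(parts, 2)

coalesced_sizes = [len(p) for p in coalesced]
repartitioned_sizes = [len(p) for p in repartitioned]
```

In this model the coalesced result inherits the unevenness of its inputs, while the repartitioned result is balanced at the cost of touching every record.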
Partitioner:
(1) A Partitioner encapsulates the partitioning scheme of the most basic Spark data structure (viz. the Spark RDD) and is a part of it.
(2) The partitioning scheme essentially captures two aspects: the number of partitions, and a function that outputs a partition number for a given partitioning key.
(3) There are various types of partitioners that are already available in Spark, such as HashPartitioner, RangePartitioner, etc.
(4) There is also a provision in Spark to allow users to write their own Custom Partitioner and use it to partition RDDs during RDD transformations.
(5) Spark uses HashPartitioner or RangePartitioner by default to partition RDDs as and when required during RDD transformations.
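(6) Certain transformations set or keep a partitioner on the output RDD, such as partitionBy, cogroup, reduceByKey, mapValues, etc. Note that the RDD-level repartition, although it shuffles the data, does not leave a partitioner on its output RDD.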
(7) However, there are other transformations where the partitioner is dropped from the output RDD, because the transformation may change the keys. Examples are map, mapToPair, etc.
(8) Spark is able to optimize shuffle operations while performing transformations involving multiple RDDs. This optimization is based on the partitioner objects associated with the corresponding input RDDs. It can be of great use while designing a high-performance Spark job.
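The ideas in this section can be sketched with a small pure-Python model. Everything here is hypothetical and only illustrates the concept: HashPartitionerModel and ParityPartitioner stand in for a built-in and a custom partitioner, KeyedData stands in for a keyed RDD that may carry a partitioner, and inputs_to_shuffle counts how many inputs of a multi-input transformation are not already partitioned by the target partitioner.

```python
class HashPartitionerModel:
    """Toy partitioner: a partition count plus a key -> partition function."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def get_partition(self, key):
        return hash(key) % self.num_partitions

    def __eq__(self, other):
        return (isinstance(other, HashPartitionerModel)
                and other.num_partitions == self.num_partitions)


class ParityPartitioner:
    """Hypothetical custom partitioner: even integer keys to 0, odd to 1."""

    num_partitions = 2

    def get_partition(self, key):
        return key % 2

    def __eq__(self, other):
        return isinstance(other, ParityPartitioner)


class KeyedData:
    """Toy keyed data set that optionally remembers its partitioner."""

    def __init__(self, records, partitioner=None):
        self.records = list(records)
        self.partitioner = partitioner

    def map_values(self, fn):
        # Keys are untouched, so the partitioner is still valid.
        return KeyedData(((k, fn(v)) for k, v in self.records),
                         self.partitioner)

    def map(self, fn):
        # fn may change the keys, so the partitioner is dropped.
        return KeyedData((fn(r) for r in self.records), None)


def inputs_to_shuffle(inputs, target):
    """Count the inputs whose partitioner differs from the target one."""
    return sum(1 for data in inputs if data.partitioner != target)


hp = HashPartitionerModel(4)
base = KeyedData([(1, "a"), (2, "b")], hp)
kept = base.map_values(str.upper)
dropped = base.map(lambda kv: (kv[0] + 1, kv[1]))
```

In this model, combining base with kept needs no shuffle because both already agree on the partitioner, whereas combining base with dropped requires shuffling the side that lost its partitioner.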
If you have more questions about Spark partitioning, please write to me in the comments.
Get a copy of my recently published book on Spark Partitioning: https://www.amazon.com/dp/B08KJCT3XN/