Poor Hash Partitioning of Timestamps, Integers and Longs in Spark


A word of warning if you typically partition your DataFrames/RDDs/Datasets in Spark by Integer, Long or Timestamp keys. If you're using Spark's default partitioning settings (or even something similar) and your values are sufficiently large and regularly spaced, it's quite possible you'll end up with poorly partitioned datasets.

This can manifest itself in the DataFrame "repartition" and "groupBy" commands and in the SQL "DISTRIBUTE BY" and "CLUSTER BY" keywords, among other things. It can also show up in the RDD "groupByKey" and "reduceByKey" commands if your number of cores evenly divides the hashCodes of your data (read on if this doesn't quite make sense yet).

If you're interested in why this happens, read on. Otherwise feel free to jump to the end to see how to work around this.

Integer/Long Timestamps (in seconds)

Let's look at an example with Integers. An Integer's hashCode is defined in Java to be the Integer value itself. So, suppose you have an RDD keyed by the following values:

1104537600, 1104624000, 1104710400, 1104796800, 1104883200, 1104969600, 1105056000, 1105142400, 1105228800, 1105315200, 1105401600, 1105488000, 1105574400, 1105660800, 1105747200, 1105833600, 1105920000, 1106006400, 1106092800, 1106179200, 1106265600, 1106352000, 1106438400, 1106524800, 1106611200, 1106697600, 1106784000, 1106870400, 1106956800, 1107043200, 1107129600, 1107216000, 1107302400, 1107388800, 1107475200, 1107561600, 1107648000, 1107734400, 1107820800, 1107907200, 1107993600, 1108080000, 1108166400, 1108252800, 1108339200, 1108425600, 1108512000, 1108598400, 1108684800, 1108771200, 1108857600, 1108944000, 1109030400, 1109116800, 1109203200, 1109289600, 1109376000, 1109462400, 1109548800, 1109635200, 1109721600, 1109808000, 1109894400, 1109980800, ...

These values are epoch timestamps of midnight for successive days, at second granularity. Looking at the data, something should jump out at you immediately: they're all evenly divisible by 200 (and also by 50, 400 and other values). That means if you're using Spark SQL's default number of shuffle partitions (200) and you're partitioning on a column of date values with second granularity, odds are you will end up with effectively 1 partition and 199 empty ones.
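You can verify this with a few lines of plain Scala (no Spark required). The bucket computation below is a sketch that mirrors what a hash partitioner with 200 partitions does, not Spark's actual code:

```scala
// Epoch-second midnights like the ones above. Integer.hashCode is the value itself,
// and every value is divisible by 200, so they all land in bucket 0.
val daySeconds = 86400
val keys = (0 until 365).map(i => 1104537600 + i * daySeconds)

// A hash partitioner takes a non-negative mod of the key's hashCode.
val buckets = keys.map(k => ((k.hashCode % 200) + 200) % 200).distinct
println(s"${buckets.size} distinct bucket(s) out of 200")   // prints: 1 distinct bucket(s) out of 200
```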

RDDs are not necessarily safe from this either. Spark's default parallelism is set to the number of cores in your context or 2, whichever is larger. If your hashCodes are evenly divisible by this number, you can feel the same pain in RDD code as well.

What happens if these were Long values instead of Integers? Well, a Long's hashCode is defined as the XOR of its two 32-bit halves. If your value fits into an Integer, like the values above, the result is no different.
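For reference, here's roughly what that definition looks like (a sketch of the JDK behaviour, not Spark code):

```scala
// java.lang.Long.hashCode XORs the upper and lower 32-bit halves and truncates to an Int.
def longHash(v: Long): Int = (v ^ (v >>> 32)).toInt

longHash(1104537600L)   // 1104537600 -- the value fits in an Int, so the hash is just the value
```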

Long/java.sql.Timestamp Timestamps (milliseconds)

Let's make this slightly more realistic: instead of second timestamps, you've got millisecond timestamps. These are represented as Long values that spill past the 32-bit boundary, so you might hope the XOR in the hashCode would mix things up. In fact, though, the top halves are all pretty much identical and the bottom halves are still regularly spaced.

If we were to take all of the above timestamps, turn them into milliseconds, and then take the hashCode, the result would look like the following:

731005185, 817405185, 903805185, 990205185, 1076605185, 1163005185, 1249405185, 1335805185, 1422205185, 1508605185, 1595005185, 1681405185, 1767805185, 1854205185, 1940605185, 2027005185, 2113405185, -2095162111, -2008762111, -1922362111, -1835962111, -1749562111, -1663162111, -1576762111, -1490362111, -1403962111, -1317562111, -1231162111, -1144762111, -1058362111, -971962111, -885562111, -799162111, -712762111, -626362111, -539962111, -453562111, -367162111, -280762111, -194362111, -107962111, -21562111, 64837890, 151237890, 237637890, 324037890, 410437890, 496837890, 583237890, 669637890, 756037890, 842437890, 928837890, 1015237890, 1101637890, 1188037890, 1274437890, 1360837890, 1447237890, 1533637890, 1620037890, 1706437890, 1792837890, 1879237890, 1965637890 ...

Notice a trend? The last 5 digits are really heavily clustered. If you were to mod these by 200 you would end up with 80 buckets. Still not great partitioning.
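You can reproduce these hashCodes and count the buckets yourself with a short Scala snippet. The ten-year range below is just an illustrative choice, so the exact count you see will depend on the date range you pick:

```scala
// Millisecond midnights, hashed the way java.lang.Long.hashCode does it,
// then bucketed the way a 200-partition hash partitioner would.
val dayMs   = 86400000L
val startMs = 1104537600L * 1000
val hashes  = (0 until 3650).map { i =>
  val ms = startMs + i * dayMs
  (ms ^ (ms >>> 32)).toInt
}
val buckets = hashes.map(h => ((h % 200) + 200) % 200).distinct
println(s"${buckets.size} distinct buckets out of 200")   // only a fraction of the 200 get used
```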

Unfortunately, the java.sql.Timestamp type just uses the hashCode of its underlying Long millisecond value, so Spark's Timestamp type falls victim to the same problem.

A Simple Solution

A simple solution to this problem is to choose a number of partitions which is prime (e.g. 199 or 211). This reduces collisions and spreads your data more evenly across partitions.

If you're using an RDD[K, V], you've got to explicitly call partitionBy with a HashPartitioner configured with your desired number of partitions. You can also just override spark.default.parallelism to be something prime.
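As a rough sketch (this assumes a spark-shell session where `sc` is already available, and uses the example timestamps from above as keys):

```scala
import org.apache.spark.HashPartitioner

// Daily epoch-second timestamps as keys, paired with a dummy value.
val keyed = sc.parallelize((0 until 365).map(i => (1104537600 + i * 86400, i)))

val badly  = keyed.partitionBy(new HashPartitioner(200))  // every key lands in the same partition
val nicely = keyed.partitionBy(new HashPartitioner(199))  // 199 is prime, so the keys spread out

badly.glom().map(_.length).collect().count(_ > 0)   // 1 non-empty partition
nicely.glom().map(_.length).collect().count(_ > 0)  // all 199 partitions are non-empty for this series
```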

If you're using DataFrames/Datasets and typically use repartition($"myTimestampColumn"), all you need to do is add a parameter specifying the number of partitions you would like: repartition(199, $"myTimestampColumn"). You can also just override spark.sql.shuffle.partitions.
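For example (this assumes a spark-shell session where `spark` and `$` are available; the DataFrame and column name here are just placeholders for your own data):

```scala
import spark.implicits._
import org.apache.spark.sql.functions._

// A toy DataFrame of daily second-granularity timestamps, like the series above.
val df = spark.range(365)
  .select((lit(1104537600L) + $"id" * 86400L).cast("timestamp").as("myTimestampColumn"))

df.repartition(199, $"myTimestampColumn")                // explicit prime partition count

spark.conf.set("spark.sql.shuffle.partitions", "199")    // or change the default for all shuffles
df.repartition($"myTimestampColumn")                     // now uses 199 partitions
```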

If you're using pure SQL (e.g. through the SQL shell) and trying to use the DISTRIBUTE BY or CLUSTER BY keywords, your only option seems to be overriding spark.sql.shuffle.partitions.

