With the release of Spark 2.2.0, the Spark project has touted the initial release of its cost-based optimizer (CBO). This article explains what is included and how it's likely to affect you.
The release notes contain the following highlights for the cost based optimizer:
- SPARK-17075 SPARK-17076 SPARK-19020 SPARK-17077 SPARK-19350: Cardinality estimation for filter, join, aggregate, project and limit/sample operators
- SPARK-17080: Cost-based join re-ordering
- SPARK-17626: TPC-DS performance improvements using star-schema heuristics
We'll go into what each of these means and how they depend on each other, but an important thing to note is that all of these changes require spark.sql.cbo.enabled=true in order to take effect.
Also keep in mind that these are specific improvements to the query optimizer. You're not likely to see changes in the execution time of your plans if you have already hand-optimized them. In fact, turning on CBO may change their behavior in ways you don't expect, so it's worth testing carefully in these cases.
But for code where users write arbitrary queries, or where your data is not known beforehand, the optimizer may now be able to generate more efficient execution plans.
If you're looking for a quick primer on how query optimization works in general, check out my guide on the Spark SQL Query Planner and Optimizer.
The cost-based optimizations appear to depend primarily on the ColumnStat class (not to be confused with ColumnStats, which is used for executing in-memory columnar operations, not for planning). ColumnStat contains details about the data within a single column across the entire dataset. This is generally just min, max, cardinality, null count and total row count, but it can optionally be overridden for other column types to contain different information.
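To make that concrete, here is a minimal Python sketch of the kind of per-column summary described above. The `ColumnSummary` class and `summarize` function are hypothetical names made up for illustration; this is not Spark's actual ColumnStat API, just the same idea in miniature.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class ColumnSummary:
    """Hypothetical stand-in for a per-column statistics record."""
    min_value: Optional[int]
    max_value: Optional[int]
    distinct_count: int  # the column's cardinality
    null_count: int
    row_count: int


def summarize(values: Sequence[Optional[int]]) -> ColumnSummary:
    """Scan the whole column and compute its summary statistics."""
    non_null = [v for v in values if v is not None]
    return ColumnSummary(
        min_value=min(non_null) if non_null else None,
        max_value=max(non_null) if non_null else None,
        distinct_count=len(set(non_null)),
        null_count=len(values) - len(non_null),
        row_count=len(values),
    )


# A tiny example column with one null and one repeated value.
ages = [31, 45, None, 31, 27]
stats = summarize(ages)
print(stats)
```

The point is that producing these numbers needs a scan of the data, which is why they are not available for free on every table.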
It's important to note that even with the CBO changes, these statistics are pretty much only generated if you manually run ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS or pull them in from a Hive metastore.
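As a rough sketch of what generating the stats manually looks like, the helper below builds the Spark SQL statements for a table. The `analyze_statements` function and the `sales` table with its columns are hypothetical; only the ANALYZE TABLE syntax itself comes from Spark SQL.

```python
from typing import List, Sequence


def analyze_statements(table: str, columns: Sequence[str]) -> List[str]:
    """Build the Spark SQL statements that collect table and column stats."""
    # Table-level statistics (row count, size in bytes).
    statements = [f"ANALYZE TABLE {table} COMPUTE STATISTICS"]
    if columns:
        # Column-level statistics for the listed columns.
        column_list = ", ".join(columns)
        statements.append(
            f"ANALYZE TABLE {table} COMPUTE STATISTICS FOR COLUMNS {column_list}"
        )
    return statements


# Hypothetical table and column names, for illustration only.
statements = analyze_statements("sales", ["customer_id", "amount"])
for statement in statements:
    print(statement)
    # With a live SparkSession named `spark`, each one would be run as:
    #   spark.sql(statement)
    # after enabling the optimizer with:
    #   spark.conf.set("spark.sql.cbo.enabled", "true")
```

I've kept the Spark calls in comments so that the snippet runs without a cluster; in a real job you would pass each string to `spark.sql`.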
Until now, operations would all only estimate sizeInBytes, so if you performed a projection, the new column would not have cardinality, or min/max estimates, just size in bytes.
Also, before the CBO changes, the ColumnStat objects weren't really used anywhere, with the exception of what appears to be one small optimization for pushing LIMITs into a join. These changes therefore represent an attempt both to improve the generation of ColumnStat objects and to actually make use of their rich data.
Cardinality estimation is probably the bulkiest and widest-reaching change among the cost-based optimizations in Spark 2.2.0. It adds a set of estimators which, when the CBO flag is enabled, try to create richer ColumnStat objects for each of the columns in your table's schema. All of the estimates appear to be relatively conservative, so they tend either to give estimates that are guaranteed (e.g. this table will definitely have no rows because the filter is beyond the max) or to leave the execution unchanged (e.g. this table will have 100% of the rows of the previous table because I can't tell if anything will be filtered).
This differs from the existing setup because the estimators now generate not only sizeInBytes and number of rows but also cardinality and, in some cases, min/max.
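The two conservative behaviors described above can be sketched in a few lines of Python. This is my own simplification for a `column > literal` filter, with hypothetical names (`ColumnRange`, `estimate_greater_than`); it is not Spark's estimator code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnRange:
    """Hypothetical per-column statistics: just the min and max values."""
    min_value: int
    max_value: int


def estimate_greater_than(child_rows: int, col: ColumnRange, literal: int) -> int:
    """Conservatively estimate the row count after `column > literal`."""
    if literal >= col.max_value:
        # No value can pass the filter, so the output is provably empty.
        return 0
    # Otherwise we cannot prove anything was removed: keep every row.
    return child_rows


price = ColumnRange(min_value=5, max_value=100)
empty = estimate_greater_than(1000, price, 250)     # filter is beyond the max
unchanged = estimate_greater_than(1000, price, 50)  # can't tell, keep 100%
print(empty, unchanged)
```

Either the estimate is a guarantee, or it leaves the child's row count alone so the plan does not change.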
A rough summary of the supported operations and behaviors is:
- Filter: Conservative guarantees of table contents relative to the child table's statistics.
- Join: Use cardinality of join columns to determine conservatively how many rows will be output from the join.
- Aggregate: Use child table's column cardinality to estimate the output size/cardinality of the aggregate table for keys and aggregate columns.
- Project: Generates a new ColumnStat object based on the columns it was derived from and adds it to the set. Pretty naive.
- Limit/Sample: This actually falls back to the existing code which can intelligently estimate size in bytes and number of rows.
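To give a feel for how column cardinality feeds the join and aggregate estimates in the list above, here is a small sketch using the textbook formulas. I'm assuming Spark's estimators work along similar lines; the function names and the example numbers are hypothetical.

```python
from typing import Sequence


def estimate_inner_join_rows(left_rows: int, right_rows: int,
                             left_distinct: int, right_distinct: int) -> int:
    """Textbook estimate: |L| * |R| / max(distinct keys on either side)."""
    max_distinct = max(left_distinct, right_distinct)
    if max_distinct == 0:
        return 0  # no join keys at all, so nothing can match
    return (left_rows * right_rows) // max_distinct


def estimate_aggregate_rows(child_rows: int,
                            key_distinct_counts: Sequence[int]) -> int:
    """A GROUP BY yields at most one row per combination of key values."""
    combinations = 1
    for distinct in key_distinct_counts:
        combinations *= distinct
    return min(child_rows, combinations)


# Hypothetical tables: 1,000,000 orders joined to 50,000 customers on a key
# with 50,000 distinct values on both sides.
join_rows = estimate_inner_join_rows(1_000_000, 50_000, 50_000, 50_000)

# Grouping 1,000,000 rows by month (12 values) and state (50 values).
group_rows = estimate_aggregate_rows(1_000_000, [12, 50])
print(join_rows, group_rows)
```

Neither formula works without distinct counts, which is the reason the column statistics matter so much here.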
Currently only the above operations are supported, but if you look at BasicStatsPlanVisitor (which is only in master currently, not 2.2.0) you can see that the infrastructure is there to support other operations, like generators, distincts and sampling, once someone gets around to supporting them.
If CBO is not enabled, the behavior is to estimate only size in bytes for derived tables, rather than things like cardinality, min and max, which means that the optimizer won't work as well in situations that depend on those statistics.
Remember: these estimations require the columns/tables they're derived from to have statistics. If they do not, the estimators fall back to the naive estimation of just size, which is likely to be the case for most users unless you're using a Hive catalog and/or analyzing your tables manually.
Cost-based join re-ordering
This change does pretty much what you would expect from the name: it reorders consecutive joins so that the smallest amount of data is shuffled around. It defines the cost of a join as a weighted sum of the ratios of rows and size in bytes of the two tables, where the relative weight of the two components (rows/size) is configurable.
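A weighted comparison like the one described above might look roughly like this in Python. The `PlanCost` class, the `better_than` function and the 0.7 default weight are my assumptions for illustration, so check the configuration of your Spark version for the real setting.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanCost:
    """Hypothetical cost of a join plan: estimated rows and size in bytes."""
    rows: int
    size_in_bytes: int


def better_than(candidate: PlanCost, other: PlanCost,
                row_weight: float = 0.7) -> bool:
    """True when the weighted sum of the row and size ratios is below 1."""
    if other.rows == 0 or other.size_in_bytes == 0:
        return False  # nothing can beat an empty plan, and avoid dividing by 0
    relative_rows = candidate.rows / other.rows
    relative_size = candidate.size_in_bytes / other.size_in_bytes
    return relative_rows * row_weight + relative_size * (1 - row_weight) < 1


# Half the rows but a third more bytes: 0.5 * 0.7 + 1.33 * 0.3 is about 0.75.
narrow = PlanCost(rows=500, size_in_bytes=80_000)
wide = PlanCost(rows=1_000, size_in_bytes=60_000)
print(better_than(narrow, wide))
print(better_than(wide, narrow))
```

With the weight leaning towards rows, the plan with fewer rows wins even though it is larger in bytes; setting `row_weight=0.0` would flip that decision.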
Then, using this cost, when you have a series of joins with different non-intersecting criteria such that they can be reordered trivially, it applies the dynamic programming algorithm defined in Access Path Selection in a Relational Database Management System to find the join order with the lowest net cost that still provides the same result, and returns the plan with the joins reordered accordingly.
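To show the shape of that dynamic programming search, here is a toy Python version. It is a sketch under my own assumptions, not Spark's implementation: the cost of a plan is simply the sum of its intermediate row counts, each join predicate has a made-up selectivity, and every name in it is hypothetical.

```python
from itertools import combinations
from typing import Dict, FrozenSet, Tuple

Plan = Tuple[float, float, object]  # (cost so far, output rows, join tree)


def reorder_joins(rows: Dict[str, int],
                  selectivity: Dict[FrozenSet[str], float]) -> Plan:
    """Return the cheapest plan joining every table (join graph must be connected)."""
    # Level 1: every single table is a plan with no join cost yet.
    best: Dict[FrozenSet[str], Plan] = {
        frozenset([name]): (0.0, float(count), name)
        for name, count in rows.items()
    }
    tables = sorted(rows)
    # Build the best plan for each larger subset from the best smaller plans.
    for size in range(2, len(tables) + 1):
        for subset in map(frozenset, combinations(tables, size)):
            for left_size in range(1, size):
                for left in map(frozenset,
                                combinations(sorted(subset), left_size)):
                    right = subset - left
                    if left not in best or right not in best:
                        continue
                    # Combine the predicates that connect the two sides.
                    factor, connected = 1.0, False
                    for pair, sel in selectivity.items():
                        a, b = tuple(pair)
                        if (a in left and b in right) or (a in right and b in left):
                            factor *= sel
                            connected = True
                    if not connected:
                        continue  # skip cartesian products
                    left_cost, left_rows, left_tree = best[left]
                    right_cost, right_rows, right_tree = best[right]
                    out_rows = left_rows * right_rows * factor
                    cost = left_cost + right_cost + out_rows
                    if subset not in best or cost < best[subset][0]:
                        best[subset] = (cost, out_rows, (left_tree, right_tree))
    return best[frozenset(tables)]


# Hypothetical tables and predicate selectivities.
table_rows = {"A": 1000, "B": 100, "C": 10}
predicates = {frozenset(["A", "B"]): 0.01, frozenset(["B", "C"]): 0.1}
best_cost, best_rows, best_tree = reorder_joins(table_rows, predicates)
print(best_cost, best_rows, best_tree)
```

In this example joining B with C first is cheaper, because it shrinks the intermediate result before the large table A is brought in. The key property is that the best plan for a set of tables is only ever built from the best plans of its subsets, so most orderings never have to be costed.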
Star Schema Heuristics
The star schema is a data management technique in which one master table (the "fact table") is supplemented with a series of "dimension tables" which contain supplementary information about certain columns. For example, this can be an easy way to encode hierarchy information by just storing a leaf node in the fact table and the mapping from leaf to parent, parent's parent, etc. in a separate "dimension table". This keeps your "fact table" smaller by avoiding duplicate information. But if you are searching for specific dimension information, you would join the dimension table in and then perform a filter on the desired dimension column.
Spark 2.2.0 introduces some simple code that first detects when your join is using a "star schema" style data layout and then potentially reorders the joins to be more optimal in this case.
I won't duplicate their description on how they detect a star schema since they've actually written pretty good docs which can be found here.
The reordering puts the largest fact table on the driving side of the join in order to encourage a hash join to take place and then applies the dimension joins in order of selectivity (similar to the above optimization) in order to reduce the amount of shuffling.
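The ordering rule described above can be sketched in Python as follows. This is only the ordering step, with star schema detection left out, and the `Table` class, the `selectivity` field and the example tables are all hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Table:
    name: str
    rows: int
    # Hypothetical: fraction of the table's rows kept by its own filters.
    selectivity: float = 1.0


def order_star_join(tables: List[Table]) -> List[str]:
    """Put the largest table first, then dimensions, most selective first."""
    fact = max(tables, key=lambda t: t.rows)
    dimensions = [t for t in tables if t is not fact]
    # A lower selectivity means more rows are filtered out, so join it earlier.
    dimensions.sort(key=lambda t: t.selectivity)
    return [fact.name] + [d.name for d in dimensions]


star = [
    Table("date_dim", rows=3_650, selectivity=0.1),
    Table("sales", rows=10_000_000),
    Table("store", rows=500, selectivity=0.5),
    Table("item", rows=20_000, selectivity=0.02),
]
join_order = order_star_join(star)
print(join_order)
```

Joining the most selective dimensions first shrinks the fact table's rows as early as possible, which is what reduces the shuffling.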
This heuristic gets called both within the cost-based join re-ordering optimization and within the non-cost-based "ReorderJoin" optimization, so it's useful even when CBO isn't enabled. In theory, though, it should be more useful with the statistics generated by the estimators mentioned above.
Conclusion - What to expect
From what I've seen, the optimizations/estimations do provide some room for improving join speed in unoptimized queries, but this is still contingent on the tables being analyzed beforehand, or at least upon loading, which is less than ideal. Because of this, unless you fall into that specific class of user who uses a Hive catalog and hasn't already hand-tuned their queries, you're not likely to see any notable improvements attributable to the cost-based optimizer.
That being said, if we can push down statistic generation to the underlying sources, which seems trivially possible with some changes to the API, we may be able to harness this in many more situations.