Cost-Based Optimizer

Cost-Based Optimizer in Databricks

Spark SQL can use a Cost-Based Optimizer (CBO) to improve query plans. This is especially useful for queries with multiple joins. For the CBO to work, you must collect table and column statistics and keep them up to date.
This functionality requires Databricks Runtime 3.3 or above.

Collect statistics

To get the full benefit of the CBO, collect both column statistics and table statistics. You can collect statistics using the ANALYZE TABLE command.
Tip
To keep the statistics up to date, run ANALYZE TABLE after writing to the table.
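
As a minimal sketch of the statements involved, the Python helper below builds the two ANALYZE TABLE forms, one for table statistics and one for column statistics. The helper name analyze_statements and the table and column names are hypothetical and used only for illustration; in a notebook you would pass each string to spark.sql(...).

```python
def analyze_statements(table, columns=()):
    """Build ANALYZE TABLE statements for table and column statistics.

    `table` and `columns` are placeholders; substitute your own names.
    """
    # Table statistics (such as size in bytes and row count).
    statements = [f"ANALYZE TABLE {table} COMPUTE STATISTICS"]
    if columns:
        # Column statistics for the listed columns.
        column_list = ", ".join(columns)
        statements.append(
            f"ANALYZE TABLE {table} COMPUTE STATISTICS FOR COLUMNS {column_list}"
        )
    return statements


# Hypothetical table and column names, for illustration only.
for statement in analyze_statements("store_sales", ["ss_store_sk", "ss_sold_date_sk"]):
    print(statement)  # in a notebook: spark.sql(statement)
```

Collecting column statistics for the columns used in joins and filters is the natural starting point, since those are the columns the optimizer needs in order to estimate row counts.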

Verify query plans

There are several ways to verify the query plan.

EXPLAIN command

Use the SQL EXPLAIN command to check whether the plan uses statistics. If statistics are missing, the query plan might not be optimal. Below is a sample explain plan:
== Optimized Logical Plan ==
Aggregate [s_store_sk], [s_store_sk, count(1) AS count(1)L], Statistics(sizeInBytes=20.0 B, rowCount=1, hints=none)
+- Project [s_store_sk], Statistics(sizeInBytes=18.5 MB, rowCount=1.62E+6, hints=none)
   +- Join Inner, (d_date_sk = ss_sold_date_sk), Statistics(sizeInBytes=30.8 MB, rowCount=1.62E+6, hints=none)
      :- Project [ss_sold_date_sk, s_store_sk], Statistics(sizeInBytes=39.1 GB, rowCount=2.63E+9, hints=none)
      :  +- Join Inner, (s_store_sk = ss_store_sk), Statistics(sizeInBytes=48.9 GB, rowCount=2.63E+9, hints=none)
      :     :- Project [ss_store_sk, ss_sold_date_sk], Statistics(sizeInBytes=39.1 GB, rowCount=2.63E+9, hints=none)
      :     :  +- Filter (isnotnull(ss_store_sk) && isnotnull(ss_sold_date_sk)), Statistics(sizeInBytes=39.1 GB, rowCount=2.63E+9, hints=none)
      :     :     +- Relation[ss_store_sk,ss_sold_date_sk] parquet, Statistics(sizeInBytes=134.6 GB, rowCount=2.88E+9, hints=none)
      :     +- Project [s_store_sk], Statistics(sizeInBytes=11.7 KB, rowCount=1.00E+3, hints=none)
      :        +- Filter isnotnull(s_store_sk), Statistics(sizeInBytes=11.7 KB, rowCount=1.00E+3, hints=none)
      :           +- Relation[s_store_sk] parquet, Statistics(sizeInBytes=88.0 KB, rowCount=1.00E+3, hints=none)
      +- Project [d_date_sk], Statistics(sizeInBytes=12.0 B, rowCount=1, hints=none)
         +- Filter ((((isnotnull(d_year) && isnotnull(d_date)) && (d_year = 2000)) && (d_date = 2000-12-31)) && isnotnull(d_date_sk)), Statistics(sizeInBytes=38.0 B, rowCount=1, hints=none)
            +- Relation[d_date_sk,d_date,d_year] parquet, Statistics(sizeInBytes=1786.7 KB, rowCount=7.30E+4, hints=none)
Important
The rowCount statistic is especially important for queries with multiple joins. If rowCount is missing, it means there is not enough information to calculate it (that is, some required columns do not have statistics).
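
For long plans it can be tedious to look for a missing rowCount by eye. The sketch below scans the text of an optimized logical plan and returns the operator lines whose Statistics(...) annotation has no rowCount entry. The function name operators_missing_row_count is hypothetical, and the sketch assumes the plan text uses the Statistics(...) format shown in the sample above.

```python
import re

# Matches the Statistics(...) annotation that ends each operator line.
STATS_PATTERN = re.compile(r"Statistics\((?P<body>[^()]*)\)\s*$")


def operators_missing_row_count(plan_text):
    """Return operator lines whose Statistics(...) has no rowCount entry."""
    missing = []
    for line in plan_text.splitlines():
        match = STATS_PATTERN.search(line)
        if match and "rowCount=" not in match.group("body"):
            missing.append(line.strip())
    return missing


# Two shortened lines in the format of the sample plan; the second has its
# rowCount removed by hand to imitate a table without statistics.
sample_plan = """\
+- Project [s_store_sk], Statistics(sizeInBytes=11.7 KB, rowCount=1.00E+3, hints=none)
   +- Relation[s_store_sk] parquet, Statistics(sizeInBytes=88.0 KB, hints=none)
"""

for line in operators_missing_row_count(sample_plan):
    print(line)
```

Any line this reports points to a relation or operator whose inputs probably need ANALYZE TABLE to be run on the relevant columns.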

Spark SQL UI

Use the Spark SQL UI page to see the executed plan and the accuracy of the statistics.
Missing estimate
A line such as rows output: 2,451,005 est: N/A means that this operator produced approximately 2.45M rows and that no statistics were available to estimate the row count.
Good estimate
A line such as rows output: 2,451,005 est: 1616404 (1X) means that this operator produced approximately 2.45M rows, while the estimate was approximately 1.6M, and the estimation error factor was 1.
Bad estimate
A line such as rows output: 2,451,005 est: 2626656323 means that this operator produced approximately 2.45M rows while the estimate was approximately 2.6B rows, so the estimation error factor was about 1000.
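
The arithmetic behind these three cases can be sketched as follows. The function name estimation_error_factor is hypothetical, and defining the factor as the ratio of the larger count to the smaller one is an assumption; how the UI rounds the factor for display is not stated here.

```python
def estimation_error_factor(actual_rows, estimated_rows):
    """Ratio between actual and estimated row counts, always >= 1.

    Returns None when no estimate is available (shown as "est: N/A").
    """
    if estimated_rows is None or actual_rows <= 0 or estimated_rows <= 0:
        return None
    larger = max(actual_rows, estimated_rows)
    smaller = min(actual_rows, estimated_rows)
    return larger / smaller


# Row counts taken from the three sample lines above.
actual = 2_451_005
for estimate in (None, 1_616_404, 2_626_656_323):
    factor = estimation_error_factor(actual, estimate)
    label = "N/A" if factor is None else f"{factor:.1f}x"
    print(f"est: {estimate} -> error factor {label}")
```

A factor close to 1 indicates that the statistics describe the data well; a factor in the hundreds or thousands suggests stale or missing column statistics.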

Disable the Cost-Based Optimizer

The CBO is enabled by default. To disable the CBO, set the spark.sql.cbo.enabled flag to false.
spark.conf.set("spark.sql.cbo.enabled", false)
