[SPARK-22938][SQL][FOLLOWUP] Assert that SQLConf.get is accessed only on the driver
[spark.git] / sql / catalyst / src / main / scala / org / apache / spark / sql / internal / SQLConf.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.internal
19
20 import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
21 import java.util.concurrent.TimeUnit
22 import java.util.concurrent.atomic.AtomicReference
23
24 import scala.collection.JavaConverters._
25 import scala.collection.immutable
26 import scala.util.matching.Regex
27
28 import org.apache.hadoop.fs.Path
29
30 import org.apache.spark.TaskContext
31 import org.apache.spark.internal.Logging
32 import org.apache.spark.internal.config._
33 import org.apache.spark.network.util.ByteUnit
34 import org.apache.spark.sql.catalyst.analysis.Resolver
35 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
36 import org.apache.spark.util.Utils
37
38 ////////////////////////////////////////////////////////////////////////////////////////////////////
39 // This file defines the configuration options for Spark SQL.
40 ////////////////////////////////////////////////////////////////////////////////////////////////////
41
42
43 object SQLConf {
44
45   private val sqlConfEntries = java.util.Collections.synchronizedMap(
46     new java.util.HashMap[String, ConfigEntry[_]]())
47
48   val staticConfKeys: java.util.Set[String] =
49     java.util.Collections.synchronizedSet(new java.util.HashSet[String]())
50
51   private def register(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
52     require(!sqlConfEntries.containsKey(entry.key),
53       s"Duplicate SQLConfigEntry. ${entry.key} has been registered")
54     sqlConfEntries.put(entry.key, entry)
55   }
56
57   // For testing only
58   private[sql] def unregister(entry: ConfigEntry[_]): Unit = sqlConfEntries.synchronized {
59     sqlConfEntries.remove(entry.key)
60   }
61
62   def buildConf(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
63
64   def buildStaticConf(key: String): ConfigBuilder = {
65     ConfigBuilder(key).onCreate { entry =>
66       staticConfKeys.add(entry.key)
67       SQLConf.register(entry)
68     }
69   }
70
71   /**
72    * Default config. Only used when there is no active SparkSession for the thread.
73    * See [[get]] for more information.
74    */
75   private lazy val fallbackConf = new ThreadLocal[SQLConf] {
76     override def initialValue: SQLConf = new SQLConf
77   }
78
79   /** See [[get]] for more information. */
80   def getFallbackConf: SQLConf = fallbackConf.get()
81
82   /**
83    * Defines a getter that returns the SQLConf within scope.
84    * See [[get]] for more information.
85    */
86   private val confGetter = new AtomicReference[() => SQLConf](() => fallbackConf.get())
87
88   /**
89    * Sets the active config object within the current scope.
90    * See [[get]] for more information.
91    */
92   def setSQLConfGetter(getter: () => SQLConf): Unit = {
93     confGetter.set(getter)
94   }
95
96   /**
97    * Returns the active config object within the current scope. If there is an active SparkSession,
98    * the proper SQLConf associated with the thread's session is used.
99    *
100    * The way this works is a little bit convoluted, due to the fact that config was added initially
101    * only for physical plans (and as a result not in sql/catalyst module).
102    *
103    * The first time a SparkSession is instantiated, we set the [[confGetter]] to return the
104    * active SparkSession's config. If there is no active SparkSession, it returns using the thread
105    * local [[fallbackConf]]. The reason [[fallbackConf]] is a thread local (rather than just a conf)
106    * is to support setting different config options for different threads so we can potentially
107    * run tests in parallel. At the time this feature was implemented, this was a no-op since we
108    * run unit tests (that does not involve SparkSession) in serial order.
109    */
110   def get: SQLConf = {
111     if (Utils.isTesting && TaskContext.get != null) {
112       // we're accessing it during task execution, fail.
113       throw new IllegalStateException("SQLConf should only be created and accessed on the driver.")
114     }
115     confGetter.get()()
116   }
117
118   val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
119     .internal()
120     .doc("The max number of iterations the optimizer and analyzer runs.")
121     .intConf
122     .createWithDefault(100)
123
124   val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
125     buildConf("spark.sql.optimizer.inSetConversionThreshold")
126       .internal()
127       .doc("The threshold of set size for InSet conversion.")
128       .intConf
129       .createWithDefault(10)
130
131   val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
132     .doc("When set to true Spark SQL will automatically select a compression codec for each " +
133       "column based on statistics of the data.")
134     .booleanConf
135     .createWithDefault(true)
136
137   val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize")
138     .doc("Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
139       "memory utilization and compression, but risk OOMs when caching data.")
140     .intConf
141     .createWithDefault(10000)
142
143   val IN_MEMORY_PARTITION_PRUNING =
144     buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
145       .internal()
146       .doc("When true, enable partition pruning for in-memory columnar tables.")
147       .booleanConf
148       .createWithDefault(true)
149
150   val CACHE_VECTORIZED_READER_ENABLED =
151     buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
152       .doc("Enables vectorized reader for columnar caching.")
153       .booleanConf
154       .createWithDefault(true)
155
156   val COLUMN_VECTOR_OFFHEAP_ENABLED =
157     buildConf("spark.sql.columnVector.offheap.enabled")
158       .internal()
159       .doc("When true, use OffHeapColumnVector in ColumnarBatch.")
160       .booleanConf
161       .createWithDefault(false)
162
163   val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
164     .internal()
165     .doc("When true, prefer sort merge join over shuffle hash join.")
166     .booleanConf
167     .createWithDefault(true)
168
169   val RADIX_SORT_ENABLED = buildConf("spark.sql.sort.enableRadixSort")
170     .internal()
171     .doc("When true, enable use of radix sort when possible. Radix sort is much faster but " +
172       "requires additional memory to be reserved up-front. The memory overhead may be " +
173       "significant when sorting very small rows (up to 50% more in this case).")
174     .booleanConf
175     .createWithDefault(true)
176
177   val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
178     .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
179       "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
180       "Note that currently statistics are only supported for Hive Metastore tables where the " +
181       "command <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been " +
182       "run, and file-based data source tables where the statistics are computed directly on " +
183       "the files of data.")
184     .longConf
185     .createWithDefault(10L * 1024 * 1024)
186
187   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
188     .internal()
189     .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
190       "on a query. Higher values lead to more partitions read. Lower values might lead to " +
191       "longer execution times as more jobs will be run")
192     .intConf
193     .createWithDefault(4)
194
195   val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
196     buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
197       .internal()
198       .doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
199       .booleanConf
200       .createWithDefault(true)
201
202   val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
203     buildConf("spark.sql.statistics.fallBackToHdfs")
204     .doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
205       " This is useful in determining if a table is small enough to use auto broadcast joins.")
206     .booleanConf
207     .createWithDefault(false)
208
209   val DEFAULT_SIZE_IN_BYTES = buildConf("spark.sql.defaultSizeInBytes")
210     .internal()
211     .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
212       "which is larger than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. " +
213       "That is to say by default the optimizer will not choose to broadcast a table unless it " +
214       "knows for sure its size is small enough.")
215     .longConf
216     .createWithDefault(Long.MaxValue)
217
218   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
219     .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
220     .intConf
221     .createWithDefault(200)
222
223   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
224     buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
225       .doc("The target post-shuffle input size in bytes of a task.")
226       .bytesConf(ByteUnit.BYTE)
227       .createWithDefault(64 * 1024 * 1024)
228
229   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
230     .doc("When true, enable adaptive query execution.")
231     .booleanConf
232     .createWithDefault(false)
233
234   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
235     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
236       .internal()
237       .doc("The advisory minimal number of post-shuffle partitions provided to " +
238         "ExchangeCoordinator. This setting is used in our test to make sure we " +
239         "have enough parallelism to expose issues that will not be exposed with a " +
240         "single partition. When the value is a non-positive value, this setting will " +
241         "not be provided to ExchangeCoordinator.")
242       .intConf
243       .createWithDefault(-1)
244
245   val SUBEXPRESSION_ELIMINATION_ENABLED =
246     buildConf("spark.sql.subexpressionElimination.enabled")
247       .internal()
248       .doc("When true, common subexpressions will be eliminated.")
249       .booleanConf
250       .createWithDefault(true)
251
252   val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive")
253     .internal()
254     .doc("Whether the query analyzer should be case sensitive or not. " +
255       "Default to case insensitive. It is highly discouraged to turn on case sensitive mode.")
256     .booleanConf
257     .createWithDefault(false)
258
259   val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
260     .internal()
261     .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
262       "plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
263       "for certain kinds of query plans (such as those with a large number of predicates and " +
264       "aliases) which might negatively impact overall runtime.")
265     .booleanConf
266     .createWithDefault(true)
267
268   val ESCAPED_STRING_LITERALS = buildConf("spark.sql.parser.escapedStringLiterals")
269     .internal()
270     .doc("When true, string literals (including regex patterns) remain escaped in our SQL " +
271       "parser. The default is false since Spark 2.0. Setting it to true can restore the behavior " +
272       "prior to Spark 2.0.")
273     .booleanConf
274     .createWithDefault(false)
275
276   val FILE_COMRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
277     .internal()
278     .doc("When estimating the output data size of a table scan, multiply the file size with this " +
279       "factor as the estimated data size, in case the data is compressed in the file and lead to" +
280       " a heavily underestimated result.")
281     .doubleConf
282     .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
283     .createWithDefault(1.0)
284
285   val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
286     .doc("When true, the Parquet data source merges schemas collected from all data files, " +
287          "otherwise the schema is picked from the summary file or a random data file " +
288          "if no summary file is available.")
289     .booleanConf
290     .createWithDefault(false)
291
292   val PARQUET_SCHEMA_RESPECT_SUMMARIES = buildConf("spark.sql.parquet.respectSummaryFiles")
293     .doc("When true, we make assumption that all part-files of Parquet are consistent with " +
294          "summary files and we will ignore them when merging schema. Otherwise, if this is " +
295          "false, which is the default, we will merge all part-files. This should be considered " +
296          "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
297     .booleanConf
298     .createWithDefault(false)
299
300   val PARQUET_BINARY_AS_STRING = buildConf("spark.sql.parquet.binaryAsString")
301     .doc("Some other Parquet-producing systems, in particular Impala and older versions of " +
302       "Spark SQL, do not differentiate between binary data and strings when writing out the " +
303       "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
304       "compatibility with these systems.")
305     .booleanConf
306     .createWithDefault(false)
307
308   val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
309     .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
310       "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
311       "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
312       "provide compatibility with these systems.")
313     .booleanConf
314     .createWithDefault(true)
315
316   val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
317     .doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
318       "converting to timestamps, for data written by Impala.  This is necessary because Impala " +
319       "stores INT96 data with a different timezone offset than Hive & Spark.")
320     .booleanConf
321     .createWithDefault(false)
322
323   object ParquetOutputTimestampType extends Enumeration {
324     val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
325   }
326
327   val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
328     .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
329       "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
330       "is a standard timestamp type in Parquet, which stores number of microseconds from the " +
331       "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
332       "means Spark has to truncate the microsecond portion of its timestamp value.")
333     .stringConf
334     .transform(_.toUpperCase(Locale.ROOT))
335     .checkValues(ParquetOutputTimestampType.values.map(_.toString))
336     .createWithDefault(ParquetOutputTimestampType.INT96.toString)
337
338   val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
339     .doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " +
340       "When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
341       "extended type. In this mode, the microsecond portion of the timestamp value will be" +
342       "truncated.")
343     .booleanConf
344     .createWithDefault(false)
345
346   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
347     .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
348       "`parquet.compression` is specified in the table-specific options/properties, the " +
349       "precedence would be `compression`, `parquet.compression`, " +
350       "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
351       "snappy, gzip, lzo.")
352     .stringConf
353     .transform(_.toLowerCase(Locale.ROOT))
354     .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
355     .createWithDefault("snappy")
356
357   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
358     .doc("Enables Parquet filter push-down optimization when set to true.")
359     .booleanConf
360     .createWithDefault(true)
361
362   val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
363     .doc("If true, enables Parquet filter push-down optimization for Date. " +
364       "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
365     .internal()
366     .booleanConf
367     .createWithDefault(true)
368
369   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
370     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
371       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
372     .booleanConf
373     .createWithDefault(false)
374
375   val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled")
376     .doc("If true, enables Parquet's native record-level filtering using the pushed down " +
377       "filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' " +
378       "is enabled.")
379     .booleanConf
380     .createWithDefault(false)
381
382   val PARQUET_OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.parquet.output.committer.class")
383     .doc("The output committer class used by Parquet. The specified class needs to be a " +
384       "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
385       "of org.apache.parquet.hadoop.ParquetOutputCommitter. If it is not, then metadata summaries" +
386       "will never be created, irrespective of the value of parquet.enable.summary-metadata")
387     .internal()
388     .stringConf
389     .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
390
391   val PARQUET_VECTORIZED_READER_ENABLED =
392     buildConf("spark.sql.parquet.enableVectorizedReader")
393       .doc("Enables vectorized parquet decoding.")
394       .booleanConf
395       .createWithDefault(true)
396
397   val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
398     .doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
399       "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
400     .intConf
401     .createWithDefault(4096)
402
403   val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
404     .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
405       "`orc.compress` is specified in the table-specific options/properties, the precedence " +
406       "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
407       "Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
408     .stringConf
409     .transform(_.toLowerCase(Locale.ROOT))
410     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
411     .createWithDefault("snappy")
412
413   val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
414     .doc("When native, use the native version of ORC support instead of the ORC library in Hive " +
415       "1.2.1. It is 'hive' by default prior to Spark 2.4.")
416     .internal()
417     .stringConf
418     .checkValues(Set("hive", "native"))
419     .createWithDefault("native")
420
421   val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader")
422     .doc("Enables vectorized orc decoding.")
423     .booleanConf
424     .createWithDefault(true)
425
426   val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
427     .doc("The number of rows to include in a orc vectorized reader batch. The number should " +
428       "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
429     .intConf
430     .createWithDefault(4096)
431
432   val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
433     .doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
434       "vectorized ORC reader.")
435     .internal()
436     .booleanConf
437     .createWithDefault(false)
438
439   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
440     .doc("When true, enable filter pushdown for ORC files.")
441     .booleanConf
442     .createWithDefault(true)
443
444   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
445     .doc("When true, check all the partition paths under the table\'s root directory " +
446          "when reading data stored in HDFS. This configuration will be deprecated in the future " +
447          "releases and replaced by spark.files.ignoreMissingFiles.")
448     .booleanConf
449     .createWithDefault(false)
450
451   val HIVE_METASTORE_PARTITION_PRUNING =
452     buildConf("spark.sql.hive.metastorePartitionPruning")
453       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
454            "unmatching partitions can be eliminated earlier. This only affects Hive tables " +
455            "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
456            "HiveUtils.CONVERT_METASTORE_ORC for more information).")
457       .booleanConf
458       .createWithDefault(true)
459
460   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
461     buildConf("spark.sql.hive.manageFilesourcePartitions")
462       .doc("When true, enable metastore partition management for file source tables as well. " +
463            "This includes both datasource and converted Hive tables. When partition management " +
464            "is enabled, datasource tables store partition in the Hive metastore, and use the " +
465            "metastore to prune partitions during query planning.")
466       .booleanConf
467       .createWithDefault(true)
468
469   val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
470     buildConf("spark.sql.hive.filesourcePartitionFileCacheSize")
471       .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
472            "a cache that can use up to specified num bytes for file metadata. This conf only " +
473            "has an effect when hive filesource partition management is enabled.")
474       .longConf
475       .createWithDefault(250 * 1024 * 1024)
476
477   object HiveCaseSensitiveInferenceMode extends Enumeration {
478     val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value
479   }
480
481   val HIVE_CASE_SENSITIVE_INFERENCE = buildConf("spark.sql.hive.caseSensitiveInferenceMode")
482     .doc("Sets the action to take when a case-sensitive schema cannot be read from a Hive " +
483       "table's properties. Although Spark SQL itself is not case-sensitive, Hive compatible file " +
484       "formats such as Parquet are. Spark SQL must use a case-preserving schema when querying " +
485       "any table backed by files containing case-sensitive field names or queries may not return " +
486       "accurate results. Valid options include INFER_AND_SAVE (the default mode-- infer the " +
487       "case-sensitive schema from the underlying data files and write it back to the table " +
488       "properties), INFER_ONLY (infer the schema but don't attempt to write it to the table " +
489       "properties) and NEVER_INFER (fallback to using the case-insensitive metastore schema " +
490       "instead of inferring).")
491     .stringConf
492     .transform(_.toUpperCase(Locale.ROOT))
493     .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
494     .createWithDefault(HiveCaseSensitiveInferenceMode.INFER_AND_SAVE.toString)
495
496   val TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP =
497     buildConf("spark.sql.typeCoercion.compareDateTimestampInTimestamp")
498       .internal()
499       .doc("When true (default), compare Date with Timestamp after converting both sides to " +
500         "Timestamp. This behavior is compatible with Hive 2.2 or later. See HIVE-15236. " +
501         "When false, restore the behavior prior to Spark 2.4. Compare Date with Timestamp after " +
502         "converting both sides to string. This config will be removed in spark 3.0")
503       .booleanConf
504       .createWithDefault(true)
505
506   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
507     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
508       "to produce the partition columns instead of table scans. It applies when all the columns " +
509       "scanned are partition columns and the query has an aggregate operator that satisfies " +
510       "distinct semantics.")
511     .booleanConf
512     .createWithDefault(true)
513
514   val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
515     .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
516       "to parse.")
517     .stringConf
518     .createWithDefault("_corrupt_record")
519
520   val FROM_JSON_FORCE_NULLABLE_SCHEMA = buildConf("spark.sql.fromJsonForceNullableSchema")
521     .internal()
522     .doc("When true, force the output schema of the from_json() function to be nullable " +
523       "(including all the fields). Otherwise, the schema might not be compatible with" +
524       "actual data, which leads to curruptions.")
525     .booleanConf
526     .createWithDefault(true)
527
528   val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout")
529     .doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
530     .timeConf(TimeUnit.SECONDS)
531     .createWithDefault(5 * 60)
532
533   // This is only used for the thriftserver
534   val THRIFTSERVER_POOL = buildConf("spark.sql.thriftserver.scheduler.pool")
535     .doc("Set a Fair Scheduler pool for a JDBC client session.")
536     .stringConf
537     .createOptional
538
539   val THRIFTSERVER_INCREMENTAL_COLLECT =
540     buildConf("spark.sql.thriftServer.incrementalCollect")
541       .internal()
542       .doc("When true, enable incremental collection for execution in Thrift Server.")
543       .booleanConf
544       .createWithDefault(false)
545
546   val THRIFTSERVER_UI_STATEMENT_LIMIT =
547     buildConf("spark.sql.thriftserver.ui.retainedStatements")
548       .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
549       .intConf
550       .createWithDefault(200)
551
552   val THRIFTSERVER_UI_SESSION_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedSessions")
553     .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
554     .intConf
555     .createWithDefault(200)
556
557   // This is used to set the default data source
558   val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
559     .doc("The default data source to use in input/output.")
560     .stringConf
561     .createWithDefault("parquet")
562
563   val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS")
564     .internal()
565     .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
566       "without specifying any storage property will be converted to a data source table, " +
567       "using the data source set by spark.sql.sources.default.")
568     .booleanConf
569     .createWithDefault(false)
570
571   val GATHER_FASTSTAT = buildConf("spark.sql.hive.gatherFastStats")
572       .internal()
573       .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
574         " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
575         " metastore.")
576       .booleanConf
577       .createWithDefault(true)
578
579   val PARTITION_COLUMN_TYPE_INFERENCE =
580     buildConf("spark.sql.sources.partitionColumnTypeInference.enabled")
581       .doc("When true, automatically infer the data types for partitioned columns.")
582       .booleanConf
583       .createWithDefault(true)
584
585   val BUCKETING_ENABLED = buildConf("spark.sql.sources.bucketing.enabled")
586     .doc("When false, we will treat bucketed table as normal table")
587     .booleanConf
588     .createWithDefault(true)
589
590   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
591     .doc("When false, we will throw an error if a query contains a cartesian product without " +
592         "explicit CROSS JOIN syntax.")
593     .booleanConf
594     .createWithDefault(false)
595
596   val ORDER_BY_ORDINAL = buildConf("spark.sql.orderByOrdinal")
597     .doc("When true, the ordinal numbers are treated as the position in the select list. " +
598          "When false, the ordinal numbers in order/sort by clause are ignored.")
599     .booleanConf
600     .createWithDefault(true)
601
602   val GROUP_BY_ORDINAL = buildConf("spark.sql.groupByOrdinal")
603     .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
604       "in the select list. When false, the ordinal numbers are ignored.")
605     .booleanConf
606     .createWithDefault(true)
607
608   val GROUP_BY_ALIASES = buildConf("spark.sql.groupByAliases")
609     .doc("When true, aliases in a select list can be used in group by clauses. When false, " +
610       "an analysis exception is thrown in the case.")
611     .booleanConf
612     .createWithDefault(true)
613
614   // The output committer class used by data sources. The specified class needs to be a
615   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
616   val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
617     .internal()
618     .stringConf
619     .createOptional
620
621   val FILE_COMMIT_PROTOCOL_CLASS =
622     buildConf("spark.sql.sources.commitProtocolClass")
623       .internal()
624       .stringConf
625       .createWithDefault(
626         "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
627
628   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
629     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
630       .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
631         "of detected paths exceeds this value during partition discovery, it tries to list the " +
632         "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
633         "LibSVM data sources.")
634       .intConf
635       .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
636         "files at driver side must not be negative")
637       .createWithDefault(32)
638
639   val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
640     buildConf("spark.sql.sources.parallelPartitionDiscovery.parallelism")
641       .doc("The number of parallelism to list a collection of path recursively, Set the " +
642         "number to prevent file listing from generating too many tasks.")
643       .internal()
644       .intConf
645       .createWithDefault(10000)
646
647   // Whether to automatically resolve ambiguity in join conditions for self-joins.
648   // See SPARK-6231.
649   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
650     buildConf("spark.sql.selfJoinAutoResolveAmbiguity")
651       .internal()
652       .booleanConf
653       .createWithDefault(true)
654
655   // Whether to retain group by columns or not in GroupedData.agg.
656   val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
657     .internal()
658     .booleanConf
659     .createWithDefault(true)
660
661   val DATAFRAME_PIVOT_MAX_VALUES = buildConf("spark.sql.pivotMaxValues")
662     .doc("When doing a pivot without specifying values for the pivot column this is the maximum " +
663       "number of (distinct) values that will be collected without error.")
664     .intConf
665     .createWithDefault(10000)
666
667   val RUN_SQL_ON_FILES = buildConf("spark.sql.runSQLOnFiles")
668     .internal()
669     .doc("When true, we could use `datasource`.`path` as table in SQL query.")
670     .booleanConf
671     .createWithDefault(true)
672
673   val WHOLESTAGE_CODEGEN_ENABLED = buildConf("spark.sql.codegen.wholeStage")
674     .internal()
675     .doc("When true, the whole stage (of multiple operators) will be compiled into single java" +
676       " method.")
677     .booleanConf
678     .createWithDefault(true)
679
680   val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
681     buildConf("spark.sql.codegen.useIdInClassName")
682     .internal()
683     .doc("When true, embed the (whole-stage) codegen stage ID into " +
684       "the class name of the generated class as a suffix")
685     .booleanConf
686     .createWithDefault(true)
687
688   val WHOLESTAGE_MAX_NUM_FIELDS = buildConf("spark.sql.codegen.maxFields")
689     .internal()
690     .doc("The maximum number of fields (including nested fields) that will be supported before" +
691       " deactivating whole-stage codegen.")
692     .intConf
693     .createWithDefault(100)
694
695   val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
696     .internal()
697     .doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
698       " fail to compile generated code")
699     .booleanConf
700     .createWithDefault(true)
701
702   val CODEGEN_LOGGING_MAX_LINES = buildConf("spark.sql.codegen.logging.maxLines")
703     .internal()
704     .doc("The maximum number of codegen lines to log when errors occur. Use -1 for unlimited.")
705     .intConf
706     .checkValue(maxLines => maxLines >= -1, "The maximum must be a positive integer, 0 to " +
707       "disable logging or -1 to apply no limit.")
708     .createWithDefault(1000)
709
710   val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
711     .internal()
712     .doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
713       "codegen. When the compiled function exceeds this threshold, the whole-stage codegen is " +
714       "deactivated for this subtree of the current query plan. The default value is 65535, which " +
715       "is the largest bytecode size possible for a valid Java method. When running on HotSpot, " +
716       s"it may be preferable to set the value to ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} " +
717       "to match HotSpot's implementation.")
718     .intConf
719     .createWithDefault(65535)
720
721   val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR =
722     buildConf("spark.sql.codegen.splitConsumeFuncByOperator")
723       .internal()
724       .doc("When true, whole stage codegen would put the logic of consuming rows of each " +
725         "physical operator into individual methods, instead of a single big method. This can be " +
726         "used to avoid oversized function that can miss the opportunity of JIT optimization.")
727       .booleanConf
728       .createWithDefault(true)
729
730   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
731     .doc("The maximum number of bytes to pack into a single partition when reading files.")
732     .longConf
733     .createWithDefault(128 * 1024 * 1024) // parquet.block.size
734
735   val FILES_OPEN_COST_IN_BYTES = buildConf("spark.sql.files.openCostInBytes")
736     .internal()
737     .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
738       " the same time. This is used when putting multiple files into a partition. It's better to" +
739       " over estimated, then the partitions with small files will be faster than partitions with" +
740       " bigger files (which is scheduled first).")
741     .longConf
742     .createWithDefault(4 * 1024 * 1024)
743
744   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
745     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
746       "encountering corrupted files and the contents that have been read will still be returned.")
747     .booleanConf
748     .createWithDefault(false)
749
750   val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
751     .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
752       "encountering missing files and the contents that have been read will still be returned.")
753     .booleanConf
754     .createWithDefault(false)
755
756   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
757     .doc("Maximum number of records to write out to a single file. " +
758       "If this value is zero or negative, there is no limit.")
759     .longConf
760     .createWithDefault(0)
761
762   val EXCHANGE_REUSE_ENABLED = buildConf("spark.sql.exchange.reuse")
763     .internal()
764     .doc("When true, the planner will try to find out duplicated exchanges and re-use them.")
765     .booleanConf
766     .createWithDefault(true)
767
768   val STATE_STORE_PROVIDER_CLASS =
769     buildConf("spark.sql.streaming.stateStore.providerClass")
770       .internal()
771       .doc(
772         "The class used to manage state data in stateful streaming queries. This class must " +
773           "be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
774       .stringConf
775       .createWithDefault(
776         "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
777
778   val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
779     buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
780       .internal()
781       .doc("Minimum number of state store delta files that needs to be generated before they " +
782         "consolidated into snapshots.")
783       .intConf
784       .createWithDefault(10)
785
786   val CHECKPOINT_LOCATION = buildConf("spark.sql.streaming.checkpointLocation")
787     .doc("The default location for storing checkpoint data for streaming queries.")
788     .stringConf
789     .createOptional
790
791   val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
792     .internal()
793     .doc("The minimum number of batches that must be retained and made recoverable.")
794     .intConf
795     .createWithDefault(100)
796
797   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
798     buildConf("spark.sql.streaming.unsupportedOperationCheck")
799       .internal()
800       .doc("When true, the logical plan for streaming query will be checked for unsupported" +
801         " operations.")
802       .booleanConf
803       .createWithDefault(true)
804
805   val VARIABLE_SUBSTITUTE_ENABLED =
806     buildConf("spark.sql.variable.substitute")
807       .doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
808       .booleanConf
809       .createWithDefault(true)
810
811   val VARIABLE_SUBSTITUTE_DEPTH =
812     buildConf("spark.sql.variable.substitute.depth")
813       .internal()
814       .doc("Deprecated: The maximum replacements the substitution engine will do.")
815       .intConf
816       .createWithDefault(40)
817
818   val ENABLE_TWOLEVEL_AGG_MAP =
819     buildConf("spark.sql.codegen.aggregate.map.twolevel.enabled")
820       .internal()
821       .doc("Enable two-level aggregate hash map. When enabled, records will first be " +
822         "inserted/looked-up at a 1st-level, small, fast map, and then fallback to a " +
823         "2nd-level, larger, slower map when 1st level is full or keys cannot be found. " +
824         "When disabled, records go directly to the 2nd level. Defaults to true.")
825       .booleanConf
826       .createWithDefault(true)
827
828   val MAX_NESTED_VIEW_DEPTH =
829     buildConf("spark.sql.view.maxNestedViewDepth")
830       .internal()
831       .doc("The maximum depth of a view reference in a nested view. A nested view may reference " +
832         "other nested views, the dependencies are organized in a directed acyclic graph (DAG). " +
833         "However the DAG depth may become too large and cause unexpected behavior. This " +
834         "configuration puts a limit on this: when the depth of a view exceeds this value during " +
835         "analysis, we terminate the resolution to avoid potential errors.")
836       .intConf
837       .checkValue(depth => depth > 0, "The maximum depth of a view reference in a nested view " +
838         "must be positive.")
839       .createWithDefault(100)
840
841   val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
842     buildConf("spark.sql.streaming.commitProtocolClass")
843       .internal()
844       .stringConf
845       .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
846
847   val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
848     buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
849       .internal()
850       .doc("In the case of ObjectHashAggregateExec, when the size of the in-memory hash map " +
851         "grows too large, we will fall back to sort-based aggregation. This option sets a row " +
852         "count threshold for the size of the hash map.")
853       .intConf
854       // We are trying to be conservative and use a relatively small default count threshold here
855       // since the state object of some TypedImperativeAggregate function can be quite large (e.g.
856       // percentile_approx).
857       .createWithDefault(128)
858
859   val USE_OBJECT_HASH_AGG = buildConf("spark.sql.execution.useObjectHashAggregateExec")
860     .internal()
861     .doc("Decides if we use ObjectHashAggregateExec")
862     .booleanConf
863     .createWithDefault(true)
864
865   val FILE_SINK_LOG_DELETION = buildConf("spark.sql.streaming.fileSink.log.deletion")
866     .internal()
867     .doc("Whether to delete the expired log files in file stream sink.")
868     .booleanConf
869     .createWithDefault(true)
870
871   val FILE_SINK_LOG_COMPACT_INTERVAL =
872     buildConf("spark.sql.streaming.fileSink.log.compactInterval")
873       .internal()
874       .doc("Number of log files after which all the previous files " +
875         "are compacted into the next log file.")
876       .intConf
877       .createWithDefault(10)
878
879   val FILE_SINK_LOG_CLEANUP_DELAY =
880     buildConf("spark.sql.streaming.fileSink.log.cleanupDelay")
881       .internal()
882       .doc("How long that a file is guaranteed to be visible for all readers.")
883       .timeConf(TimeUnit.MILLISECONDS)
884       .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
885
886   val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
887     .internal()
888     .doc("Whether to delete the expired log files in file stream source.")
889     .booleanConf
890     .createWithDefault(true)
891
892   val FILE_SOURCE_LOG_COMPACT_INTERVAL =
893     buildConf("spark.sql.streaming.fileSource.log.compactInterval")
894       .internal()
895       .doc("Number of log files after which all the previous files " +
896         "are compacted into the next log file.")
897       .intConf
898       .createWithDefault(10)
899
900   val FILE_SOURCE_LOG_CLEANUP_DELAY =
901     buildConf("spark.sql.streaming.fileSource.log.cleanupDelay")
902       .internal()
903       .doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
904       .timeConf(TimeUnit.MILLISECONDS)
905       .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
906
907   val STREAMING_SCHEMA_INFERENCE =
908     buildConf("spark.sql.streaming.schemaInference")
909       .internal()
910       .doc("Whether file-based streaming sources will infer its own schema")
911       .booleanConf
912       .createWithDefault(false)
913
914   val STREAMING_POLLING_DELAY =
915     buildConf("spark.sql.streaming.pollingDelay")
916       .internal()
917       .doc("How long to delay polling new data when no data is available")
918       .timeConf(TimeUnit.MILLISECONDS)
919       .createWithDefault(10L)
920
921   val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
922     buildConf("spark.sql.streaming.noDataProgressEventInterval")
923       .internal()
924       .doc("How long to wait between two progress events when there is no data")
925       .timeConf(TimeUnit.MILLISECONDS)
926       .createWithDefault(10000L)
927
928   val STREAMING_NO_DATA_MICRO_BATCHES_ENABLED =
929     buildConf("spark.sql.streaming.noDataMicroBatchesEnabled")
930       .doc(
931         "Whether streaming micro-batch engine will execute batches without data " +
932           "for eager state management for stateful streaming queries.")
933       .booleanConf
934       .createWithDefault(true)
935
936   val STREAMING_METRICS_ENABLED =
937     buildConf("spark.sql.streaming.metricsEnabled")
938       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
939       .booleanConf
940       .createWithDefault(false)
941
942   val STREAMING_PROGRESS_RETENTION =
943     buildConf("spark.sql.streaming.numRecentProgressUpdates")
944       .doc("The number of progress updates to retain for a streaming query")
945       .intConf
946       .createWithDefault(100)
947
948   val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
949     buildConf("spark.sql.streaming.checkpointFileManagerClass")
950       .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
951         "of the interface CheckpointFileManager.")
952       .internal()
953       .stringConf
954
955   val NDV_MAX_ERROR =
956     buildConf("spark.sql.statistics.ndv.maxError")
957       .internal()
958       .doc("The maximum estimation error allowed in HyperLogLog++ algorithm when generating " +
959         "column level statistics.")
960       .doubleConf
961       .createWithDefault(0.05)
962
963   val HISTOGRAM_ENABLED =
964     buildConf("spark.sql.statistics.histogram.enabled")
965       .doc("Generates histograms when computing column statistics if enabled. Histograms can " +
966         "provide better estimation accuracy. Currently, Spark only supports equi-height " +
967         "histogram. Note that collecting histograms takes extra cost. For example, collecting " +
968         "column statistics usually takes only one table scan, but generating equi-height " +
969         "histogram will cause an extra table scan.")
970       .booleanConf
971       .createWithDefault(false)
972
973   val HISTOGRAM_NUM_BINS =
974     buildConf("spark.sql.statistics.histogram.numBins")
975       .internal()
976       .doc("The number of bins when generating histograms.")
977       .intConf
978       .checkValue(num => num > 1, "The number of bins must be larger than 1.")
979       .createWithDefault(254)
980
981   val PERCENTILE_ACCURACY =
982     buildConf("spark.sql.statistics.percentile.accuracy")
983       .internal()
984       .doc("Accuracy of percentile approximation when generating equi-height histograms. " +
985         "Larger value means better accuracy. The relative error can be deduced by " +
986         "1.0 / PERCENTILE_ACCURACY.")
987       .intConf
988       .createWithDefault(10000)
989
990   val AUTO_SIZE_UPDATE_ENABLED =
991     buildConf("spark.sql.statistics.size.autoUpdate.enabled")
992       .doc("Enables automatic update for table size once table's data is changed. Note that if " +
993         "the total number of files of the table is very large, this can be expensive and slow " +
994         "down data change commands.")
995       .booleanConf
996       .createWithDefault(false)
997
998   val CBO_ENABLED =
999     buildConf("spark.sql.cbo.enabled")
1000       .doc("Enables CBO for estimation of plan statistics when set true.")
1001       .booleanConf
1002       .createWithDefault(false)
1003
1004   val JOIN_REORDER_ENABLED =
1005     buildConf("spark.sql.cbo.joinReorder.enabled")
1006       .doc("Enables join reorder in CBO.")
1007       .booleanConf
1008       .createWithDefault(false)
1009
1010   val JOIN_REORDER_DP_THRESHOLD =
1011     buildConf("spark.sql.cbo.joinReorder.dp.threshold")
1012       .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
1013       .intConf
1014       .checkValue(number => number > 0, "The maximum number must be a positive integer.")
1015       .createWithDefault(12)
1016
1017   val JOIN_REORDER_CARD_WEIGHT =
1018     buildConf("spark.sql.cbo.joinReorder.card.weight")
1019       .internal()
1020       .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
1021         "rows * weight + size * (1 - weight).")
1022       .doubleConf
1023       .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
1024       .createWithDefault(0.7)
1025
1026   val JOIN_REORDER_DP_STAR_FILTER =
1027     buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
1028       .doc("Applies star-join filter heuristics to cost based join enumeration.")
1029       .booleanConf
1030       .createWithDefault(false)
1031
1032   val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
1033     .doc("When true, it enables join reordering based on star schema detection. ")
1034     .booleanConf
1035     .createWithDefault(false)
1036
1037   val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio")
1038     .internal()
1039     .doc("Specifies the upper limit of the ratio between the largest fact tables" +
1040       " for a star join to be considered. ")
1041     .doubleConf
1042     .createWithDefault(0.9)
1043
1044   val SESSION_LOCAL_TIMEZONE =
1045     buildConf("spark.sql.session.timeZone")
1046       .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
1047       .stringConf
1048       .createWithDefaultFunction(() => TimeZone.getDefault.getID)
1049
1050   val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
1051     buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
1052       .internal()
1053       .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
1054       .intConf
1055       .createWithDefault(4096)
1056
1057   val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
1058     buildConf("spark.sql.windowExec.buffer.spill.threshold")
1059       .internal()
1060       .doc("Threshold for number of rows to be spilled by window operator")
1061       .intConf
1062       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
1063
1064   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
1065     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
1066       .internal()
1067       .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
1068         "join operator")
1069       .intConf
1070       .createWithDefault(Int.MaxValue)
1071
1072   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
1073     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
1074       .internal()
1075       .doc("Threshold for number of rows to be spilled by sort merge join operator")
1076       .intConf
1077       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
1078
1079   val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
1080     buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
1081       .internal()
1082       .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
1083         "product operator")
1084       .intConf
1085       .createWithDefault(4096)
1086
1087   val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
1088     buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
1089       .internal()
1090       .doc("Threshold for number of rows to be spilled by cartesian product operator")
1091       .intConf
1092       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
1093
1094   val SUPPORT_QUOTED_REGEX_COLUMN_NAME = buildConf("spark.sql.parser.quotedRegexColumnNames")
1095     .doc("When true, quoted Identifiers (using backticks) in SELECT statement are interpreted" +
1096       " as regular expressions.")
1097     .booleanConf
1098     .createWithDefault(false)
1099
1100   val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
1101     buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
1102       .internal()
1103       .doc("Number of points to sample per partition in order to determine the range boundaries" +
1104           " for range partitioning, typically used in global sorting (without limit).")
1105       .intConf
1106       .createWithDefault(100)
1107
1108   val ARROW_EXECUTION_ENABLED =
1109     buildConf("spark.sql.execution.arrow.enabled")
1110       .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " +
1111         "for use with pyspark.sql.DataFrame.toPandas, and " +
1112         "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. " +
1113         "The following data types are unsupported: " +
1114         "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
1115       .booleanConf
1116       .createWithDefault(false)
1117
1118   val ARROW_FALLBACK_ENABLED =
1119     buildConf("spark.sql.execution.arrow.fallback.enabled")
1120       .doc("When true, optimizations enabled by 'spark.sql.execution.arrow.enabled' will " +
1121         "fallback automatically to non-optimized implementations if an error occurs.")
1122       .booleanConf
1123       .createWithDefault(true)
1124
1125   val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH =
1126     buildConf("spark.sql.execution.arrow.maxRecordsPerBatch")
1127       .doc("When using Apache Arrow, limit the maximum number of records that can be written " +
1128         "to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.")
1129       .intConf
1130       .createWithDefault(10000)
1131
1132   val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
1133     buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
1134       .internal()
1135       .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
1136         "timezone when converting to/from Pandas DataFrame. This configuration will be " +
1137         "deprecated in the future releases.")
1138       .booleanConf
1139       .createWithDefault(true)
1140
1141   val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
1142     .internal()
1143     .doc("When true, the apply function of the rule verifies whether the right node of the" +
1144       " except operation is of type Filter or Project followed by Filter. If yes, the rule" +
1145       " further verifies 1) Excluding the filter operations from the right (as well as the" +
1146       " left node, if any) on the top, whether both the nodes evaluates to a same result." +
1147       " 2) The left and right nodes don't contain any SubqueryExpressions. 3) The output" +
1148       " column names of the left node are distinct. If all the conditions are met, the" +
1149       " rule will replace the except operation with a Filter by flipping the filter" +
1150       " condition(s) of the right node.")
1151     .booleanConf
1152     .createWithDefault(true)
1153
1154   val DECIMAL_OPERATIONS_ALLOW_PREC_LOSS =
1155     buildConf("spark.sql.decimalOperations.allowPrecisionLoss")
1156       .internal()
1157       .doc("When true (default), establishing the result type of an arithmetic operation " +
1158         "happens according to Hive behavior and SQL ANSI 2011 specification, ie. rounding the " +
1159         "decimal part of the result if an exact representation is not possible. Otherwise, NULL " +
1160         "is returned in those cases, as previously.")
1161       .booleanConf
1162       .createWithDefault(true)
1163
1164   val SQL_STRING_REDACTION_PATTERN =
1165     ConfigBuilder("spark.sql.redaction.string.regex")
1166       .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
1167         "information. When this regex matches a string part, that string part is replaced by a " +
1168         "dummy value. This is currently used to redact the output of SQL explain commands. " +
1169         "When this conf is not set, the value from `spark.redaction.string.regex` is used.")
1170       .fallbackConf(org.apache.spark.internal.config.STRING_REDACTION_PATTERN)
1171
1172   val CONCAT_BINARY_AS_STRING = buildConf("spark.sql.function.concatBinaryAsString")
1173     .doc("When this option is set to false and all inputs are binary, `functions.concat` returns " +
1174       "an output as binary. Otherwise, it returns as a string. ")
1175     .booleanConf
1176     .createWithDefault(false)
1177
1178   val ELT_OUTPUT_AS_STRING = buildConf("spark.sql.function.eltOutputAsString")
1179     .doc("When this option is set to false and all inputs are binary, `elt` returns " +
1180       "an output as binary. Otherwise, it returns as a string. ")
1181     .booleanConf
1182     .createWithDefault(false)
1183
1184   val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
1185     buildConf("spark.sql.allowCreatingManagedTableUsingNonemptyLocation")
1186     .internal()
1187     .doc("When this option is set to true, creating managed tables with nonempty location " +
1188       "is allowed. Otherwise, an analysis exception is thrown. ")
1189     .booleanConf
1190     .createWithDefault(false)
1191
1192   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
1193     buildConf("spark.sql.streaming.continuous.executorQueueSize")
1194     .internal()
1195     .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
1196       " buffer the results of a ContinuousDataReader.")
1197     .intConf
1198     .createWithDefault(1024)
1199
1200   val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
1201     buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
1202       .internal()
1203       .doc("The interval at which continuous execution readers will poll to check whether" +
1204         " the epoch has advanced on the driver.")
1205       .timeConf(TimeUnit.MILLISECONDS)
1206       .createWithDefault(100)
1207
1208   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
1209     .internal()
1210     .doc("A comma-separated list of fully qualified data source register class names for which" +
1211       " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
1212     .stringConf
1213     .createWithDefault("")
1214
1215   val DISABLED_V2_STREAMING_MICROBATCH_READERS =
1216     buildConf("spark.sql.streaming.disabledV2MicroBatchReaders")
1217       .internal()
1218       .doc(
1219         "A comma-separated list of fully qualified data source register class names for which " +
1220           "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
1221           "V1 Sources.")
1222       .stringConf
1223       .createWithDefault("")
1224
1225   val REJECT_TIMEZONE_IN_STRING = buildConf("spark.sql.function.rejectTimezoneInString")
1226     .internal()
1227     .doc("If true, `to_utc_timestamp` and `from_utc_timestamp` return null if the input string " +
1228       "contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`.")
1229     .booleanConf
1230     .createWithDefault(true)
1231
1232   object PartitionOverwriteMode extends Enumeration {
1233     val STATIC, DYNAMIC = Value
1234   }
1235
1236   val PARTITION_OVERWRITE_MODE =
1237     buildConf("spark.sql.sources.partitionOverwriteMode")
1238       .doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
1239         "static and dynamic. In static mode, Spark deletes all the partitions that match the " +
1240         "partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
1241         "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
1242         "those partitions that have data written into it at runtime. By default we use static " +
1243         "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
1244         "affect Hive serde tables, as they are always overwritten with dynamic mode.")
1245       .stringConf
1246       .transform(_.toUpperCase(Locale.ROOT))
1247       .checkValues(PartitionOverwriteMode.values.map(_.toString))
1248       .createWithDefault(PartitionOverwriteMode.STATIC.toString)
1249
1250   val SORT_BEFORE_REPARTITION =
1251     buildConf("spark.sql.execution.sortBeforeRepartition")
1252       .internal()
1253       .doc("When perform a repartition following a shuffle, the output row ordering would be " +
1254         "nondeterministic. If some downstream stages fail and some tasks of the repartition " +
1255         "stage retry, these tasks may generate different data, and that can lead to correctness " +
1256         "issues. Turn on this config to insert a local sort before actually doing repartition " +
1257         "to generate consistent repartition results. The performance of repartition() may go " +
1258         "down since we insert extra local sort before it.")
1259       .booleanConf
1260       .createWithDefault(true)
1261
1262   object Deprecated {
1263     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
1264   }
1265
1266   object Replaced {
1267     val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
1268   }
1269 }
1270
1271 /**
1272  * A class that enables the setting and getting of mutable config parameters/hints.
1273  *
1274  * In the presence of a SQLContext, these can be set and queried by passing SET commands
1275  * into Spark SQL's query functions (i.e. sql()). Otherwise, users of this class can
1276  * modify the hints by programmatically calling the setters and getters of this class.
1277  *
1278  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
1279  */
1280 class SQLConf extends Serializable with Logging {
1281   import SQLConf._
1282
1283   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
1284   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
1285     new java.util.HashMap[String, String]())
1286
1287   @transient private val reader = new ConfigReader(settings)
1288
1289   /** ************************ Spark SQL Params/Hints ******************* */
1290
1291   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
1292
1293   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
1294
1295   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
1296
1297   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
1298
1299   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
1300
1301   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
1302
1303   def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
1304
1305   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
1306
1307   def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
1308
1309   def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
1310
1311   def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
1312
1313   def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
1314
1315   def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)
1316
1317   def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
1318
1319   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
1320
1321   def streamingNoDataProgressEventInterval: Long =
1322     getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
1323
1324   def streamingNoDataMicroBatchesEnabled: Boolean =
1325     getConf(STREAMING_NO_DATA_MICRO_BATCHES_ENABLED)
1326
1327   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
1328
1329   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
1330
1331   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
1332
1333   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
1334
1335   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
1336
1337   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
1338
1339   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
1340
1341   def useCompression: Boolean = getConf(COMPRESS_CACHED)
1342
1343   def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
1344
1345   def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)
1346
1347   def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE)
1348
1349   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
1350
1351   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
1352
1353   def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
1354
1355   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
1356
1357   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
1358
1359   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
1360
1361   def targetPostShuffleInputSize: Long =
1362     getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
1363
1364   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
1365
1366   def minNumPostShufflePartitions: Int =
1367     getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
1368
1369   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
1370
1371   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
1372
1373   def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
1374
1375   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
1376
1377   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
1378
1379   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
1380
1381   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
1382
1383   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
1384
1385   def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
1386     HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))
1387
1388   def compareDateTimestampInTimestamp : Boolean =
1389     getConf(TYPECOERCION_COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP)
1390
1391   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
1392
1393   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
1394
1395   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
1396
1397   def wholeStageUseIdInClassName: Boolean = getConf(WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME)
1398
1399   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
1400
1401   def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
1402
1403   def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
1404
1405   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
1406
1407   def wholeStageSplitConsumeFuncByOperator: Boolean =
1408     getConf(WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR)
1409
1410   def tableRelationCacheSize: Int =
1411     getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
1412
1413   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
1414
1415   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
1416
1417   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
1418
1419   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
1420
1421   def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR)
1422
1423   def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)
1424
1425   def sortBeforeRepartition: Boolean = getConf(SORT_BEFORE_REPARTITION)
1426
1427   /**
1428    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
1429    * identifiers are equal.
1430    */
1431   def resolver: Resolver = {
1432     if (caseSensitiveAnalysis) {
1433       org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
1434     } else {
1435       org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
1436     }
1437   }
1438
1439   def subexpressionEliminationEnabled: Boolean =
1440     getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
1441
1442   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
1443
1444   def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
1445
1446   def advancedPartitionPredicatePushdownEnabled: Boolean =
1447     getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
1448
1449   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
1450
1451   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
1452
1453   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
1454
1455   def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)
1456
1457   def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
1458
1459   def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
1460
1461   def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
1462
1463   def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
1464
1465   def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
1466
1467   def isParquetINT96TimestampConversion: Boolean = getConf(PARQUET_INT96_TIMESTAMP_CONVERSION)
1468
1469   def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)
1470
1471   def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
1472     val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
1473     if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) {
1474       // If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set,
1475       // respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise,
1476       // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority.
1477       ParquetOutputTimestampType.TIMESTAMP_MILLIS
1478     } else {
1479       ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
1480     }
1481   }
1482
1483   def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
1484
1485   def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED)
1486
1487   def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
1488
1489   def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)
1490
1491   def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
1492
1493   def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
1494
1495   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
1496
1497   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
1498
1499   def partitionColumnTypeInferenceEnabled: Boolean =
1500     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
1501
1502   def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS)
1503
1504   def parallelPartitionDiscoveryThreshold: Int =
1505     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
1506
1507   def parallelPartitionDiscoveryParallelism: Int =
1508     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
1509
1510   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
1511
1512   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
1513     getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
1514
1515   def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
1516
1517   def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
1518
1519   def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
1520
1521   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
1522
1523   def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)
1524
1525   def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)
1526
1527   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
1528
1529   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
1530
1531   def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
1532
1533   def hiveThriftServerSingleSession: Boolean =
1534     getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)
1535
1536   def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
1537
1538   def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
1539
1540   def groupByAliases: Boolean = getConf(GROUP_BY_ALIASES)
1541
1542   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
1543
1544   def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
1545
1546   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
1547
1548   def histogramEnabled: Boolean = getConf(HISTOGRAM_ENABLED)
1549
1550   def histogramNumBins: Int = getConf(HISTOGRAM_NUM_BINS)
1551
1552   def percentileAccuracy: Int = getConf(PERCENTILE_ACCURACY)
1553
1554   def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
1555
1556   def autoSizeUpdateEnabled: Boolean = getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)
1557
1558   def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
1559
1560   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
1561
1562   def joinReorderCardWeight: Double = getConf(SQLConf.JOIN_REORDER_CARD_WEIGHT)
1563
1564   def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
1565
1566   def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1567
1568   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
1569
1570   def sortMergeJoinExecBufferInMemoryThreshold: Int =
1571     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1572
1573   def sortMergeJoinExecBufferSpillThreshold: Int =
1574     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
1575
1576   def cartesianProductExecBufferInMemoryThreshold: Int =
1577     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1578
1579   def cartesianProductExecBufferSpillThreshold: Int =
1580     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
1581
1582   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
1583
1584   def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
1585
1586   def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)
1587
1588   def supportQuotedRegexColumnName: Boolean = getConf(SUPPORT_QUOTED_REGEX_COLUMN_NAME)
1589
1590   def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
1591
1592   def arrowEnabled: Boolean = getConf(ARROW_EXECUTION_ENABLED)
1593
1594   def arrowFallbackEnabled: Boolean = getConf(ARROW_FALLBACK_ENABLED)
1595
1596   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
1597
1598   def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)
1599
1600   def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
1601
1602   def decimalOperationsAllowPrecisionLoss: Boolean = getConf(DECIMAL_OPERATIONS_ALLOW_PREC_LOSS)
1603
1604   def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)
1605
1606   def continuousStreamingExecutorPollIntervalMs: Long =
1607     getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)
1608
1609   def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
1610
1611   def disabledV2StreamingMicroBatchReaders: String =
1612     getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
1613
1614   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
1615
1616   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
1617
1618   def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
1619     getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)
1620
1621   def partitionOverwriteMode: PartitionOverwriteMode.Value =
1622     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
1623
1624   /** ********************** SQLConf functionality methods ************ */
1625
1626   /** Set Spark SQL configuration properties. */
1627   def setConf(props: Properties): Unit = settings.synchronized {
1628     props.asScala.foreach { case (k, v) => setConfString(k, v) }
1629   }
1630
1631   /** Set the given Spark SQL configuration property using a `string` value. */
1632   def setConfString(key: String, value: String): Unit = {
1633     require(key != null, "key cannot be null")
1634     require(value != null, s"value cannot be null for key: $key")
1635     val entry = sqlConfEntries.get(key)
1636     if (entry != null) {
1637       // Only verify configs in the SQLConf object
1638       entry.valueConverter(value)
1639     }
1640     setConfWithCheck(key, value)
1641   }
1642
1643   /** Set the given Spark SQL configuration property. */
1644   def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
1645     require(entry != null, "entry cannot be null")
1646     require(value != null, s"value cannot be null for key: ${entry.key}")
1647     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
1648     setConfWithCheck(entry.key, entry.stringConverter(value))
1649   }
1650
1651   /** Return the value of Spark SQL configuration property for the given key. */
1652   @throws[NoSuchElementException]("if key is not set")
1653   def getConfString(key: String): String = {
1654     Option(settings.get(key)).
1655       orElse {
1656         // Try to use the default value
1657         Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
1658       }.
1659       getOrElse(throw new NoSuchElementException(key))
1660   }
1661
1662   /**
1663    * Return the value of Spark SQL configuration property for the given key. If the key is not set
1664    * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
1665    * desired one.
1666    */
1667   def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
1668     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
1669     Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
1670   }
1671
1672   /**
1673    * Return the value of Spark SQL configuration property for the given key. If the key is not set
1674    * yet, return `defaultValue` in [[ConfigEntry]].
1675    */
1676   def getConf[T](entry: ConfigEntry[T]): T = {
1677     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
1678     entry.readFrom(reader)
1679   }
1680
1681   /**
1682    * Return the value of an optional Spark SQL configuration property for the given key. If the key
1683    * is not set yet, returns None.
1684    */
1685   def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
1686     require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
1687     entry.readFrom(reader)
1688   }
1689
1690   /**
1691    * Return the `string` value of Spark SQL configuration property for the given key. If the key is
1692    * not set yet, return `defaultValue`.
1693    */
1694   def getConfString(key: String, defaultValue: String): String = {
1695     if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
1696       val entry = sqlConfEntries.get(key)
1697       if (entry != null) {
1698         // Only verify configs in the SQLConf object
1699         entry.valueConverter(defaultValue)
1700       }
1701     }
1702     Option(settings.get(key)).getOrElse {
1703       // If the key is not set, need to check whether the config entry is registered and is
1704       // a fallback conf, so that we can check its parent.
1705       sqlConfEntries.get(key) match {
1706         case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
1707         case _ => defaultValue
1708       }
1709     }
1710   }
1711
1712   /**
1713    * Return all the configuration properties that have been set (i.e. not the default).
1714    * This creates a new copy of the config properties in the form of a Map.
1715    */
1716   def getAllConfs: immutable.Map[String, String] =
1717     settings.synchronized { settings.asScala.toMap }
1718
1719   /**
1720    * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
1721    * definition contains key, defaultValue and doc.
1722    */
1723   def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
1724     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
1725       val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
1726       (entry.key, displayValue, entry.doc)
1727     }.toSeq
1728   }
1729
1730   /**
1731    * Return whether a given key is set in this [[SQLConf]].
1732    */
1733   def contains(key: String): Boolean = {
1734     settings.containsKey(key)
1735   }
1736
1737   private def setConfWithCheck(key: String, value: String): Unit = {
1738     settings.put(key, value)
1739   }
1740
1741   def unsetConf(key: String): Unit = {
1742     settings.remove(key)
1743   }
1744
1745   def unsetConf(entry: ConfigEntry[_]): Unit = {
1746     settings.remove(entry.key)
1747   }
1748
1749   def clear(): Unit = {
1750     settings.clear()
1751   }
1752
1753   override def clone(): SQLConf = {
1754     val result = new SQLConf
1755     getAllConfs.foreach {
1756       case(k, v) => if (v ne null) result.setConfString(k, v)
1757     }
1758     result
1759   }
1760
1761   // For test only
1762   def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
1763     val cloned = clone()
1764     entries.foreach {
1765       case (entry, value) => cloned.setConfString(entry.key, value.toString)
1766     }
1767     cloned
1768   }
1769 }