[SPARK-24691][SQL] Dispatch the type support check in FileFormat implementation
[spark.git] / sql / core / src / main / scala / org / apache / spark / sql / execution / datasources / DataSource.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.apache.spark.sql.execution.datasources
19
20 import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
21
22 import scala.collection.JavaConverters._
23 import scala.language.{existentials, implicitConversions}
24 import scala.util.{Failure, Success, Try}
25
26 import org.apache.hadoop.fs.Path
27
28 import org.apache.spark.deploy.SparkHadoopUtil
29 import org.apache.spark.internal.Logging
30 import org.apache.spark.sql._
31 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
32 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
33 import org.apache.spark.sql.catalyst.expressions.Attribute
34 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
35 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
36 import org.apache.spark.sql.execution.SparkPlan
37 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
38 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
39 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
40 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
41 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
42 import org.apache.spark.sql.execution.streaming._
43 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
44 import org.apache.spark.sql.internal.SQLConf
45 import org.apache.spark.sql.sources._
46 import org.apache.spark.sql.streaming.OutputMode
47 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
48 import org.apache.spark.sql.util.SchemaUtils
49 import org.apache.spark.util.Utils
50
51 /**
52  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
53  * acting as the canonical set of parameters that can describe a Data Source, this class is used to
54  * resolve a description to a concrete implementation that can be used in a query plan
55  * (either batch or streaming) or to write out data using an external library.
56  *
57  * From an end user's perspective a DataSource description can be created explicitly using
58  * [[org.apache.spark.sql.DataFrameReader]] or CREATE TABLE USING DDL.  Additionally, this class is
59  * used when resolving a description from a metastore to a concrete implementation.
60  *
61  * Many of the arguments to this class are optional, though depending on the specific API being used
62  * these optional arguments might be filled in during resolution using either inference or external
63  * metadata.  For example, when reading a partitioned table from a file system, partition columns
64  * will be inferred from the directory layout even if they are not specified.
65  *
66  * @param paths A list of file system paths that hold data.  These will be globbed before and
67  *              qualified. This option only works when reading from a [[FileFormat]].
68  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
69  *                            we skip attempting to infer the schema.
70  * @param partitionColumns A list of column names that the relation is partitioned by. This list is
71  *                         generally empty during the read path, unless this DataSource is managed
72  *                         by Hive. In these cases, during `resolveRelation`, we will call
73  *                         `getOrInferFileFormatSchema` for file based DataSources to infer the
74  *                         partitioning. In other cases, if this list is empty, then this table
75  *                         is unpartitioned.
76  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
77  * @param catalogTable Optional catalog table reference that can be used to push down operations
78  *                     over the datasource to the catalog service.
79  */
80 case class DataSource(
81     sparkSession: SparkSession,
82     className: String,
83     paths: Seq[String] = Nil,
84     userSpecifiedSchema: Option[StructType] = None,
85     partitionColumns: Seq[String] = Seq.empty,
86     bucketSpec: Option[BucketSpec] = None,
87     options: Map[String, String] = Map.empty,
88     catalogTable: Option[CatalogTable] = None) extends Logging {
89
90   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
91
92   lazy val providingClass: Class[_] =
93     DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
94   lazy val sourceInfo: SourceInfo = sourceSchema()
95   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
96   private val equality = sparkSession.sessionState.conf.resolver
97
98   bucketSpec.map { bucket =>
99     SchemaUtils.checkColumnNameDuplication(
100       bucket.bucketColumnNames, "in the bucket definition", equality)
101     SchemaUtils.checkColumnNameDuplication(
102       bucket.sortColumnNames, "in the sort definition", equality)
103   }
104
105   /**
106    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
107    * it. In the read path, only managed tables by Hive provide the partition columns properly when
108    * initializing this class. All other file based data sources will try to infer the partitioning,
109    * and then cast the inferred types to user specified dataTypes if the partition columns exist
110    * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
111    * This method will try to skip file scanning whether `userSpecifiedSchema` and
112    * `partitionColumns` are provided. Here are some code paths that use this method:
113    *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
114    *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
115    *     dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
116    *     dataType if they don't.
117    *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
118    *     provide the schema. Here, we also perform partition inference like 2, and try to use
119    *     dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
120    *     this information, therefore calls to this method should be very cheap, i.e. there won't
121    *     be any further inference in any triggers.
122    *
123    * @param format the file format object for this DataSource
124    * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
125    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
126    *         columns.
127    */
128   private def getOrInferFileFormatSchema(
129       format: FileFormat,
130       fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
131     // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
132     // in streaming mode, we have already inferred and registered partition columns, we will
133     // never have to materialize the lazy val below
134     lazy val tempFileIndex = fileIndex.getOrElse {
135       val globbedPaths =
136         checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
137       createInMemoryFileIndex(globbedPaths)
138     }
139
140     val partitionSchema = if (partitionColumns.isEmpty) {
141       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
142       // columns properly unless it is a Hive DataSource
143       tempFileIndex.partitionSchema
144     } else {
145       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
146       // partitioning
147       if (userSpecifiedSchema.isEmpty) {
148         val inferredPartitions = tempFileIndex.partitionSchema
149         inferredPartitions
150       } else {
151         val partitionFields = partitionColumns.map { partitionColumn =>
152           userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
153             val inferredPartitions = tempFileIndex.partitionSchema
154             val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
155             if (inferredOpt.isDefined) {
156               logDebug(
157                 s"""Type of partition column: $partitionColumn not found in specified schema
158                    |for $format.
159                    |User Specified Schema
160                    |=====================
161                    |${userSpecifiedSchema.orNull}
162                    |
163                    |Falling back to inferred dataType if it exists.
164                  """.stripMargin)
165             }
166             inferredOpt
167           }.getOrElse {
168             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
169               s"the partition column: $partitionColumn. It must be specified manually.")
170           }
171         }
172         StructType(partitionFields)
173       }
174     }
175
176     val dataSchema = userSpecifiedSchema.map { schema =>
177       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
178     }.orElse {
179       format.inferSchema(
180         sparkSession,
181         caseInsensitiveOptions,
182         tempFileIndex.allFiles())
183     }.getOrElse {
184       throw new AnalysisException(
185         s"Unable to infer schema for $format. It must be specified manually.")
186     }
187
188     // We just print a waring message if the data schema and partition schema have the duplicate
189     // columns. This is because we allow users to do so in the previous Spark releases and
190     // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
191     // See SPARK-18108 and SPARK-21144 for related discussions.
192     try {
193       SchemaUtils.checkColumnNameDuplication(
194         (dataSchema ++ partitionSchema).map(_.name),
195         "in the data schema and the partition schema",
196         equality)
197     } catch {
198       case e: AnalysisException => logWarning(e.getMessage)
199     }
200
201     (dataSchema, partitionSchema)
202   }
203
204   /** Returns the name and schema of the source that can be used to continually read data. */
205   private def sourceSchema(): SourceInfo = {
206     providingClass.newInstance() match {
207       case s: StreamSourceProvider =>
208         val (name, schema) = s.sourceSchema(
209           sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
210         SourceInfo(name, schema, Nil)
211
212       case format: FileFormat =>
213         val path = caseInsensitiveOptions.getOrElse("path", {
214           throw new IllegalArgumentException("'path' is not specified")
215         })
216
217         // Check whether the path exists if it is not a glob pattern.
218         // For glob pattern, we do not check it because the glob pattern might only make sense
219         // once the streaming job starts and some upstream source starts dropping data.
220         val hdfsPath = new Path(path)
221         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
222           val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
223           if (!fs.exists(hdfsPath)) {
224             throw new AnalysisException(s"Path does not exist: $path")
225           }
226         }
227
228         val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
229         val isTextSource = providingClass == classOf[text.TextFileFormat]
230         // If the schema inference is disabled, only text sources require schema to be specified
231         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
232           throw new IllegalArgumentException(
233             "Schema must be specified when creating a streaming source DataFrame. " +
234               "If some files already exist in the directory, then depending on the file format " +
235               "you may be able to create a static DataFrame on that directory with " +
236               "'spark.read.load(directory)' and infer schema from it.")
237         }
238         val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
239         SourceInfo(
240           s"FileSource[$path]",
241           StructType(dataSchema ++ partitionSchema),
242           partitionSchema.fieldNames)
243
244       case _ =>
245         throw new UnsupportedOperationException(
246           s"Data source $className does not support streamed reading")
247     }
248   }
249
250   /** Returns a source that can be used to continually read data. */
251   def createSource(metadataPath: String): Source = {
252     providingClass.newInstance() match {
253       case s: StreamSourceProvider =>
254         s.createSource(
255           sparkSession.sqlContext,
256           metadataPath,
257           userSpecifiedSchema,
258           className,
259           caseInsensitiveOptions)
260
261       case format: FileFormat =>
262         val path = caseInsensitiveOptions.getOrElse("path", {
263           throw new IllegalArgumentException("'path' is not specified")
264         })
265         new FileStreamSource(
266           sparkSession = sparkSession,
267           path = path,
268           fileFormatClassName = className,
269           schema = sourceInfo.schema,
270           partitionColumns = sourceInfo.partitionColumns,
271           metadataPath = metadataPath,
272           options = caseInsensitiveOptions)
273       case _ =>
274         throw new UnsupportedOperationException(
275           s"Data source $className does not support streamed reading")
276     }
277   }
278
279   /** Returns a sink that can be used to continually write data. */
280   def createSink(outputMode: OutputMode): Sink = {
281     providingClass.newInstance() match {
282       case s: StreamSinkProvider =>
283         s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
284
285       case fileFormat: FileFormat =>
286         val path = caseInsensitiveOptions.getOrElse("path", {
287           throw new IllegalArgumentException("'path' is not specified")
288         })
289         if (outputMode != OutputMode.Append) {
290           throw new AnalysisException(
291             s"Data source $className does not support $outputMode output mode")
292         }
293         new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)
294
295       case _ =>
296         throw new UnsupportedOperationException(
297           s"Data source $className does not support streamed writing")
298     }
299   }
300
301   /**
302    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
303    * [[DataSource]]
304    *
305    * @param checkFilesExist Whether to confirm that the files exist when generating the
306    *                        non-streaming file based datasource. StructuredStreaming jobs already
307    *                        list file existence, and when generating incremental jobs, the batch
308    *                        is considered as a non-streaming file based data source. Since we know
309    *                        that files already exist, we don't need to check them again.
310    */
311   def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
312     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
313       // TODO: Throw when too much is given.
314       case (dataSource: SchemaRelationProvider, Some(schema)) =>
315         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
316       case (dataSource: RelationProvider, None) =>
317         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
318       case (_: SchemaRelationProvider, None) =>
319         throw new AnalysisException(s"A schema needs to be specified when using $className.")
320       case (dataSource: RelationProvider, Some(schema)) =>
321         val baseRelation =
322           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
323         if (baseRelation.schema != schema) {
324           throw new AnalysisException(s"$className does not allow user-specified schemas.")
325         }
326         baseRelation
327
328       // We are reading from the results of a streaming query. Load files from the metadata log
329       // instead of listing them using HDFS APIs.
330       case (format: FileFormat, _)
331           if FileStreamSink.hasMetadata(
332             caseInsensitiveOptions.get("path").toSeq ++ paths,
333             sparkSession.sessionState.newHadoopConf()) =>
334         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
335         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
336         val dataSchema = userSpecifiedSchema.orElse {
337           format.inferSchema(
338             sparkSession,
339             caseInsensitiveOptions,
340             fileCatalog.allFiles())
341         }.getOrElse {
342           throw new AnalysisException(
343             s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
344                 "It must be specified manually")
345         }
346
347         HadoopFsRelation(
348           fileCatalog,
349           partitionSchema = fileCatalog.partitionSchema,
350           dataSchema = dataSchema,
351           bucketSpec = None,
352           format,
353           caseInsensitiveOptions)(sparkSession)
354
355       // This is a non-streaming file based datasource.
356       case (format: FileFormat, _) =>
357         val globbedPaths =
358           checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
359         val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
360           catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
361           catalogTable.get.partitionColumnNames.nonEmpty
362         val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
363           val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
364           val index = new CatalogFileIndex(
365             sparkSession,
366             catalogTable.get,
367             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
368           (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
369         } else {
370           val index = createInMemoryFileIndex(globbedPaths)
371           val (resultDataSchema, resultPartitionSchema) =
372             getOrInferFileFormatSchema(format, Some(index))
373           (index, resultDataSchema, resultPartitionSchema)
374         }
375
376         HadoopFsRelation(
377           fileCatalog,
378           partitionSchema = partitionSchema,
379           dataSchema = dataSchema.asNullable,
380           bucketSpec = bucketSpec,
381           format,
382           caseInsensitiveOptions)(sparkSession)
383
384       case _ =>
385         throw new AnalysisException(
386           s"$className is not a valid Spark SQL Data Source.")
387     }
388
389     relation match {
390       case hs: HadoopFsRelation =>
391         SchemaUtils.checkColumnNameDuplication(
392           hs.dataSchema.map(_.name),
393           "in the data schema",
394           equality)
395         SchemaUtils.checkColumnNameDuplication(
396           hs.partitionSchema.map(_.name),
397           "in the partition schema",
398           equality)
399         DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema)
400       case _ =>
401         SchemaUtils.checkColumnNameDuplication(
402           relation.schema.map(_.name),
403           "in the data schema",
404           equality)
405     }
406
407     relation
408   }
409
410   /**
411    * Creates a command node to write the given [[LogicalPlan]] out to the given [[FileFormat]].
412    * The returned command is unresolved and need to be analyzed.
413    */
414   private def planForWritingFileFormat(
415       format: FileFormat, mode: SaveMode, data: LogicalPlan): InsertIntoHadoopFsRelationCommand = {
416     // Don't glob path for the write path.  The contracts here are:
417     //  1. Only one output path can be specified on the write path;
418     //  2. Output path must be a legal HDFS style file system path;
419     //  3. It's OK that the output path doesn't exist yet;
420     val allPaths = paths ++ caseInsensitiveOptions.get("path")
421     val outputPath = if (allPaths.length == 1) {
422       val path = new Path(allPaths.head)
423       val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
424       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
425     } else {
426       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
427         s"got: ${allPaths.mkString(", ")}")
428     }
429
430     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
431     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
432
433     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
434       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
435         case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
436       }.head
437     }
438     // For partitioned relation r, r.schema's column ordering can be different from the column
439     // ordering of data.logicalPlan (partition columns are all moved after data column).  This
440     // will be adjusted within InsertIntoHadoopFsRelation.
441     InsertIntoHadoopFsRelationCommand(
442       outputPath = outputPath,
443       staticPartitions = Map.empty,
444       ifPartitionNotExists = false,
445       partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
446       bucketSpec = bucketSpec,
447       fileFormat = format,
448       options = options,
449       query = data,
450       mode = mode,
451       catalogTable = catalogTable,
452       fileIndex = fileIndex,
453       outputColumns = data.output)
454   }
455
456   /**
457    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
458    * the following reading.
459    *
460    * @param mode The save mode for this writing.
461    * @param data The input query plan that produces the data to be written. Note that this plan
462    *             is analyzed and optimized.
463    * @param outputColumns The original output columns of the input query plan. The optimizer may not
464    *                      preserve the output column's names' case, so we need this parameter
465    *                      instead of `data.output`.
466    * @param physicalPlan The physical plan of the input query plan. We should run the writing
467    *                     command with this physical plan instead of creating a new physical plan,
468    *                     so that the metrics can be correctly linked to the given physical plan and
469    *                     shown in the web UI.
470    */
471   def writeAndRead(
472       mode: SaveMode,
473       data: LogicalPlan,
474       outputColumns: Seq[Attribute],
475       physicalPlan: SparkPlan): BaseRelation = {
476     if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
477       throw new AnalysisException("Cannot save interval data type into external storage.")
478     }
479
480     providingClass.newInstance() match {
481       case dataSource: CreatableRelationProvider =>
482         dataSource.createRelation(
483           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
484       case format: FileFormat =>
485         val cmd = planForWritingFileFormat(format, mode, data)
486         val resolvedPartCols = cmd.partitionColumns.map { col =>
487           // The partition columns created in `planForWritingFileFormat` should always be
488           // `UnresolvedAttribute` with a single name part.
489           assert(col.isInstanceOf[UnresolvedAttribute])
490           val unresolved = col.asInstanceOf[UnresolvedAttribute]
491           assert(unresolved.nameParts.length == 1)
492           val name = unresolved.nameParts.head
493           outputColumns.find(a => equality(a.name, name)).getOrElse {
494             throw new AnalysisException(
495               s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
496           }
497         }
498         val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
499         resolved.run(sparkSession, physicalPlan)
500         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
501         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
502       case _ =>
503         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
504     }
505   }
506
507   /**
508    * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
509    */
510   def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
511     if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
512       throw new AnalysisException("Cannot save interval data type into external storage.")
513     }
514
515     providingClass.newInstance() match {
516       case dataSource: CreatableRelationProvider =>
517         SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
518       case format: FileFormat =>
519         DataSource.validateSchema(data.schema)
520         planForWritingFileFormat(format, mode, data)
521       case _ =>
522         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
523     }
524   }
525
526   /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
527   private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
528     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
529     new InMemoryFileIndex(
530       sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
531   }
532
533   /**
534    * Checks and returns files in all the paths.
535    */
536   private def checkAndGlobPathIfNecessary(
537       checkEmptyGlobPath: Boolean,
538       checkFilesExist: Boolean): Seq[Path] = {
539     val allPaths = caseInsensitiveOptions.get("path") ++ paths
540     val hadoopConf = sparkSession.sessionState.newHadoopConf()
541     allPaths.flatMap { path =>
542       val hdfsPath = new Path(path)
543       val fs = hdfsPath.getFileSystem(hadoopConf)
544       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
545       val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
546
547       if (checkEmptyGlobPath && globPath.isEmpty) {
548         throw new AnalysisException(s"Path does not exist: $qualified")
549       }
550
551       // Sufficient to check head of the globPath seq for non-glob scenario
552       // Don't need to check once again if files exist in streaming mode
553       if (checkFilesExist && !fs.exists(globPath.head)) {
554         throw new AnalysisException(s"Path does not exist: ${globPath.head}")
555       }
556       globPath
557     }.toSeq
558   }
559 }
560
561 object DataSource extends Logging {
562
563   /** A map to maintain backward compatibility in case we move data sources around. */
564   private val backwardCompatibilityMap: Map[String, String] = {
565     val jdbc = classOf[JdbcRelationProvider].getCanonicalName
566     val json = classOf[JsonFileFormat].getCanonicalName
567     val parquet = classOf[ParquetFileFormat].getCanonicalName
568     val csv = classOf[CSVFileFormat].getCanonicalName
569     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
570     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
571     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
572     val socket = classOf[TextSocketSourceProvider].getCanonicalName
573     val rate = classOf[RateStreamProvider].getCanonicalName
574
575     Map(
576       "org.apache.spark.sql.jdbc" -> jdbc,
577       "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
578       "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
579       "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
580       "org.apache.spark.sql.json" -> json,
581       "org.apache.spark.sql.json.DefaultSource" -> json,
582       "org.apache.spark.sql.execution.datasources.json" -> json,
583       "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
584       "org.apache.spark.sql.parquet" -> parquet,
585       "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
586       "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
587       "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
588       "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
589       "org.apache.spark.sql.hive.orc" -> orc,
590       "org.apache.spark.sql.execution.datasources.orc.DefaultSource" -> nativeOrc,
591       "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
592       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
593       "org.apache.spark.ml.source.libsvm" -> libsvm,
594       "com.databricks.spark.csv" -> csv,
595       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
596       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
597     )
598   }
599
600   /**
601    * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
602    */
603   private val spark2RemovedClasses = Set(
604     "org.apache.spark.sql.DataFrame",
605     "org.apache.spark.sql.sources.HadoopFsRelationProvider",
606     "org.apache.spark.Logging")
607
608   /** Given a provider name, look up the data source class definition. */
609   def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
610     val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
611       case name if name.equalsIgnoreCase("orc") &&
612           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
613         classOf[OrcFileFormat].getCanonicalName
614       case name if name.equalsIgnoreCase("orc") &&
615           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
616         "org.apache.spark.sql.hive.orc.OrcFileFormat"
617       case name => name
618     }
619     val provider2 = s"$provider1.DefaultSource"
620     val loader = Utils.getContextOrSparkClassLoader
621     val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
622
623     try {
624       serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
625         // the provider format did not match any given registered aliases
626         case Nil =>
627           try {
628             Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
629               case Success(dataSource) =>
630                 // Found the data source using fully qualified path
631                 dataSource
632               case Failure(error) =>
633                 if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
634                   throw new AnalysisException(
635                     "Hive built-in ORC data source must be used with Hive support enabled. " +
636                     "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
637                     "'native'")
638                 } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
639                   provider1 == "com.databricks.spark.avro") {
640                   throw new AnalysisException(
641                     s"Failed to find data source: ${provider1.toLowerCase(Locale.ROOT)}. " +
642                     "Please find an Avro package at " +
643                     "http://spark.apache.org/third-party-projects.html")
644                 } else {
645                   throw new ClassNotFoundException(
646                     s"Failed to find data source: $provider1. Please find packages at " +
647                       "http://spark.apache.org/third-party-projects.html",
648                     error)
649                 }
650             }
651           } catch {
652             case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
653               // NoClassDefFoundError's class name uses "/" rather than "." for packages
654               val className = e.getMessage.replaceAll("/", ".")
655               if (spark2RemovedClasses.contains(className)) {
656                 throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
657                   "Please check if your library is compatible with Spark 2.0", e)
658               } else {
659                 throw e
660               }
661           }
662         case head :: Nil =>
663           // there is exactly one registered alias
664           head.getClass
665         case sources =>
666           // There are multiple registered aliases for the input. If there is single datasource
667           // that has "org.apache.spark" package in the prefix, we use it considering it is an
668           // internal datasource within Spark.
669           val sourceNames = sources.map(_.getClass.getName)
670           val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
671           if (internalSources.size == 1) {
672             logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
673               s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
674             internalSources.head.getClass
675           } else {
676             throw new AnalysisException(s"Multiple sources found for $provider1 " +
677               s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
678           }
679       }
680     } catch {
681       case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
682         // NoClassDefFoundError's class name uses "/" rather than "." for packages
683         val className = e.getCause.getMessage.replaceAll("/", ".")
684         if (spark2RemovedClasses.contains(className)) {
685           throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
686             "Please remove the incompatible library from classpath or upgrade it. " +
687             s"Error: ${e.getMessage}", e)
688         } else {
689           throw e
690         }
691     }
692   }
693
694   /**
695    * When creating a data source table, the `path` option has a special meaning: the table location.
696    * This method extracts the `path` option and treat it as table location to build a
697    * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this.
698    */
699   def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
700     val path = CaseInsensitiveMap(options).get("path")
701     val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
702     CatalogStorageFormat.empty.copy(
703       locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
704   }
705
706   /**
707    * Called before writing into a FileFormat based data source to make sure the
708    * supplied schema is not empty.
709    * @param schema
710    */
711   private def validateSchema(schema: StructType): Unit = {
712     def hasEmptySchema(schema: StructType): Boolean = {
713       schema.size == 0 || schema.find {
714         case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
715         case _ => false
716       }.isDefined
717     }
718
719
720     if (hasEmptySchema(schema)) {
721       throw new AnalysisException(
722         s"""
723            |Datasource does not support writing empty or nested empty schemas.
724            |Please make sure the data schema has at least one or more column(s).
725          """.stripMargin)
726     }
727   }
728 }